adnanhemani commented on code in PR #2962:
URL: https://github.com/apache/polaris/pull/2962#discussion_r2496512085
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java:
##########
@@ -86,4 +89,8 @@ public interface AwsCloudWatchConfiguration {
@WithName("synchronous-mode")
@WithDefault("false")
boolean synchronousMode();
+
+ @WithName("event-types")
+ Optional<Set<Class<? extends PolarisEvent>>>
Review Comment:
Is this possible to do? I don't see the ability to do this in the Quarkus
public documentation?
##########
runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java:
##########
@@ -56,6 +61,10 @@ public void customize(ObjectMapper objectMapper) {
objectMapper.setVisibility(PropertyAccessor.FIELD,
JsonAutoDetect.Visibility.ANY);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
objectMapper.setPropertyNamingStrategy(new
PropertyNamingStrategies.KebabCaseStrategy());
+ objectMapper.addMixIn(PolarisEvent.class, PolarisEventBaseMixin.class);
+ objectMapper.addMixIn(TableIdentifier.class,
IcebergMixins.TableIdentifierMixin.class);
Review Comment:
I thought these are taken care of by the `RESTSerializers` below?
##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java:
##########
@@ -236,12 +248,18 @@ void shouldSendEventToCloudWatch() {
.satisfies(
logEvent -> {
String message = logEvent.message();
+ JsonNode root = objectMapper.readTree(message);
+ JsonNode event = root.path("event").isMissingNode() ? root :
root.path("event");
Review Comment:
nit: shouldn't the @JsonUnwrapped make sure that the event information is
not under an "event" object?
##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java:
##########
@@ -309,17 +327,49 @@ void shouldSendEventInSynchronousMode() {
@Test
void ensureObjectMapperCustomizerIsApplied() {
- AwsCloudWatchEventListener listener =
createListener(createCloudWatchAsyncClient());
- listener.start();
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
assertThat(listener.objectMapper.getPropertyNamingStrategy())
.isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class);
assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength())
.isEqualTo(MAX_BODY_SIZE.longValue());
+
+ assertThat(objectMapper.findMixInClassFor(Namespace.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(IcebergMixins.NamespaceMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(TableIdentifier.class))
+ .as("TableIdentifier mixin should be registered")
+ .isEqualTo(IcebergMixins.TableIdentifierMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(PolarisEvent.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(PolarisEventBaseMixin.class);
+ }
+
+ @Test
+ void shouldListenToAllEventTypesWhenConfigNotProvided() {
+ // given: config.eventTypes() is empty → listen to all events
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty());
Review Comment:
nit:
```suggestion
when(config.eventTypes()).thenReturn(Optional.empty());
```
##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java:
##########
@@ -309,17 +327,49 @@ void shouldSendEventInSynchronousMode() {
@Test
void ensureObjectMapperCustomizerIsApplied() {
- AwsCloudWatchEventListener listener =
createListener(createCloudWatchAsyncClient());
- listener.start();
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
assertThat(listener.objectMapper.getPropertyNamingStrategy())
.isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class);
assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength())
.isEqualTo(MAX_BODY_SIZE.longValue());
+
+ assertThat(objectMapper.findMixInClassFor(Namespace.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(IcebergMixins.NamespaceMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(TableIdentifier.class))
+ .as("TableIdentifier mixin should be registered")
+ .isEqualTo(IcebergMixins.TableIdentifierMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(PolarisEvent.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(PolarisEventBaseMixin.class);
+ }
+
+ @Test
+ void shouldListenToAllEventTypesWhenConfigNotProvided() {
+ // given: config.eventTypes() is empty → listen to all events
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty());
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
+
+ // This is any random PolarisEvent — if the listener listens to all types,
+ // shouldHandle(event) should return true
Review Comment:
I understand the thought behind this test, but technically this is not the
right way to test a "random" event. Maybe you create a new "RandomEvent" Java
record that extends "PolarisEvent" (making it clear that it _definitely_ cannot
be referred to outside the scope of this test class and show that
`shouldHandle` still returns true.
##########
runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java:
##########
@@ -309,17 +327,49 @@ void shouldSendEventInSynchronousMode() {
@Test
void ensureObjectMapperCustomizerIsApplied() {
- AwsCloudWatchEventListener listener =
createListener(createCloudWatchAsyncClient());
- listener.start();
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
assertThat(listener.objectMapper.getPropertyNamingStrategy())
.isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class);
assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength())
.isEqualTo(MAX_BODY_SIZE.longValue());
+
+ assertThat(objectMapper.findMixInClassFor(Namespace.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(IcebergMixins.NamespaceMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(TableIdentifier.class))
+ .as("TableIdentifier mixin should be registered")
+ .isEqualTo(IcebergMixins.TableIdentifierMixin.class);
+
+ assertThat(objectMapper.findMixInClassFor(PolarisEvent.class))
+ .as("Namespace mixin should be registered")
+ .isEqualTo(PolarisEventBaseMixin.class);
+ }
+
+ @Test
+ void shouldListenToAllEventTypesWhenConfigNotProvided() {
+ // given: config.eventTypes() is empty → listen to all events
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty());
+
+ AwsCloudWatchEventListener listener =
+ new AwsCloudWatchEventListener(config, clock, objectMapper);
+
+ // This is any random PolarisEvent — if the listener listens to all types,
+ // shouldHandle(event) should return true
+ PolarisEvent randomEvent =
+ new IcebergRestCatalogEvents.AfterRefreshTableEvent(
+ "test_catalog", TableIdentifier.of("db", "table"));
+
+ boolean shouldHandle = listener.shouldHandle(randomEvent);
+ assertThat(shouldHandle)
+ .as("Listener should handle all events when no eventTypes are
configured")
+ .isTrue();
}
private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) {
- // Verify log group exists
Review Comment:
Unnecessary change
##########
runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java:
##########
@@ -86,17 +92,22 @@ class AwsCloudWatchEventListenerTest {
private ExecutorService executorService;
private AutoCloseable mockitoContext;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ ;
@BeforeEach
void setUp() {
mockitoContext = MockitoAnnotations.openMocks(this);
executorService = Executors.newSingleThreadExecutor();
+ customizer.customize(objectMapper);
+
// Configure the mocks
when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP);
when(config.awsCloudWatchLogStream()).thenReturn(LOG_STREAM);
when(config.awsCloudWatchRegion()).thenReturn("us-east-1");
- when(config.synchronousMode()).thenReturn(false); // Default to async mode
+ when(config.synchronousMode()).thenReturn(false); // default async
+ when(config.eventTypes()).thenReturn(java.util.Optional.empty()); //
handle all events
Review Comment:
nit:
```suggestion
when(config.eventTypes()).thenReturn(Optional.empty()); // handle all
events
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]