eric-maynard commented on code in PR #1965:
URL: https://github.com/apache/polaris/pull/1965#discussion_r2208701521


##########
runtime/defaults/src/main/resources/application.properties:
##########
@@ -126,6 +126,10 @@ polaris.secrets-manager.type=in-memory
 polaris.file-io.type=default
 
 polaris.event-listener.type=no-op
+# polaris.event-listener.type=aws-cloudwatch
+# polaris.event-listener.aws-cloudwatch.log-group=test-group
+# polaris.event-listener.aws-cloudwatch.log-stream=test-stream
+# polaris.event-listener.aws-cloudwatch.region=us-west-2

Review Comment:
   The intent behind these commented-out example properties is good, but since they're a bit cloud-specific, we might be better off leaving them out.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();

Review Comment:
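   If we stick with this approach, we should put a reasonable capacity bound on the `LinkedBlockingQueue`. As written it is unbounded, and failed batches are re-added to it, so a slow or unavailable CloudWatch endpoint would let the queue grow without limit.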



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));

Review Comment:
   Let's extract these default values (the log stream name, log group name, and region) into named constants.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }

Review Comment:
   Might as well add a debug log here rather than silently ignoring the `ResourceAlreadyExistsException`.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {

Review Comment:
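   This catch is too broad, IMO. Catching `Exception` also swallows the `InterruptedException` thrown by `queue.poll` (without restoring the interrupt flag), as well as any programming errors, and then re-queues the batch.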



##########
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Optional;
+
+public interface EventListenerConfiguration {
+  Optional<String> awsCloudwatchlogGroup();
+
+  Optional<String> awsCloudwatchlogStream();
+
+  Optional<String> awsCloudwatchRegion();

Review Comment:
   How does this work if I want to define my own EventListener? Do I essentially need to fork and modify this interface?
   
   Won't this also grow quite large as more listeners are implemented, since each one would add its own settings here?



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}
+
+  private long getCurrentTimestamp(CallContext callContext) {
+    return callContext.getPolarisCallContext().getClock().millis();
+  }
+
+  // Event overrides below
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {
+    try {
+      Map<String, Object> json = objectMapper.convertValue(event.catalog(), 
Map.class);
+      json.put("realm", callContext.getRealmContext().getRealmIdentifier());
+      json.put("event_type", event.getClass().getSimpleName());
+      queue.add(

Review Comment:
   Since the intent of this listener is to keep things simple, perhaps we should just emit messages synchronously here rather than relying on a background thread to pick them up from a queue.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);

Review Comment:
   This could be quite verbose. Can we log just one event, or some kind of 
summary (e.g. the number of events that failed to be written) instead?



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}

Review Comment:
   nit: rename to `EventWithTimestamp`?



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();

Review Comment:
   If we stick with this approach, we should reuse these lists across calls 
rather than allocating new ones on every drain.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();

Review Comment:
   nit: the repeated `putLogEvents` call and sequence-token update here could 
be refactored out into a helper.



##########
service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java:
##########
@@ -55,4 +57,7 @@ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent 
event) {}
 
   /** {@link AfterTaskAttemptedEvent} */
   public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  /** {@link AfterCatalogCreatedEvent} */
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {}

Review Comment:
   This isn't strictly required for the sink implementation; can we consider 
separating it out? IIUC, there is another PR that subsumes this change and 
will make it unnecessary.



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java:
##########
@@ -20,13 +20,29 @@
 
 import io.quarkus.runtime.annotations.StaticInitSafe;
 import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithName;
+import java.util.Optional;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.apache.polaris.service.events.PolarisEventListener;
 
 @StaticInitSafe
 @ConfigMapping(prefix = "polaris.event-listener")
-public interface QuarkusPolarisEventListenerConfiguration {
+public interface QuarkusPolarisEventListenerConfiguration extends 
EventListenerConfiguration {
   /**
-   * The type of the event listener to use. Must be a registered {@link
-   * org.apache.polaris.service.events.PolarisEventListener} identifier.
+   * The type of the event listener to use. Must be a registered {@link 
PolarisEventListener}
+   * identifier.
    */
   String type();
+
+  @WithName("aws-cloudwatch.log-group")
+  @Override
+  Optional<String> awsCloudwatchlogGroup();

Review Comment:
   The same comment about extensibility applies here: I would expect a separate 
`...Configuration` type for each `PolarisEventListener` implementation.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}
+
+  private long getCurrentTimestamp(CallContext callContext) {
+    return callContext.getPolarisCallContext().getClock().millis();
+  }
+
+  // Event overrides below
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {
+    try {
+      Map<String, Object> json = objectMapper.convertValue(event.catalog(), 
Map.class);

Review Comment:
   IIUC, this will result in a lot of duplicated code for each event type, which 
we should try to refactor away. It may also make sense to separate these 
per-event overrides from the core logic of the listener.



##########
service/common/src/test/java/org/apache/polaris/service/events/AwsCloudWatchEventListenerTest.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
+
+class AwsCloudWatchEventListenerTest {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AwsCloudWatchEventListenerTest.class);
+
+  private static final DockerImageName LOCALSTACK_IMAGE =
+      DockerImageName.parse("localstack/localstack:3.4");

Review Comment:
   I don't know much about LocalStack, but pulling an external Docker image into 
a unit test looks a bit concerning. Is there a way we can use our existing 
regtests/Docker setup for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to