adnanhemani commented on code in PR #1965:
URL: https://github.com/apache/polaris/pull/1965#discussion_r2209088685


##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));

Review Comment:
   Good call, done.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}
+
+  private long getCurrentTimestamp(CallContext callContext) {
+    return callContext.getPolarisCallContext().getClock().millis();
+  }
+
+  // Event overrides below
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {
+    try {
+      Map<String, Object> json = objectMapper.convertValue(event.catalog(), 
Map.class);
+      json.put("realm", callContext.getRealmContext().getRealmIdentifier());
+      json.put("event_type", event.getClass().getSimpleName());
+      queue.add(

Review Comment:
   This is a good point that I also considered. However, having to make an 
additional synchronous call to AWS - especially on read-only operations - 
seems like it could impact call latency, particularly if Polaris is not 
deployed on AWS servers and as a result is making a call to AWS servers 
through the open internet. So instead I went with this queue model.
   
   An alternative would be to create an asynchronous task and throw the message 
into it and let Quarkus handle scheduling the task for eventual completion. But 
there will still be overhead for tracking all active async tasks, etc.
   
   WDYT?



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();

Review Comment:
   By this, do you mean setting a maximum limit on how many events we will 
queue before refusing to queue any more events? What are your thoughts on what 
a "reasonable bound" is? 10s/100s/1000s? (I'm assuming not more than that.)
   
   In this current design, it would take a crazy burst of instantaneous events 
to get to anything more than 100s IMO.



##########
service/common/src/test/java/org/apache/polaris/service/events/AwsCloudWatchEventListenerTest.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
+
+class AwsCloudWatchEventListenerTest {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AwsCloudWatchEventListenerTest.class);
+
+  private static final DockerImageName LOCALSTACK_IMAGE =
+      DockerImageName.parse("localstack/localstack:3.4");

Review Comment:
   Unfortunately not. LocalStack is basically running a sidecar test server on 
Kubernetes that it uses to interact with the AWS SDK. Our alternative is to 
write off LocalStack and make this test a pure unit test.
   
   WDYT?



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();

Review Comment:
   Refactored.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {

Review Comment:
   Good call, changed.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }

Review Comment:
   Sure!



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();

Review Comment:
   Good call, done.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);

Review Comment:
   I'll log all of the events at `debug` level and include only the event count in the `error` message.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}

Review Comment:
   Sure.



##########
service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java:
##########
@@ -55,4 +57,7 @@ public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent 
event) {}
 
   /** {@link AfterTaskAttemptedEvent} */
   public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+
+  /** {@link AfterCatalogCreatedEvent} */
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {}

Review Comment:
   This statement, while true, is not the full picture on its own. As part of 
creating this event, a `CallContext` parameter was added to the function 
signature, and the rest of the PR requires that `CallContext`.
   
   So, if I remove this event, we no longer have unit tests that work - unless 
I modify a different event instead. In that case, I don't really see the 
objection to adding this event, if we're okay with modifying other events as 
part of this PR (events which will also see changes in other PRs).
   
   If this is a blocker, I can remove this new event - but then this PR will 
get even bigger, with changes to all the other events that currently exist. Let 
me know if there is a strong preference - but I'd prefer to keep this PR as-is 
and let follow-up PRs make whatever adjustments are required for all events.



##########
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.context.CallContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.InvalidSequenceTokenException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
+import 
software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
+
+@ApplicationScoped
+@Identifier("aws-cloudwatch")
+public class AwsCloudWatchEventListener extends PolarisEventListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private static final int MAX_BATCH_SIZE = 10_000;
+  private static final int MAX_WAIT_MS = 5000;
+
+  private final BlockingQueue<EventAndTimestamp> queue = new 
LinkedBlockingQueue<>();
+  private CloudWatchLogsClient client;
+  private volatile String sequenceToken;
+
+  private volatile boolean running = true;
+
+  ExecutorService executorService;
+
+  private Future<?> backgroundTask;
+
+  private final String logGroup;
+  private final String logStream;
+  private final Region region;
+
+  @Inject
+  public AwsCloudWatchEventListener(
+      EventListenerConfiguration config, ExecutorService executorService) {
+    this.executorService = executorService;
+
+    this.logStream = 
config.awsCloudwatchlogStream().orElse("polaris-cloudwatch-default-stream");
+    this.logGroup = 
config.awsCloudwatchlogGroup().orElse("polaris-cloudwatch-default-group");
+    this.region = Region.of(config.awsCloudwatchRegion().orElse("us-east-1"));
+  }
+
+  @PostConstruct
+  void start() {
+    this.client = createCloudWatchClient();
+    ensureLogGroupAndStream();
+    backgroundTask = executorService.submit(this::processQueue);
+  }
+
+  protected CloudWatchLogsClient createCloudWatchClient() {
+    return CloudWatchLogsClient.builder().region(region).build();
+  }
+
+  private void processQueue() {
+    while (running || !queue.isEmpty()) {
+      drainQueue();
+    }
+  }
+
+  @VisibleForTesting
+  public void drainQueue() {
+    List<EventAndTimestamp> drainedEvents = new ArrayList<>();
+    List<InputLogEvent> transformedEvents = new ArrayList<>();
+    try {
+      EventAndTimestamp first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+      if (first != null) {
+        drainedEvents.add(first);
+        queue.drainTo(drainedEvents, MAX_BATCH_SIZE - 1);
+      } else {
+        return;
+      }
+
+      drainedEvents.forEach(event -> 
transformedEvents.add(createLogEvent(event)));
+
+      sendToCloudWatch(transformedEvents);
+    } catch (Exception e) {
+      LOGGER.error("Error writing logs to CloudWatch: {}", e.getMessage());
+      LOGGER.error("Events not logged: {}", transformedEvents);
+      queue.addAll(drainedEvents);
+    }
+  }
+
+  private InputLogEvent createLogEvent(EventAndTimestamp eventAndTimestamp) {
+    return InputLogEvent.builder()
+        .message(eventAndTimestamp.event)
+        .timestamp(eventAndTimestamp.timestamp)
+        .build();
+  }
+
+  private void sendToCloudWatch(List<InputLogEvent> events) {
+    events.sort(Comparator.comparingLong(InputLogEvent::timestamp));
+
+    PutLogEventsRequest.Builder requestBuilder =
+        PutLogEventsRequest.builder()
+            .logGroupName(logGroup)
+            .logStreamName(logStream)
+            .logEvents(events);
+
+    synchronized (this) {
+      if (sequenceToken != null) {
+        requestBuilder.sequenceToken(sequenceToken);
+      }
+
+      try {
+        PutLogEventsResponse response = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = response.nextSequenceToken();
+      } catch (InvalidSequenceTokenException e) {
+        sequenceToken = getSequenceToken();
+        requestBuilder.sequenceToken(sequenceToken);
+        PutLogEventsResponse retryResponse = 
client.putLogEvents(requestBuilder.build());
+        sequenceToken = retryResponse.nextSequenceToken();
+      }
+    }
+  }
+
+  private void ensureLogGroupAndStream() {
+    try {
+      
client.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    try {
+      client.createLogStream(
+          
CreateLogStreamRequest.builder().logGroupName(logGroup).logStreamName(logStream).build());
+    } catch (ResourceAlreadyExistsException ignored) {
+    }
+
+    sequenceToken = getSequenceToken();
+  }
+
+  private String getSequenceToken() {
+    DescribeLogStreamsResponse response =
+        client.describeLogStreams(
+            DescribeLogStreamsRequest.builder()
+                .logGroupName(logGroup)
+                .logStreamNamePrefix(logStream)
+                .build());
+
+    return response.logStreams().stream()
+        .filter(s -> logStream.equals(s.logStreamName()))
+        .map(LogStream::uploadSequenceToken)
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @PreDestroy
+  void shutdown() {
+    running = false;
+    if (backgroundTask != null) {
+      try {
+        backgroundTask.get(10, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOGGER.error("Error waiting for background logging task to finish: 
{}", e.getMessage());
+      }
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  private record EventAndTimestamp(String event, long timestamp) {}
+
+  private long getCurrentTimestamp(CallContext callContext) {
+    return callContext.getPolarisCallContext().getClock().millis();
+  }
+
+  // Event overrides below
+  @Override
+  public void onAfterCatalogCreated(AfterCatalogCreatedEvent event, 
CallContext callContext) {
+    try {
+      Map<String, Object> json = objectMapper.convertValue(event.catalog(), 
Map.class);

Review Comment:
   I've tried to refactor it out. I agree that we should probably pull this 
away from the actual listener - but let me tackle that in the follow-up PRs for 
Azure and GCP. I don't want to refactor prematurely, before there is a second 
implementation that uses that code, if that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to