This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 96284a94 Notification API reject out of order notifications (#232)
96284a94 is described below

commit 96284a94bb734ab980b845cf01e49bbd314a37bf
Author: Hung-An (Alvin) Chen <[email protected]>
AuthorDate: Fri Sep 6 09:24:01 2024 -0500

    Notification API reject out of order notifications (#232)
---
 .../polaris/core/entity/TableLikeEntity.java       | 16 +++++
 .../service/catalog/BasePolarisCatalog.java        | 15 ++++-
 .../service/catalog/BasePolarisCatalogTest.java    | 68 +++++++++++++++++++++-
 spec/rest-catalog-open-api.yaml                    | 32 +++++++++-
 4 files changed, 128 insertions(+), 3 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
index b5ce9bb4..968598b9 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.core.entity;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.Optional;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.RESTUtil;
@@ -31,6 +32,9 @@ public class TableLikeEntity extends PolarisEntity {
   public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY = 
"write.data.path";
   public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY = 
"write.metadata.path";
 
+  public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
+      "last-notification-timestamp";
+
   public TableLikeEntity(PolarisBaseEntity sourceEntity) {
     super(sourceEntity);
   }
@@ -63,6 +67,13 @@ public class TableLikeEntity extends PolarisEntity {
     return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY);
   }
 
+  @JsonIgnore
+  public Optional<Long> getLastAdmittedNotificationTimestamp() {
+    return Optional.ofNullable(
+            
getInternalPropertiesAsMap().get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY))
+        .map(Long::parseLong);
+  }
+
   @JsonIgnore
   public String getBaseLocation() {
     return 
getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION);
@@ -109,5 +120,10 @@ public class TableLikeEntity extends PolarisEntity {
       internalProperties.put(METADATA_LOCATION_KEY, location);
       return this;
     }
+
+    public Builder setLastNotificationTimestamp(long timestamp) {
+      internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, 
String.valueOf(timestamp));
+      return this;
+    }
   }
 }
diff --git 
a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
 
b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 2538caf5..848e1069 100644
--- 
a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ 
b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -1874,10 +1874,23 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
                         .getMetaStoreManager()
                         .generateNewEntityId(getCurrentPolarisContext())
                         .getId())
+                
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
                 .build();
       } else {
+        // Reject notifications whose timestamp is not strictly newer than the last admitted one
+        if (entity.getLastAdmittedNotificationTimestamp().isPresent()
+            && request.getPayload().getTimestamp()
+                <= entity.getLastAdmittedNotificationTimestamp().get()) {
+          throw new AlreadyExistsException(
+              "A notification with a newer timestamp has been processed for 
table %s",
+              tableIdentifier);
+        }
         existingLocation = entity.getMetadataLocation();
-        entity = new 
TableLikeEntity.Builder(entity).setMetadataLocation(newLocation).build();
+        entity =
+            new TableLikeEntity.Builder(entity)
+                .setMetadataLocation(newLocation)
+                
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
+                .build();
       }
       // first validate we can read the metadata file
       validateLocationForTableLike(tableIdentifier, newLocation);
diff --git 
a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
 
b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
index c574e5f4..1a69c7ba 100644
--- 
a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
+++ 
b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
@@ -623,7 +624,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     update.setMetadataLocation(maliciousMetadataFile);
     update.setTableName(table.name());
     update.setTableUuid(UUID.randomUUID().toString());
-    update.setTimestamp(230950845L);
+    update.setTimestamp(230950849L);
     updateRequest.setPayload(update);
 
     Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, 
updateRequest))
@@ -913,6 +914,71 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
         .hasMessageContaining("Invalid location");
   }
 
+  @Test
+  public void testUpdateNotificationRejectOutOfOrderTimestamp() {
+    Assumptions.assumeTrue(
+        requiresNamespaceCreate(),
+        "Only applicable if namespaces must be created before adding 
children");
+    Assumptions.assumeTrue(
+        supportsNestedNamespaces(), "Only applicable if nested namespaces are 
supported");
+    Assumptions.assumeTrue(
+        supportsNotifications(), "Only applicable if notifications are 
supported");
+
+    final String tableLocation = "s3://externally-owned-bucket/table/";
+    final String tableMetadataLocation = tableLocation + 
"metadata/v1.metadata.json";
+    BasePolarisCatalog catalog = catalog();
+
+    Namespace namespace = Namespace.of("parent", "child1");
+    TableIdentifier table = TableIdentifier.of(namespace, "table");
+
+    long timestamp = 230950845L;
+    NotificationRequest request = new NotificationRequest();
+    request.setNotificationType(NotificationType.CREATE);
+    TableUpdateNotification update = new TableUpdateNotification();
+    update.setMetadataLocation(tableMetadataLocation);
+    update.setTableName(table.name());
+    update.setTableUuid(UUID.randomUUID().toString());
+    update.setTimestamp(timestamp);
+    request.setPayload(update);
+
+    InMemoryFileIO fileIO = (InMemoryFileIO) catalog.getIo();
+
+    fileIO.addFile(
+        tableMetadataLocation,
+        
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
+
+    catalog.sendNotification(table, request);
+
+    // Send a notification with the same timestamp as the previous one; it should be rejected
+    NotificationRequest request2 = new NotificationRequest();
+    request2.setNotificationType(NotificationType.UPDATE);
+    TableUpdateNotification update2 = new TableUpdateNotification();
+    update2.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
+    update2.setTableName(table.name());
+    update2.setTableUuid(UUID.randomUUID().toString());
+    update2.setTimestamp(timestamp);
+    request2.setPayload(update2);
+
+    Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, 
request2))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining(
+            "A notification with a newer timestamp has been processed for 
table parent.child1.table");
+
+    // Verify that DROP notification won't be rejected due to timestamp
+    NotificationRequest request3 = new NotificationRequest();
+    request3.setNotificationType(NotificationType.DROP);
+    TableUpdateNotification update3 = new TableUpdateNotification();
+    update3.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
+    update3.setTableName(table.name());
+    update3.setTableUuid(UUID.randomUUID().toString());
+    update3.setTimestamp(timestamp);
+    request3.setPayload(update3);
+
+    Assertions.assertThat(catalog.sendNotification(table, request3))
+        .as("Drop notification should not fail despite timestamp being 
outdated")
+        .isTrue();
+  }
+
   @Test
   public void 
testUpdateNotificationWhenTableExistsFileSpecifiesDisallowedLocation() {
     Assumptions.assumeTrue(
diff --git a/spec/rest-catalog-open-api.yaml b/spec/rest-catalog-open-api.yaml
index 9c7e61e7..6bea02ac 100644
--- a/spec/rest-catalog-open-api.yaml
+++ b/spec/rest-catalog-open-api.yaml
@@ -976,7 +976,14 @@ paths:
       summary: Sends a notification to the table
       operationId: sendNotification
       requestBody:
-        description: The request containing the notification to be sent
+        description:
+          The request containing the notification to be sent.
+
+          For each table, Polaris will reject any notification where the timestamp in the request body
+          is older than or equal to the most recent timestamp Polaris has already processed for the table.
+          The responsibility of ensuring the correct order of timestamps for a sequence of notifications
+          lies with the caller of the API. This includes managing potential clock skew or inconsistencies
+          when notifications are sent from multiple sources.
         content:
           application/json:
             schema:
@@ -1001,6 +1008,16 @@ paths:
               examples:
                 TableToLoadDoesNotExist:
                   $ref: '#/components/examples/NoSuchTableError'
+        409:
+          description:
+            Conflict - The timestamp of the received notification is older than or equal to the
+            most recent timestamp Polaris has already processed for the specified table.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/IcebergErrorResponse'
+              examples:
+                OutOfOrderNotification: { $ref: '#/components/examples/OutOfOrderNotificationError' }
         419:
           $ref: '#/components/responses/AuthenticationTimeoutResponse'
         503:
@@ -3213,6 +3230,7 @@ components:
     NotificationRequest:
       required:
         - notification-type
+        - payload
       properties:
         notification-type:
           $ref: '#/components/schemas/NotificationType'
@@ -4130,6 +4148,18 @@ components:
         "updates": {"owner": "Raoul"}
       }
 
+    OutOfOrderNotificationError:
+      summary:
+        The timestamp of the received notification is older than or equal to the most recent timestamp
+        Polaris has already processed for the specified table.
+      value: {
+        "error": {
+          "message": "A notification with a newer timestamp has been processed for table ns1.table1",
+          "type": "AlreadyExistsException",
+          "code": 409
+        }
+      }
+
   securitySchemes:
     OAuth2:
       type: oauth2

Reply via email to