This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 96284a94 Notification API reject out of order notifications (#232)
96284a94 is described below
commit 96284a94bb734ab980b845cf01e49bbd314a37bf
Author: Hung-An (Alvin) Chen <[email protected]>
AuthorDate: Fri Sep 6 09:24:01 2024 -0500
Notification API reject out of order notifications (#232)
---
.../polaris/core/entity/TableLikeEntity.java | 16 +++++
.../service/catalog/BasePolarisCatalog.java | 15 ++++-
.../service/catalog/BasePolarisCatalogTest.java | 68 +++++++++++++++++++++-
spec/rest-catalog-open-api.yaml | 32 +++++++++-
4 files changed, 128 insertions(+), 3 deletions(-)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
index b5ce9bb4..968598b9 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java
@@ -19,6 +19,7 @@
package org.apache.polaris.core.entity;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
@@ -31,6 +32,9 @@ public class TableLikeEntity extends PolarisEntity {
public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY =
"write.data.path";
public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY =
"write.metadata.path";
+ public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
+ "last-notification-timestamp";
+
public TableLikeEntity(PolarisBaseEntity sourceEntity) {
super(sourceEntity);
}
@@ -63,6 +67,13 @@ public class TableLikeEntity extends PolarisEntity {
return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY);
}
+ @JsonIgnore
+ public Optional<Long> getLastAdmittedNotificationTimestamp() {
+ return Optional.ofNullable(
+
getInternalPropertiesAsMap().get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY))
+ .map(Long::parseLong);
+ }
+
@JsonIgnore
public String getBaseLocation() {
return
getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION);
@@ -109,5 +120,10 @@ public class TableLikeEntity extends PolarisEntity {
internalProperties.put(METADATA_LOCATION_KEY, location);
return this;
}
+
+ public Builder setLastNotificationTimestamp(long timestamp) {
+ internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY,
String.valueOf(timestamp));
+ return this;
+ }
}
}
diff --git
a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 2538caf5..848e1069 100644
---
a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++
b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -1874,10 +1874,23 @@ public class BasePolarisCatalog extends
BaseMetastoreViewCatalog
.getMetaStoreManager()
.generateNewEntityId(getCurrentPolarisContext())
.getId())
+
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
.build();
} else {
+ // If the notification timestamp is out-of-order, we should not update
the table
+ if (entity.getLastAdmittedNotificationTimestamp().isPresent()
+ && request.getPayload().getTimestamp()
+ <= entity.getLastAdmittedNotificationTimestamp().get()) {
+ throw new AlreadyExistsException(
+ "A notification with a newer timestamp has been processed for
table %s",
+ tableIdentifier);
+ }
existingLocation = entity.getMetadataLocation();
- entity = new
TableLikeEntity.Builder(entity).setMetadataLocation(newLocation).build();
+ entity =
+ new TableLikeEntity.Builder(entity)
+ .setMetadataLocation(newLocation)
+
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
+ .build();
}
// first validate we can read the metadata file
validateLocationForTableLike(tableIdentifier, newLocation);
diff --git
a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
index c574e5f4..1a69c7ba 100644
---
a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
+++
b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
@@ -623,7 +624,7 @@ public class BasePolarisCatalogTest extends
CatalogTests<BasePolarisCatalog> {
update.setMetadataLocation(maliciousMetadataFile);
update.setTableName(table.name());
update.setTableUuid(UUID.randomUUID().toString());
- update.setTimestamp(230950845L);
+ update.setTimestamp(230950849L);
updateRequest.setPayload(update);
Assertions.assertThatThrownBy(() -> catalog.sendNotification(table,
updateRequest))
@@ -913,6 +914,71 @@ public class BasePolarisCatalogTest extends
CatalogTests<BasePolarisCatalog> {
.hasMessageContaining("Invalid location");
}
+ @Test
+ public void testUpdateNotificationRejectOutOfOrderTimestamp() {
+ Assumptions.assumeTrue(
+ requiresNamespaceCreate(),
+ "Only applicable if namespaces must be created before adding
children");
+ Assumptions.assumeTrue(
+ supportsNestedNamespaces(), "Only applicable if nested namespaces are
supported");
+ Assumptions.assumeTrue(
+ supportsNotifications(), "Only applicable if notifications are
supported");
+
+ final String tableLocation = "s3://externally-owned-bucket/table/";
+ final String tableMetadataLocation = tableLocation +
"metadata/v1.metadata.json";
+ BasePolarisCatalog catalog = catalog();
+
+ Namespace namespace = Namespace.of("parent", "child1");
+ TableIdentifier table = TableIdentifier.of(namespace, "table");
+
+ long timestamp = 230950845L;
+ NotificationRequest request = new NotificationRequest();
+ request.setNotificationType(NotificationType.CREATE);
+ TableUpdateNotification update = new TableUpdateNotification();
+ update.setMetadataLocation(tableMetadataLocation);
+ update.setTableName(table.name());
+ update.setTableUuid(UUID.randomUUID().toString());
+ update.setTimestamp(timestamp);
+ request.setPayload(update);
+
+ InMemoryFileIO fileIO = (InMemoryFileIO) catalog.getIo();
+
+ fileIO.addFile(
+ tableMetadataLocation,
+
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
+
+ catalog.sendNotification(table, request);
+
+ // Send a notification with a timestamp same as that of the previous
notification, should fail
+ NotificationRequest request2 = new NotificationRequest();
+ request2.setNotificationType(NotificationType.UPDATE);
+ TableUpdateNotification update2 = new TableUpdateNotification();
+ update2.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
+ update2.setTableName(table.name());
+ update2.setTableUuid(UUID.randomUUID().toString());
+ update2.setTimestamp(timestamp);
+ request2.setPayload(update2);
+
+ Assertions.assertThatThrownBy(() -> catalog.sendNotification(table,
request2))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining(
+ "A notification with a newer timestamp has been processed for
table parent.child1.table");
+
+ // Verify that DROP notification won't be rejected due to timestamp
+ NotificationRequest request3 = new NotificationRequest();
+ request3.setNotificationType(NotificationType.DROP);
+ TableUpdateNotification update3 = new TableUpdateNotification();
+ update3.setMetadataLocation(tableLocation + "metadata/v2.metadata.json");
+ update3.setTableName(table.name());
+ update3.setTableUuid(UUID.randomUUID().toString());
+ update3.setTimestamp(timestamp);
+ request3.setPayload(update3);
+
+ Assertions.assertThat(catalog.sendNotification(table, request3))
+ .as("Drop notification should not fail despite timestamp being
outdated")
+ .isTrue();
+ }
+
@Test
public void
testUpdateNotificationWhenTableExistsFileSpecifiesDisallowedLocation() {
Assumptions.assumeTrue(
diff --git a/spec/rest-catalog-open-api.yaml b/spec/rest-catalog-open-api.yaml
index 9c7e61e7..6bea02ac 100644
--- a/spec/rest-catalog-open-api.yaml
+++ b/spec/rest-catalog-open-api.yaml
@@ -976,7 +976,14 @@ paths:
summary: Sends a notification to the table
operationId: sendNotification
requestBody:
- description: The request containing the notification to be sent
+ description:
+ The request containing the notification to be sent.
+
+ For each table, Polaris will reject any notification (other than DROP) where the timestamp in the request body
+ is older than or equal to the most recent notification timestamp Polaris has already processed for the table.
+ The responsibility of ensuring the correct order of timestamps for a
sequence of notifications
+ lies with the caller of the API. This includes managing potential
clock skew or inconsistencies
+ when notifications are sent from multiple sources.
content:
application/json:
schema:
@@ -1001,6 +1008,16 @@ paths:
examples:
TableToLoadDoesNotExist:
$ref: '#/components/examples/NoSuchTableError'
+ 409:
+ description:
+ Conflict - The timestamp of the received notification is older
than or equal to the
+ most recent timestamp Polaris has already processed for the
specified table.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/IcebergErrorResponse'
+ example:
+ $ref: '#/components/examples/OutOfOrderNotificationError'
419:
$ref: '#/components/responses/AuthenticationTimeoutResponse'
503:
@@ -3213,6 +3230,7 @@ components:
NotificationRequest:
required:
- notification-type
+ - payload
properties:
notification-type:
$ref: '#/components/schemas/NotificationType'
@@ -4130,6 +4148,18 @@ components:
"updates": {"owner": "Raoul"}
}
+ OutOfOrderNotificationError:
+ summary:
+ The timestamp of the received notification is older than or equal to
the most recent timestamp
+ Polaris has already processed for the specified table.
+ value: {
+ "error": {
+ "message": "A notification with a newer timestamp has been processed for table",
+ "type": "AlreadyExistsException",
+ "code": 409
+ }
+ }
+
securitySchemes:
OAuth2:
type: oauth2