This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e384826 Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures (#2693)
f2e384826 is described below

commit f2e384826c18829c868c722cf25639508b64ae3e
Author: Dennis Huo <[email protected]>
AuthorDate: Fri Sep 26 17:10:05 2025 -0700

    Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures (#2693)
    
    * Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures
    
    The semantics of the createNonExistingNamespaces method used during sendNotification were supposed
    to be "create if needed". However, the behavior ended up surfacing an AlreadyExistsException
    if multiple concurrent sendNotification attempts were made for a brand-new namespace (where
    the notifications may be for different tables). This would cause a table sync to fail if a sibling
    table was being synced at the same time, even though the new table should successfully get created
    under the shared namespace.
    
    * Also better future-proof the createNamespaceInternal logic by explicitly
    checking for ENTITY_ALREADY_EXISTS, per review suggestion.
    
    Log a less alarming (info-level) message, since this race condition is an expected,
    non-error scenario, per review suggestion.
---
 .../service/catalog/iceberg/IcebergCatalog.java    | 44 ++++++++++----
 .../iceberg/AbstractIcebergCatalogTest.java        | 68 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 11 deletions(-)

diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index ac5d86524..8d237fb9e 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -71,6 +71,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.ServiceFailureException;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
@@ -508,16 +509,21 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
         BehaviorChangeConfiguration.ALLOW_NAMESPACE_CUSTOM_LOCATION, 
catalogEntity)) {
       validateNamespaceLocation(entity, resolvedParent);
     }
-    PolarisEntity returnedEntity =
-        PolarisEntity.of(
-            getMetaStoreManager()
-                .createEntityIfNotExists(
-                    getCurrentPolarisContext(),
-                    PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
-                    entity));
-    if (returnedEntity == null) {
-      throw new AlreadyExistsException(
-          "Cannot create namespace %s. Namespace already exists", namespace);
+    EntityResult result =
+        getMetaStoreManager()
+            .createEntityIfNotExists(
+                getCurrentPolarisContext(),
+                PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
+                entity);
+    if (!result.isSuccess()) {
+      if (result.alreadyExists()) {
+        throw new AlreadyExistsException(
+            "Cannot create namespace %s. Namespace already exists", namespace);
+      } else {
+        throw new ServiceFailureException(
+            "Unexpected error trying to create namespace %s. Status: %s 
ExtraInfo: %s",
+            namespace, result.getReturnStatus(), result.getExtraInformation());
+      }
     }
   }
 
@@ -2579,7 +2585,23 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
         Namespace parentNamespace = 
PolarisCatalogHelpers.getParentNamespace(nsLevel);
         PolarisResolvedPathWrapper resolvedParent =
             resolvedEntityView.getPassthroughResolvedPath(parentNamespace);
-        createNamespaceInternal(nsLevel, Collections.emptyMap(), 
resolvedParent);
+        try {
+          createNamespaceInternal(nsLevel, Collections.emptyMap(), 
resolvedParent);
+        } catch (AlreadyExistsException aee) {
+          // Since we only attempted to create the namespace after checking 
that
+          // getPassthroughResolvedPath for this level is null, this should be 
a relatively
+          // infrequent case during high concurrency where another 
notification already
+          // conveniently created the namespace between the time we checked 
and the time
+          // we attempted to fill it in. It's working as intended in this case 
to simply
+          // continue with the existing namespace, but the fact that this 
collision occurred
+          // may be relevant to someone running the service in case of 
unexpected interactions,
+          // so we'll still log the fact that this happened.
+          LOGGER
+              .atInfo()
+              .setCause(aee)
+              .addKeyValue("namespace", namespace)
+              .log("Namespace already exists in createNonExistingNamespace");
+        }
       }
     }
   }
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index 3d66c4858..1c9a43898 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -105,6 +106,7 @@ import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PrincipalEntity;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.exceptions.CommitConflictException;
@@ -1059,6 +1061,72 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
         .isTrue();
   }
 
+  @Test
+  public void 
testUpdateNotificationWhenTableAndNamespacesDontExistNamespaceRaceCondition() {
+    Assumptions.assumeTrue(
+        requiresNamespaceCreate(),
+        "Only applicable if namespaces must be created before adding 
children");
+    Assumptions.assumeTrue(
+        supportsNestedNamespaces(), "Only applicable if nested namespaces are 
supported");
+    Assumptions.assumeTrue(
+        supportsNotifications(), "Only applicable if notifications are 
supported");
+
+    final String tableLocation = "s3://externally-owned-bucket/table/";
+    final String tableMetadataLocation = tableLocation + 
"metadata/v1.metadata.json";
+
+    // Use a spy so we can inject a concurrency error
+    PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
+    IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, spyMetaStore);
+    catalog.initialize(
+        CATALOG_NAME,
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+    Namespace namespace = Namespace.of("parent", "child1");
+    TableIdentifier table = TableIdentifier.of(namespace, "table");
+
+    NotificationRequest request = new NotificationRequest();
+    request.setNotificationType(NotificationType.UPDATE);
+    TableUpdateNotification update = new TableUpdateNotification();
+    update.setMetadataLocation(tableMetadataLocation);
+    update.setTableName(table.name());
+    update.setTableUuid(UUID.randomUUID().toString());
+    update.setTimestamp(230950845L);
+    request.setPayload(update);
+
+    fileIO.addFile(
+        tableMetadataLocation,
+        
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
+
+    // Always invoke the real underlying method, but if it's a namespace we'll 
return
+    // ENTITY_ALREADY_EXISTS after doing so to simulate a different concurrent 
caller having
+    // been the one to succeed creating the namespace first.
+    doAnswer(
+            invocation -> {
+              PolarisEntity entity = (PolarisEntity) invocation.getArgument(2);
+              EntityResult result = (EntityResult) invocation.callRealMethod();
+              if (entity.getType() == PolarisEntityType.NAMESPACE) {
+                return new EntityResult(
+                    BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS,
+                    PolarisEntitySubType.NULL_SUBTYPE.getCode());
+              } else {
+                return result;
+              }
+            })
+        .when(spyMetaStore)
+        .createEntityIfNotExists(any(), any(), any());
+
+    Assertions.assertThat(catalog.sendNotification(table, request))
+        .as("Notification should be sent successfully")
+        .isTrue();
+    Assertions.assertThat(catalog.namespaceExists(namespace))
+        .as("Intermediate namespaces should be created")
+        .isTrue();
+    Assertions.assertThat(catalog.tableExists(table))
+        .as("Table should be created on receiving notification")
+        .isTrue();
+  }
+
   @Test
   public void testUpdateNotificationCreateTableInDisallowedLocation() {
     Assumptions.assumeTrue(

Reply via email to