This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new f2e384826 Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures (#2693)
f2e384826 is described below
commit f2e384826c18829c868c722cf25639508b64ae3e
Author: Dennis Huo <[email protected]>
AuthorDate: Fri Sep 26 17:10:05 2025 -0700
Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures (#2693)
* Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures
The createNonExistingNamespaces method used during sendNotification was intended to have "create if needed" semantics. However, it surfaced an AlreadyExistsException when multiple concurrent sendNotification calls targeted the same brand-new namespace (for example, notifications for different tables in that namespace). As a result, a table sync could fail whenever a sibling table was being synced at the same time, even though the new table should have been created successfully under the shared namespace.
* Also future-proof the createNamespaceInternal logic by explicitly checking for ENTITY_ALREADY_EXISTS and raising a ServiceFailureException for any other failure status, per review suggestion.
Log the namespace collision at INFO level rather than as an alarming error, since this kind of race is expected and benign, per review suggestion.
---
.../service/catalog/iceberg/IcebergCatalog.java | 44 ++++++++++----
.../iceberg/AbstractIcebergCatalogTest.java | 68 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 11 deletions(-)
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index ac5d86524..8d237fb9e 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -71,6 +71,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.exceptions.UnprocessableEntityException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
@@ -508,16 +509,21 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
BehaviorChangeConfiguration.ALLOW_NAMESPACE_CUSTOM_LOCATION,
catalogEntity)) {
validateNamespaceLocation(entity, resolvedParent);
}
- PolarisEntity returnedEntity =
- PolarisEntity.of(
- getMetaStoreManager()
- .createEntityIfNotExists(
- getCurrentPolarisContext(),
- PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
- entity));
- if (returnedEntity == null) {
- throw new AlreadyExistsException(
- "Cannot create namespace %s. Namespace already exists", namespace);
+ EntityResult result =
+ getMetaStoreManager()
+ .createEntityIfNotExists(
+ getCurrentPolarisContext(),
+ PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
+ entity);
+ if (!result.isSuccess()) {
+ if (result.alreadyExists()) {
+ throw new AlreadyExistsException(
+ "Cannot create namespace %s. Namespace already exists", namespace);
+ } else {
+ throw new ServiceFailureException(
+ "Unexpected error trying to create namespace %s. Status: %s
ExtraInfo: %s",
+ namespace, result.getReturnStatus(), result.getExtraInformation());
+ }
}
}
@@ -2579,7 +2585,23 @@ public class IcebergCatalog extends
BaseMetastoreViewCatalog
Namespace parentNamespace =
PolarisCatalogHelpers.getParentNamespace(nsLevel);
PolarisResolvedPathWrapper resolvedParent =
resolvedEntityView.getPassthroughResolvedPath(parentNamespace);
- createNamespaceInternal(nsLevel, Collections.emptyMap(),
resolvedParent);
+ try {
+ createNamespaceInternal(nsLevel, Collections.emptyMap(),
resolvedParent);
+ } catch (AlreadyExistsException aee) {
+ // Since we only attempted to create the namespace after checking
that
+ // getPassthroughResolvedPath for this level is null, this should be
a relatively
+ // infrequent case during high concurrency where another
notification already
+ // conveniently created the namespace between the time we checked
and the time
+ // we attempted to fill it in. It's working as intended in this case
to simply
+ // continue with the existing namespace, but the fact that this
collision occurred
+ // may be relevant to someone running the service in case of
unexpected interactions,
+ // so we'll still log the fact that this happened.
+ LOGGER
+ .atInfo()
+ .setCause(aee)
+ .addKeyValue("namespace", namespace)
+ .log("Namespace already exists in createNonExistingNamespace");
+ }
}
}
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index 3d66c4858..1c9a43898 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -105,6 +106,7 @@ import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.exceptions.CommitConflictException;
@@ -1059,6 +1061,72 @@ public abstract class AbstractIcebergCatalogTest extends
CatalogTests<IcebergCat
.isTrue();
}
+ @Test
+ public void
testUpdateNotificationWhenTableAndNamespacesDontExistNamespaceRaceCondition() {
+ Assumptions.assumeTrue(
+ requiresNamespaceCreate(),
+ "Only applicable if namespaces must be created before adding
children");
+ Assumptions.assumeTrue(
+ supportsNestedNamespaces(), "Only applicable if nested namespaces are
supported");
+ Assumptions.assumeTrue(
+ supportsNotifications(), "Only applicable if notifications are
supported");
+
+ final String tableLocation = "s3://externally-owned-bucket/table/";
+ final String tableMetadataLocation = tableLocation +
"metadata/v1.metadata.json";
+
+ // Use a spy so we can inject a concurrency error
+ PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
+ IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, spyMetaStore);
+ catalog.initialize(
+ CATALOG_NAME,
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+
+ Namespace namespace = Namespace.of("parent", "child1");
+ TableIdentifier table = TableIdentifier.of(namespace, "table");
+
+ NotificationRequest request = new NotificationRequest();
+ request.setNotificationType(NotificationType.UPDATE);
+ TableUpdateNotification update = new TableUpdateNotification();
+ update.setMetadataLocation(tableMetadataLocation);
+ update.setTableName(table.name());
+ update.setTableUuid(UUID.randomUUID().toString());
+ update.setTimestamp(230950845L);
+ request.setPayload(update);
+
+ fileIO.addFile(
+ tableMetadataLocation,
+
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
+
+ // Always invoke the real underlying method, but if it's a namespace we'll
return
+ // ENTITY_ALREADY_EXISTS after doing so to simulate a different concurrent
caller having
+ // been the one to succeed creating the namespace first.
+ doAnswer(
+ invocation -> {
+ PolarisEntity entity = (PolarisEntity) invocation.getArgument(2);
+ EntityResult result = (EntityResult) invocation.callRealMethod();
+ if (entity.getType() == PolarisEntityType.NAMESPACE) {
+ return new EntityResult(
+ BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS,
+ PolarisEntitySubType.NULL_SUBTYPE.getCode());
+ } else {
+ return result;
+ }
+ })
+ .when(spyMetaStore)
+ .createEntityIfNotExists(any(), any(), any());
+
+ Assertions.assertThat(catalog.sendNotification(table, request))
+ .as("Notification should be sent successfully")
+ .isTrue();
+ Assertions.assertThat(catalog.namespaceExists(namespace))
+ .as("Intermediate namespaces should be created")
+ .isTrue();
+ Assertions.assertThat(catalog.tableExists(table))
+ .as("Table should be created on receiving notification")
+ .isTrue();
+ }
+
@Test
public void testUpdateNotificationCreateTableInDisallowedLocation() {
Assumptions.assumeTrue(