This is an automated email from the ASF dual-hosted git repository.

dhuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 50c8c3c23 Fix a bug in loadTasks which wasn't abiding by compare-and-swap semantics when updating TaskEntities. (#1134)
50c8c3c23 is described below

commit 50c8c3c23020f4bf9a31dc7fab610add91ff3e56
Author: Dennis Huo <[email protected]>
AuthorDate: Mon Mar 10 14:54:37 2025 -0700

    Fix a bug in loadTasks which wasn't abiding by compare-and-swap semantics when updating TaskEntities. (#1134)
    
    Additionally, fix PolarisTreeMapMetaStoreSessionImpl to avoid "cheating" by
    using the entire PolarisBaseEntity that it happens to store in entitiesActive
    when performing a listing; in general, entitiesActive is assumed to contain
    *only* the name-lookup records, so it is not explicitly updated unless a
    parent or name changes.
    
    The existing tests happened to pass because of a coincidence of several bugs:
    
    1. Since entitiesActive and entities happen to share the same PolarisBaseEntity
    instance by reference, any in-place edits to "entities" *happen* to be reflected
    in entitiesActive; but in general entities *should not be mutable*, and usually
    a copy of an entity is mutated instead of mutating it in place anyway.
    2. Because of the blind writes in loadTasks, the "prepare entity" step, where
    an instance is updated with a new entityVersion, coincidentally and immediately
    causes the main entities *and* entitiesActive slices to have the updated entity
    even before the call to writeEntity happens.
    3. Since the TreeMap store uses "synchronized" for runInTransaction, the
    loadTasksInParallel test doesn't detect interleaving of threads.
    
    So without the fix in the TreeMap store, fixing only PolarisMetaStoreManagerImpl
    causes the test to run forever: tasks never get updated in entitiesActive, so
    the same initial 5 tasks keep getting listed forever.
    
    Also fix the test so that it actually times out eventually; the Future::get calls
    were willing to block forever before execution ever reached awaitTermination.
    Reduce the wait time from 10 minutes to something more reasonable (30 seconds).
---
 .../transactional/PolarisMetaStoreManagerImpl.java | 24 ++++++++++++++++-----
 .../PolarisTreeMapMetaStoreSessionImpl.java        |  3 +++
 .../BasePolarisMetaStoreManagerTest.java           | 25 +++++++++++-----------
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
index 5736af3c4..da09a6799 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
@@ -1996,9 +1996,12 @@ public class PolarisMetaStoreManagerImpl extends 
BaseMetaStoreManager {
             },
             Function.identity());
 
+    List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
     availableTasks.forEach(
         task -> {
-          PolarisBaseEntity originalTask = new PolarisBaseEntity(task);
+          // Make a copy to avoid mutating someone else's reference.
+          // TODO: Refactor into immutable/Builder pattern.
+          PolarisBaseEntity updatedTask = new PolarisBaseEntity(task);
           Map<String, String> properties =
               PolarisObjectMapperUtil.deserializeProperties(callCtx, 
task.getProperties());
           properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, 
executorId);
@@ -2010,11 +2013,22 @@ public class PolarisMetaStoreManagerImpl extends 
BaseMetaStoreManager {
               String.valueOf(
                   
Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, 
"0"))
                       + 1));
-          task.setEntityVersion(task.getEntityVersion() + 1);
-          
task.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, 
properties));
-          ms.writeEntity(callCtx, task, false, originalTask);
+          updatedTask.setProperties(
+              PolarisObjectMapperUtil.serializeProperties(callCtx, 
properties));
+          EntityResult result = updateEntityPropertiesIfNotChanged(callCtx, 
ms, null, updatedTask);
+          if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
+            loadedTasks.add(result.getEntity());
+          } else {
+            // TODO: Consider performing incremental leasing of individual 
tasks one at a time
+            // instead of requiring all-or-none semantics for all the tasks we 
think we listed,
+            // or else contention could be very bad.
+            ms.rollback();
+            throw new RetryOnConcurrencyException(
+                "Failed to lease available task with status %s, info: %s",
+                result.getReturnStatus(), result.getExtraInformation());
+          }
         });
-    return new EntitiesResult(availableTasks);
+    return new EntitiesResult(loadedTasks);
   }
 
   @Override
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
index 9894b82e6..181aee2b0 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
@@ -341,6 +341,9 @@ public class PolarisTreeMapMetaStoreSessionImpl extends 
AbstractTransactionalPer
         .getSliceEntitiesActive()
         .readRange(this.store.buildPrefixKeyComposite(catalogId, parentId, 
entityType.getCode()))
         .stream()
+        .map(
+            nameRecord ->
+                this.lookupEntity(callCtx, catalogId, nameRecord.getId(), 
entityType.getCode()))
         .filter(entityFilter)
         .limit(limit)
         .map(transformer)
diff --git 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index 34b325160..81f233825 100644
--- 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++ 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -375,7 +375,6 @@ public abstract class BasePolarisMetaStoreManagerTest {
     PolarisMetaStoreManager metaStoreManager = 
polarisTestMetaStoreManager.polarisMetaStoreManager;
     PolarisCallContext callCtx = 
polarisTestMetaStoreManager.polarisCallContext;
     List<Future<Set<String>>> futureList = new ArrayList<>();
-    List<Set<String>> responses;
     ExecutorService executorService = Executors.newCachedThreadPool();
     try {
       for (int i = 0; i < 3; i++) {
@@ -399,21 +398,21 @@ public abstract class BasePolarisMetaStoreManagerTest {
                   return taskNames;
                 }));
       }
-      responses =
-          futureList.stream()
-              .map(
-                  f -> {
-                    try {
-                      return f.get();
-                    } catch (Exception e) {
-                      throw new RuntimeException(e);
-                    }
-                  })
-              .collect(Collectors.toList());
     } finally {
       executorService.shutdown();
-      Assertions.assertThat(executorService.awaitTermination(10, 
TimeUnit.MINUTES)).isTrue();
+      Assertions.assertThat(executorService.awaitTermination(30, 
TimeUnit.SECONDS)).isTrue();
     }
+    List<Set<String>> responses =
+        futureList.stream()
+            .map(
+                f -> {
+                  try {
+                    return f.get(30, TimeUnit.SECONDS);
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
     Assertions.assertThat(responses)
         .hasSize(3)
         .satisfies(l -> 
Assertions.assertThat(l.stream().flatMap(Set::stream)).hasSize(100));

Reply via email to