This is an automated email from the ASF dual-hosted git repository.
dhuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 50c8c3c23 Fix a bug in loadTasks which wasn't abiding by
compare-and-swap semantics when updating TaskEntities. (#1134)
50c8c3c23 is described below
commit 50c8c3c23020f4bf9a31dc7fab610add91ff3e56
Author: Dennis Huo <[email protected]>
AuthorDate: Mon Mar 10 14:54:37 2025 -0700
Fix a bug in loadTasks which wasn't abiding by compare-and-swap semantics
when updating TaskEntities. (#1134)
Additionally, fix PolarisTreeMapMetaStoreSessionImpl to avoid "cheating" by
using the entire PolarisBaseEntity it happens to store in entitiesActive when
performing a listing; in general, entitiesActive is assumed to *only* contain
the name lookup records, so it is not explicitly updated unless a parent or
name changes.
The existing tests happened to pass because of a coincidence of several
bugs:
1. Since entitiesActive and entities happen to share the same PolarisBaseEntity
instance by reference, any in-place edits to "entities" *happen* to be reflected
in entitiesActive; but in general, entities *should not be mutable*, and usually
a copy of an entity is mutated instead of mutating it in place anyway.
2. Because of the blind writes in loadTasks, the "prepare entity" step, where an
instance is updated with a new entityVersion, by coincidence immediately causes
both the main entities *and* entitiesActive slices to have the updated entity,
even before the call to writeEntity happens.
3. Since the TreeMap store uses "synchronized" for runInTransaction, the
loadTasksInParallel test doesn't detect interleaving of threads.
So without the fix in the TreeMap store, simply fixing PolarisMetaStoreManagerImpl
causes the test to run forever: tasks never get updated in entitiesActive, so the
same initial 5 tasks keep getting listed forever.
Also fix the test so that it actually times out eventually; the Future::get calls
were willing to block forever before execution ever reached the awaitTermination
call. Reduce the wait time from 10 minutes to something more reasonable
(30 seconds).
---
.../transactional/PolarisMetaStoreManagerImpl.java | 24 ++++++++++++++++-----
.../PolarisTreeMapMetaStoreSessionImpl.java | 3 +++
.../BasePolarisMetaStoreManagerTest.java | 25 +++++++++++-----------
3 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
index 5736af3c4..da09a6799 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisMetaStoreManagerImpl.java
@@ -1996,9 +1996,12 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager {
},
Function.identity());
+ List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
availableTasks.forEach(
task -> {
- PolarisBaseEntity originalTask = new PolarisBaseEntity(task);
+ // Make a copy to avoid mutating someone else's reference.
+ // TODO: Refactor into immutable/Builder pattern.
+ PolarisBaseEntity updatedTask = new PolarisBaseEntity(task);
Map<String, String> properties =
PolarisObjectMapperUtil.deserializeProperties(callCtx,
task.getProperties());
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID,
executorId);
@@ -2010,11 +2013,22 @@ public class PolarisMetaStoreManagerImpl extends BaseMetaStoreManager {
String.valueOf(
Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT,
"0"))
+ 1));
- task.setEntityVersion(task.getEntityVersion() + 1);
-
task.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
- ms.writeEntity(callCtx, task, false, originalTask);
+ updatedTask.setProperties(
+ PolarisObjectMapperUtil.serializeProperties(callCtx,
properties));
+ EntityResult result = updateEntityPropertiesIfNotChanged(callCtx,
ms, null, updatedTask);
+ if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
+ loadedTasks.add(result.getEntity());
+ } else {
+ // TODO: Consider performing incremental leasing of individual
tasks one at a time
+ // instead of requiring all-or-none semantics for all the tasks we
think we listed,
+ // or else contention could be very bad.
+ ms.rollback();
+ throw new RetryOnConcurrencyException(
+ "Failed to lease available task with status %s, info: %s",
+ result.getReturnStatus(), result.getExtraInformation());
+ }
});
- return new EntitiesResult(availableTasks);
+ return new EntitiesResult(loadedTasks);
}
@Override
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
index 9894b82e6..181aee2b0 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/PolarisTreeMapMetaStoreSessionImpl.java
@@ -341,6 +341,9 @@ public class PolarisTreeMapMetaStoreSessionImpl extends AbstractTransactionalPer
.getSliceEntitiesActive()
.readRange(this.store.buildPrefixKeyComposite(catalogId, parentId,
entityType.getCode()))
.stream()
+ .map(
+ nameRecord ->
+ this.lookupEntity(callCtx, catalogId, nameRecord.getId(),
entityType.getCode()))
.filter(entityFilter)
.limit(limit)
.map(transformer)
diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index 34b325160..81f233825 100644
--- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -375,7 +375,6 @@ public abstract class BasePolarisMetaStoreManagerTest {
PolarisMetaStoreManager metaStoreManager =
polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx =
polarisTestMetaStoreManager.polarisCallContext;
List<Future<Set<String>>> futureList = new ArrayList<>();
- List<Set<String>> responses;
ExecutorService executorService = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 3; i++) {
@@ -399,21 +398,21 @@ public abstract class BasePolarisMetaStoreManagerTest {
return taskNames;
}));
}
- responses =
- futureList.stream()
- .map(
- f -> {
- try {
- return f.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- })
- .collect(Collectors.toList());
} finally {
executorService.shutdown();
- Assertions.assertThat(executorService.awaitTermination(10,
TimeUnit.MINUTES)).isTrue();
+ Assertions.assertThat(executorService.awaitTermination(30,
TimeUnit.SECONDS)).isTrue();
}
+ List<Set<String>> responses =
+ futureList.stream()
+ .map(
+ f -> {
+ try {
+ return f.get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
Assertions.assertThat(responses)
.hasSize(3)
.satisfies(l ->
Assertions.assertThat(l.stream().flatMap(Set::stream)).hasSize(100));