This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new a29f8006f Fix TableIdentifier in TaskFileIOSupplier (#2304)
a29f8006f is described below

commit a29f8006fe9d259df755d02ec6386c2bd6932610
Author: Christopher Lambert <xn...@gmx.de>
AuthorDate: Fri Aug 22 21:54:42 2025 +0200

    Fix TableIdentifier in TaskFileIOSupplier (#2304)
    
    We can't just convert a `TaskEntity` to an `IcebergTableLikeEntity`, as the
    `getTableIdentifier()` method would not return a correct value: it uses
    the name of the task and its parent namespace (which appears to be empty).
    
    Task handlers instead need to pass in the `TableIdentifier` that they
    already inferred via `TaskEntity.readData`.
---
 .../service/task/BatchFileCleanupTaskHandler.java  |  2 +-
 .../task/ManifestFileCleanupTaskHandler.java       |  2 +-
 .../service/task/TableCleanupTaskHandler.java      | 25 ++++++++++++----------
 .../polaris/service/task/TaskFileIOSupplier.java   | 10 ++++-----
 .../catalog/AbstractIcebergCatalogTest.java        |  2 +-
 .../service/catalog/io/FileIOFactoryTest.java      |  2 +-
 6 files changed, 22 insertions(+), 21 deletions(-)

diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
index d47725351..fdb4ef5e0 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java
@@ -53,7 +53,7 @@ public class BatchFileCleanupTaskHandler extends 
FileCleanupTaskHandler {
     BatchFileCleanupTask cleanupTask = 
task.readData(BatchFileCleanupTask.class);
     TableIdentifier tableId = cleanupTask.tableId();
     List<String> batchFiles = cleanupTask.batchFiles();
-    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, 
callContext)) {
       List<String> validFiles =
           batchFiles.stream().filter(file -> TaskUtils.exists(file, 
authorizedFileIO)).toList();
       if (validFiles.isEmpty()) {
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
index 3173dc25e..59e9be8dd 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java
@@ -63,7 +63,7 @@ public class ManifestFileCleanupTaskHandler extends 
FileCleanupTaskHandler {
   public boolean handleTask(TaskEntity task, CallContext callContext) {
     ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
     TableIdentifier tableId = cleanupTask.tableId();
-    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+    try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, 
callContext)) {
       ManifestFile manifestFile = 
decodeManifestData(cleanupTask.manifestFileData());
       return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
     }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index 20bce48d4..db8b335ba 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -21,6 +21,7 @@ package org.apache.polaris.service.task;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -36,7 +37,6 @@ import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.AsyncTaskType;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
-import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
@@ -71,21 +71,19 @@ public class TableCleanupTaskHandler implements TaskHandler 
{
 
   @Override
   public boolean canHandleTask(TaskEntity task) {
-    return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER && 
taskEntityIsTable(task);
+    return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER
+        && tryGetTableEntity(task).isPresent();
   }
 
-  private boolean taskEntityIsTable(TaskEntity task) {
-    PolarisEntity entity = 
PolarisEntity.of((task.readData(PolarisBaseEntity.class)));
-    return entity.getType().equals(PolarisEntityType.TABLE_LIKE);
+  private Optional<IcebergTableLikeEntity> tryGetTableEntity(TaskEntity task) {
+    return Optional.ofNullable(task.readData(PolarisBaseEntity.class))
+        .filter(entity -> 
entity.getType().equals(PolarisEntityType.TABLE_LIKE))
+        .map(IcebergTableLikeEntity::of);
   }
 
   @Override
   public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
-    PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class);
-    PolarisMetaStoreManager metaStoreManager =
-        
metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
-    IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(entity);
-    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    IcebergTableLikeEntity tableEntity = 
tryGetTableEntity(cleanupTask).orElseThrow();
     LOGGER
         .atInfo()
         .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
@@ -95,7 +93,8 @@ public class TableCleanupTaskHandler implements TaskHandler {
     // It's likely the cleanupTask has already been completed, but wasn't 
dropped successfully.
     // Log a
     // warning and move on
-    try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
+    try (FileIO fileIO =
+        fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), 
callContext)) {
       if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
         LOGGER
             .atWarn()
@@ -108,6 +107,10 @@ public class TableCleanupTaskHandler implements 
TaskHandler {
       TableMetadata tableMetadata =
           TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
 
+      PolarisMetaStoreManager metaStoreManager =
+          
metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
+      PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+
       Stream<TaskEntity> manifestCleanupTasks =
           getManifestTaskStream(
               cleanupTask,
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
index e3a1ddd48..44de36210 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java
@@ -24,21 +24,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
+import org.apache.commons.lang3.function.TriFunction;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisTaskConstants;
 import org.apache.polaris.core.entity.TaskEntity;
-import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
 import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 
 @ApplicationScoped
-public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, 
FileIO> {
+public class TaskFileIOSupplier
+    implements TriFunction<TaskEntity, TableIdentifier, CallContext, FileIO> {
   private final FileIOFactory fileIOFactory;
 
   @Inject
@@ -47,12 +47,10 @@ public class TaskFileIOSupplier implements 
BiFunction<TaskEntity, CallContext, F
   }
 
   @Override
-  public FileIO apply(TaskEntity task, CallContext callContext) {
+  public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext 
callContext) {
     Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
     Map<String, String> properties = new HashMap<>(internalProperties);
 
-    IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(task);
-    TableIdentifier identifier = tableEntity.getTableIdentifier();
     String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
     Set<String> locations = Set.of(location);
     Set<PolarisStorageActions> storageActions = 
Set.of(PolarisStorageActions.ALL);
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
index d10f82057..696bca432 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java
@@ -1840,7 +1840,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
     FileIO fileIO =
         new TaskFileIOSupplier(
                 new DefaultFileIOFactory(storageCredentialCache, 
metaStoreManagerFactory))
-            .apply(taskEntity, polarisContext);
+            .apply(taskEntity, TABLE, polarisContext);
     
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
     Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
         .isInstanceOf(InMemoryFileIO.class);
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index fd6e79f12..5c72f1506 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -176,7 +176,7 @@ public class FileIOFactoryTest {
     Assertions.assertThat(tasks).hasSize(1);
     TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
     FileIO fileIO =
-        new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, 
callContext);
+        new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, 
TABLE, callContext);
     
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
     Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
         .isInstanceOf(InMemoryFileIO.class);

Reply via email to