This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 5f938585e [AMORO-3265] Refactor SimpleFuture for process api (#3266)
5f938585e is described below
commit 5f938585e3495e670b1a9ae3e9b330d4519c0f2c
Author: majin1102 <[email protected]>
AuthorDate: Thu Oct 17 15:33:19 2024 +0800
[AMORO-3265] Refactor SimpleFuture for process api (#3266)
Co-authored-by: majin.nathan <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 2 +-
.../amoro/server/DefaultOptimizingService.java | 10 +-
.../apache/amoro/server/RestCatalogService.java | 2 +-
.../amoro/server/catalog/InternalCatalog.java | 6 +-
.../amoro/server/catalog/InternalCatalogImpl.java | 2 +-
.../apache/amoro/server/catalog/ServerCatalog.java | 2 +-
.../amoro/server/dashboard/DashboardServer.java | 4 +-
.../amoro/server/dashboard/utils/CommonUtil.java | 2 +-
.../server/manager/AbstractPluginManager.java | 4 +-
.../amoro/server/optimizing/KeyedTableCommit.java | 2 +-
.../amoro/server/optimizing/OptimizingQueue.java | 2 +-
.../amoro/server/optimizing/TaskRuntime.java | 8 +-
.../server/optimizing/UnKeyedTableCommit.java | 2 +-
.../amoro/server/persistence/PersistentBase.java | 4 +-
.../amoro/server/table/DefaultTableService.java | 10 +-
.../amoro/server/utils/PreconditionUtils.java | 4 +-
.../amoro/server/TestDefaultOptimizingService.java | 4 +-
.../server/manager/TestAbstractPluginManager.java | 4 +-
.../optimizing/commit/TestMixIcebergCommit.java | 2 +-
.../optimizing/commit/TestUnKeyedTableCommit.java | 2 +-
.../server/persistence/TestPersistentBase.java | 2 +-
.../amoro/server/table/TestCatalogService.java | 6 +-
.../amoro/server/table/TestDatabaseService.java | 6 +-
.../server/table/TestTableRuntimeManager.java | 2 +-
.../amoro/server/table/TestTableService.java | 6 +-
.../amoro}/exception/AlreadyExistsException.java | 2 +-
.../amoro}/exception/AmoroRuntimeException.java | 4 +-
.../amoro}/exception/BlockerConflictException.java | 2 +-
.../amoro}/exception/ForbiddenException.java | 2 +-
.../amoro}/exception/IllegalMetadataException.java | 2 +-
.../exception/IllegalTaskStateException.java | 14 +-
.../amoro}/exception/LoadingPluginException.java | 2 +-
.../amoro}/exception/ObjectNotExistsException.java | 2 +-
.../exception/OptimizingClosedException.java | 2 +-
.../exception/OptimizingCommitException.java | 2 +-
.../amoro}/exception/PersistenceException.java | 2 +-
.../amoro}/exception/PluginAuthException.java | 2 +-
.../amoro}/exception/PluginRetryAuthException.java | 2 +-
.../amoro}/exception/SignatureCheckException.java | 2 +-
.../amoro}/exception/TaskNotFoundException.java | 2 +-
.../amoro}/exception/TaskRuntimeException.java | 2 +-
.../amoro}/exception/UndefinedException.java | 2 +-
.../org/apache/amoro/process/SimpleFuture.java | 100 +++++++--
.../org/apache/amoro/process/TableProcess.java | 2 +-
.../org/apache/amoro/process/TestSimpleFuture.java | 238 +++++++++++++++++++++
45 files changed, 397 insertions(+), 91 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 467f45aac..3d70c0acb 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -26,12 +26,12 @@ import org.apache.amoro.api.AmoroTableMetastore;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
-import org.apache.amoro.server.exception.AmoroRuntimeException;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 7069b5b49..5a9d0c052 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -29,14 +29,14 @@ import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.exception.ForbiddenException;
+import org.apache.amoro.exception.IllegalTaskStateException;
+import org.apache.amoro.exception.ObjectNotExistsException;
+import org.apache.amoro.exception.PluginRetryAuthException;
+import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
-import org.apache.amoro.server.exception.ForbiddenException;
-import org.apache.amoro.server.exception.IllegalTaskStateException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
-import org.apache.amoro.server.exception.PluginRetryAuthException;
-import org.apache.amoro.server.exception.TaskNotFoundException;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
index 5c37d6ca8..a58bfb679 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
@@ -37,10 +37,10 @@ import io.javalin.plugin.json.JavalinJackson;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.events.IcebergReportEvent;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
index 967f94f66..2c0a20181 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
@@ -23,9 +23,9 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableIdentifier;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.IllegalMetadataException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.IllegalMetadataException;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java
index b812c61da..70e52a60b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java
@@ -27,12 +27,12 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.formats.iceberg.IcebergTable;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.mixed.InternalMixedIcebergCatalog;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.RestCatalogService;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.internal.InternalIcebergCreator;
import org.apache.amoro.server.table.internal.InternalIcebergHandler;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
index 48fe6b951..dfeede9c2 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.catalog;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.api.CatalogMeta;
-import org.apache.amoro.server.exception.IllegalMetadataException;
+import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index f858e8e81..364c9728d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -32,6 +32,8 @@ import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
import io.javalin.http.staticfiles.StaticFileConfig;
import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.ForbiddenException;
+import org.apache.amoro.exception.SignatureCheckException;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.DefaultOptimizingService;
import org.apache.amoro.server.RestCatalogService;
@@ -48,8 +50,6 @@ import
org.apache.amoro.server.dashboard.controller.TerminalController;
import org.apache.amoro.server.dashboard.controller.VersionController;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator;
-import org.apache.amoro.server.exception.ForbiddenException;
-import org.apache.amoro.server.exception.SignatureCheckException;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java
index 35162382f..378f60087 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java
@@ -19,7 +19,7 @@
package org.apache.amoro.server.dashboard.utils;
import io.javalin.http.Context;
-import org.apache.amoro.server.exception.SignatureCheckException;
+import org.apache.amoro.exception.SignatureCheckException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.telnet.TelnetClient;
import org.slf4j.Logger;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java
index 665d74d7a..05820b720 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java
@@ -19,9 +19,9 @@
package org.apache.amoro.server.manager;
import org.apache.amoro.ActivePlugin;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.LoadingPluginException;
import org.apache.amoro.server.Environments;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.LoadingPluginException;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
index 79e8cb521..2312072bc 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
@@ -24,13 +24,13 @@ import static
org.apache.amoro.hive.op.UpdateHiveFiles.SYNC_DATA_TO_HIVE;
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
+import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.op.OverwriteBaseFiles;
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.OptimizingCommitException;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.MixedTableUtil;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index fb844a41a..ccbe9ccd5 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -22,10 +22,10 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.OptimizingTaskId;
+import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.OptimizingClosedException;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.plan.OptimizingPlanner;
import org.apache.amoro.server.persistence.PersistentBase;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index eb3ecca22..0b5309ee9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -22,10 +22,10 @@ import org.apache.amoro.StateField;
import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.exception.IllegalTaskStateException;
+import org.apache.amoro.exception.OptimizingClosedException;
+import org.apache.amoro.exception.TaskRuntimeException;
import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.IllegalTaskStateException;
-import org.apache.amoro.server.exception.OptimizingClosedException;
-import org.apache.amoro.server.exception.TaskRuntimeException;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.resource.OptimizerThread;
@@ -281,7 +281,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?,
?, ?>> extends Stated
throw new OptimizingClosedException(taskId.getProcessId());
}
if (!getNext().contains(targetStatus)) {
- throw new IllegalTaskStateException(taskId, status, targetStatus);
+ throw new IllegalTaskStateException(taskId, status.name(),
targetStatus.name());
}
status = targetStatus;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index 10b3802f5..54adce64f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -23,6 +23,7 @@ import static
org.apache.amoro.hive.op.UpdateHiveFiles.SYNC_DATA_TO_HIVE;
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.data.FileNameRules;
+import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.hive.HMSClientPool;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.HivePartitionUtil;
@@ -33,7 +34,6 @@ import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.AmoroServiceConstants;
-import org.apache.amoro.server.exception.OptimizingCommitException;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
index 4a2d6fb26..e3f822c38 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java
@@ -18,8 +18,8 @@
package org.apache.amoro.server.persistence;
-import org.apache.amoro.server.exception.AmoroRuntimeException;
-import org.apache.amoro.server.exception.PersistenceException;
+import org.apache.amoro.exception.AmoroRuntimeException;
+import org.apache.amoro.exception.PersistenceException;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.ibatis.session.TransactionIsolationLevel;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index b868cb38c..c5c613228 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -29,16 +29,16 @@ import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.BlockerConflictException;
+import org.apache.amoro.exception.IllegalMetadataException;
+import org.apache.amoro.exception.ObjectNotExistsException;
+import org.apache.amoro.exception.PersistenceException;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.catalog.CatalogBuilder;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.BlockerConflictException;
-import org.apache.amoro.server.exception.IllegalMetadataException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
-import org.apache.amoro.server.exception.PersistenceException;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.StatedPersistentBase;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java
index 9acebf80f..7db3b41ba 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java
@@ -18,8 +18,8 @@
package org.apache.amoro.server.utils;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.ObjectNotExistsException;
public class PreconditionUtils {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 7945dd8ee..99b950a2b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -28,11 +28,11 @@ import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.exception.IllegalTaskStateException;
+import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
-import org.apache.amoro.server.exception.IllegalTaskStateException;
-import org.apache.amoro.server.exception.PluginRetryAuthException;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java
index 79520890e..19a4002e8 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java
@@ -20,8 +20,8 @@ package org.apache.amoro.server.manager;
import com.clearspring.analytics.util.Lists;
import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.LoadingPluginException;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.LoadingPluginException;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java
index 81ec3ca13..797503c83 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java
@@ -25,12 +25,12 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.DefaultKeyedFile;
+import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.scan.CombinedScanTask;
import org.apache.amoro.scan.KeyedTableScanTask;
import org.apache.amoro.scan.MixedFileScanTask;
-import org.apache.amoro.server.exception.OptimizingCommitException;
import org.apache.amoro.server.optimizing.KeyedTableCommit;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.TaskRuntime;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java
index 61da4399c..d479b05fd 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java
@@ -24,9 +24,9 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
-import org.apache.amoro.server.exception.OptimizingCommitException;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.optimizing.UnKeyedTableCommit;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java
index b28e6f642..9af985879 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java
@@ -20,7 +20,7 @@ package org.apache.amoro.server.persistence;
import static org.mockito.Mockito.never;
-import org.apache.amoro.server.exception.UndefinedException;
+import org.apache.amoro.exception.UndefinedException;
import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
index 12ce5d953..fd6a5def6 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
@@ -22,13 +22,13 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TestedCatalogs;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.IllegalMetadataException;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.hive.TestHMS;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.InternalCatalog;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.IllegalMetadataException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
index 759f388fb..885db19b4 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
@@ -26,10 +26,10 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.TestedCatalogs;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.IllegalMetadataException;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.server.catalog.InternalCatalog;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.IllegalMetadataException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Assume;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java
index b6b8ce1e7..f77996d36 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java
@@ -24,9 +24,9 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.hive.catalog.HiveTableTestHelper;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.table.MixedTable;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
index cbf8010c6..a1e7a59a0 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
@@ -31,14 +31,14 @@ import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.Blocker;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.BlockerConflictException;
+import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.hive.catalog.HiveTableTestHelper;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
-import org.apache.amoro.server.exception.AlreadyExistsException;
-import org.apache.amoro.server.exception.BlockerConflictException;
-import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.table.blocker.RenewableBlocker;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java
index 3a32c168b..625363463 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
import org.apache.amoro.api.TableIdentifier;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
similarity index 98%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
index a013b55e2..555c238c1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
import org.apache.amoro.ErrorCodes;
import org.apache.amoro.ServerTableIdentifier;
@@ -142,7 +142,7 @@ public class AmoroRuntimeException extends RuntimeException
{
}
}
- private static AmoroRuntimeException wrap(Throwable throwable) {
+ public static AmoroRuntimeException wrap(Throwable throwable) {
return wrap(throwable, UndefinedException::new);
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java
index bc12499a9..34f5803ee 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class BlockerConflictException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java
index 420a90939..5f3538ee7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
/** forbiddenException */
public class ForbiddenException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java
index 040bd0211..37ef01ec3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class IllegalMetadataException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java
similarity index 75%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java
index 26df696fa..55ba28854 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java
@@ -16,19 +16,17 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
import org.apache.amoro.api.OptimizingTaskId;
-import org.apache.amoro.server.optimizing.TaskRuntime;
public class IllegalTaskStateException extends AmoroRuntimeException {
- private final TaskRuntime.Status preStatus;
- private final TaskRuntime.Status targetStatus;
+ private final String preStatus;
+ private final String targetStatus;
private final OptimizingTaskId taskId;
- public IllegalTaskStateException(
- OptimizingTaskId taskId, TaskRuntime.Status preStatus,
TaskRuntime.Status targetStatus) {
+ public IllegalTaskStateException(OptimizingTaskId taskId, String preStatus,
String targetStatus) {
super(
String.format("Illegal Task of %s status from %s to %s", taskId,
preStatus, targetStatus));
this.taskId = taskId;
@@ -36,11 +34,11 @@ public class IllegalTaskStateException extends
AmoroRuntimeException {
this.targetStatus = targetStatus;
}
- public TaskRuntime.Status getPreStatus() {
+ public String getPreStatus() {
return preStatus;
}
- public TaskRuntime.Status getTargetStatus() {
+ public String getTargetStatus() {
return targetStatus;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java
index 4940ff3b1..a60905b8a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class LoadingPluginException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java
index b9e38863b..925c7f354 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.TableIdentifier;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java
index 2940e1b9c..a6fc8cd52 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class OptimizingClosedException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java
index 9dc13bc35..a48414ac1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class OptimizingCommitException extends Exception {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java
index bdc0c6dfb..b4be24d72 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class PersistenceException extends AmoroRuntimeException {
public PersistenceException(String message, Throwable cause) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java
index e772044ac..f8d5de940 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class PluginAuthException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java
index 2730fe6dd..67de2e736 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class PluginRetryAuthException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java
index 3dd53034e..357641018 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
/** SignatureCheckException */
public class SignatureCheckException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java
index c3ea2598b..87b37c5b7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
import org.apache.amoro.api.OptimizingTaskId;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java
similarity index 96%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java
index 6893881ab..282bc5788 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class TaskRuntimeException extends AmoroRuntimeException {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java
similarity index 95%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java
rename to
amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java
index ca722be35..0fb0d9187 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.exception;
+package org.apache.amoro.exception;
public class UndefinedException extends AmoroRuntimeException {
public UndefinedException(Throwable throwable) {
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java
b/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java
index 3a02ea097..ce1332bfd 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java
@@ -18,42 +18,112 @@
package org.apache.amoro.process;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.exception.AmoroRuntimeException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
/** A simple wrapper of CompletableFuture for better code readability. */
public class SimpleFuture {
- protected CompletableFuture<Object> future;
+ private final CompletableFuture<?> completedFuture = new
CompletableFuture<>();
+ private final Map<Runnable, CompletableFuture<?>> callbackMap = new
LinkedHashMap<>();
+ private CompletableFuture<?> triggerFuture;
+ private CompletableFuture<?> callbackFuture;
public SimpleFuture() {
- future = new CompletableFuture<>();
+ this(new CompletableFuture<>());
}
- private SimpleFuture(CompletableFuture<Object> future) {
- this.future = future;
+ protected SimpleFuture(CompletableFuture<?> triggerFuture) {
+ this.triggerFuture = triggerFuture;
+ this.callbackFuture = triggerFuture;
+ whenCompleted(() -> {});
}
+ /**
+ * Only proceed the callback if there is no exception. Exceptions would be
thrown in the
+ * complete() method and will not trigger join return.
+ */
public void whenCompleted(Runnable runnable) {
- future =
- future.whenComplete(
+ callbackFuture =
+ callbackFuture.whenComplete(
(v, e) -> {
- Preconditions.checkState(e == null);
- runnable.run();
+ if (e == null) {
+ runnable.run();
+ if (callbackMap.get(runnable) == callbackFuture) {
+ completedFuture.complete(null);
+ }
+ } else {
+ throw AmoroRuntimeException.wrap(e);
+ }
});
+ callbackMap.put(runnable, callbackFuture);
}
- public SimpleFuture complete() {
- future.complete(null);
- return this;
+ public boolean isDone() {
+ return completedFuture.isDone();
}
+ public void reset() {
+ triggerFuture = new CompletableFuture<>();
+ callbackFuture = triggerFuture;
+ whenCompleted(() -> {});
+ callbackMap.keySet().forEach(this::whenCompleted);
+ }
+
+ /**
+ * This method will trigger all callback functions in the same thread and
wait until all callback
+ * functions are completed. If throws exception, completedFuture is not done.
+ */
+ public void complete() {
+ try {
+ if (triggerFuture.complete(null)) {
+ callbackFuture.get();
+ }
+ } catch (Throwable throwable) {
+ throw normalize(throwable);
+ }
+ }
+
+ /** Return until completedFuture is done, which means task or process is
truly finished */
public void join() {
- future.join();
+ try {
+ completedFuture.join();
+ } catch (Throwable throwable) {
+ throw normalize(throwable);
+ }
+ }
+
+ private AmoroRuntimeException normalize(Throwable throwable) {
+ if (throwable instanceof ExecutionException && throwable.getCause() !=
null) {
+ return AmoroRuntimeException.wrap(throwable.getCause());
+ }
+ return AmoroRuntimeException.wrap(throwable);
+ }
+
+ public SimpleFuture or(SimpleFuture anotherFuture) {
+ return new SimpleFuture(
+ CompletableFuture.anyOf(completedFuture,
anotherFuture.completedFuture));
+ }
+
+ public SimpleFuture and(SimpleFuture anotherFuture) {
+ return new SimpleFuture(
+ CompletableFuture.allOf(completedFuture,
anotherFuture.completedFuture));
+ }
+
+ public static SimpleFuture allOf(List<SimpleFuture> futures) {
+ return new SimpleFuture(
+ CompletableFuture.allOf(
+ futures.stream().map(f ->
f.completedFuture).toArray(CompletableFuture[]::new)));
}
- public SimpleFuture anyOf(SimpleFuture another) {
- return new SimpleFuture(CompletableFuture.anyOf(future, another.future));
+ public static SimpleFuture anyOf(List<SimpleFuture> futures) {
+ return new SimpleFuture(
+ CompletableFuture.anyOf(
+ futures.stream().map(f ->
f.completedFuture).toArray(CompletableFuture[]::new)));
}
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
index 2df63b7fc..f9c32e331 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
@@ -87,7 +87,7 @@ public abstract class TableProcess<T extends
TableProcessState> implements Amoro
@Override
public SimpleFuture getSubmitFuture() {
- return submitFuture.anyOf(completeFuture);
+ return submitFuture.or(completeFuture);
}
@Override
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java
new file mode 100644
index 000000000..7f5c354e3
--- /dev/null
+++ b/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.exception.AmoroRuntimeException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public class TestSimpleFuture {
+
+ private SimpleFuture simpleFuture;
+ private int[] callbackNum;
+ private boolean[] calledFlag;
+
+ private void resetCallbackData() {
+ callbackNum = new int[] {-1, -1, -1, -1, -1};
+ calledFlag = new boolean[] {false, false, false, false, false};
+ }
+
+ @Before
+ public void setUp() {
+ resetCallbackData();
+ simpleFuture = new SimpleFuture();
+ long threadId = Thread.currentThread().getId();
+ for (int i = 0; i < callbackNum.length; i++) {
+ final int num = i;
+ simpleFuture.whenCompleted(
+ () -> {
+ for (int j = num; j < calledFlag.length; j++) {
+ Assert.assertFalse(
+ "callback " + j + " should not be called before " + num,
calledFlag[j]);
+ }
+ // Trigger error if callbackNum[num] == 0
+ if (callbackNum[num] == 0) {
+ throw new RuntimeException("Callback error");
+ }
+ callbackNum[num] = num;
+ Assert.assertEquals(
+ "Callback should be run in the same thread",
+ threadId,
+ Thread.currentThread().getId());
+ });
+ }
+ }
+
+ @Test
+ public void testComplete() {
+ simpleFuture.complete();
+
+ for (int i = 0; i < 5; i++) {
+ Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]);
+ }
+ Assert.assertTrue("SimpleFuture should complete if callback no error",
simpleFuture.isDone());
+ }
+
+ @Test
+ public void testCompleteTwiceButNotReset() {
+ simpleFuture.complete();
+ // The second call will not trigger any callback
+ for (int i = 0; i < 5; i++) {
+ callbackNum[i] = -1;
+ }
+ simpleFuture.complete();
+ for (int i = 0; i < 5; i++) {
+ Assert.assertEquals("Current callback num: " + i, -1, callbackNum[i]);
+ }
+ Assert.assertTrue("SimpleFuture should not complete if callback error",
simpleFuture.isDone());
+ }
+
+ // Additional tests for edge cases and error conditions
+ @Test
+ public void testCallbackError() {
+ callbackNum[2] = 0; // Trigger error in callbackNum[2]
+
+ try {
+ simpleFuture.complete();
+ } catch (Throwable throwable) {
+ Assert.assertTrue("Should catch the error", throwable instanceof
AmoroRuntimeException);
+ Assert.assertTrue("Should catch the error", throwable.getCause()
instanceof RuntimeException);
+ Assert.assertEquals(
+ "Should catch the error", "Callback error",
throwable.getCause().getMessage());
+ }
+ for (int i = 0; i < 5; i++) {
+ if (i < 2) {
+ Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]);
+ } else if (i == 2) {
+ Assert.assertEquals("Current callback num: " + i, 0, callbackNum[i]);
+ } else {
+ Assert.assertEquals("Current callback num: " + i, -1, callbackNum[i]);
+ }
+ }
+ Assert.assertFalse("SimpleFuture should not complete if callback error",
simpleFuture.isDone());
+ }
+
+ @Test
+ public void testReset() {
+ callbackNum[2] = 0; // Trigger error in callbackNum[2]
+ try {
+ simpleFuture.complete();
+ } catch (Throwable throwable) {
+ Assert.assertTrue("Should catch the error", throwable instanceof
AmoroRuntimeException);
+ }
+ Assert.assertFalse("SimpleFuture should not complete if callback error",
simpleFuture.isDone());
+
+ resetCallbackData(); // Trigger normal callback
+ simpleFuture.reset();
+ simpleFuture.complete();
+ for (int i = 0; i < 5; i++) {
+ Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]);
+ }
+ Assert.assertTrue("SimpleFuture should not complete if callback error",
simpleFuture.isDone());
+ }
+
+ @Test
+ public void testIsDone() {
+ simpleFuture.complete();
+ Assert.assertTrue("Future should be completed", simpleFuture.isDone());
+ }
+
+ @Test(expected = AmoroRuntimeException.class)
+ public void testCompleteException() throws ExecutionException,
InterruptedException {
+ CompletableFuture<?> future = mock(CompletableFuture.class);
+ doReturn(true).when(future).complete(null);
+ doThrow(new RuntimeException()).when(future).get();
+ SimpleFuture simpleFuture = new SimpleFuture(future);
+
+ simpleFuture.complete();
+ }
+
+ @Test
+ public void testJoin() {
+ simpleFuture.complete();
+ simpleFuture.join();
+ Assert.assertTrue("Future should be completed", simpleFuture.isDone());
+ }
+
+ @Test
+ public void testJoinException() throws ExecutionException,
InterruptedException {
+ CompletableFuture<?> future = mock(CompletableFuture.class);
+ doThrow(new RuntimeException()).when(future).get();
+ SimpleFuture simpleFuture = new SimpleFuture(future);
+ simpleFuture.reset();
+ simpleFuture.complete();
+ simpleFuture.join();
+ Assert.assertTrue("Future should be completed", simpleFuture.isDone());
+ }
+
+ @Test
+ public void testOr() {
+ SimpleFuture anotherFuture = new SimpleFuture();
+ SimpleFuture combinedFuture = simpleFuture.or(anotherFuture);
+
+ simpleFuture.complete();
+ Assert.assertTrue(
+ "Combined future should be completed when either future completes",
+ combinedFuture.isDone());
+ }
+
+ @Test
+ public void testAnd() {
+ SimpleFuture anotherFuture = new SimpleFuture();
+ SimpleFuture combinedFuture = simpleFuture.and(anotherFuture);
+
+ simpleFuture.complete();
+ anotherFuture.complete();
+ Assert.assertTrue(
+ "Combined future should be completed when both futures complete",
combinedFuture.isDone());
+ }
+
+ @Test
+ public void testAllOf() {
+ List<SimpleFuture> futures =
+ Arrays.asList(new SimpleFuture(), new SimpleFuture(), new
SimpleFuture());
+ SimpleFuture combinedFuture = SimpleFuture.allOf(futures);
+
+ futures.forEach(SimpleFuture::complete);
+ Assert.assertTrue(
+ "Combined future should be completed when all futures complete",
combinedFuture.isDone());
+ }
+
+ @Test
+ public void testAnyOf() {
+ List<SimpleFuture> futures = Arrays.asList(new SimpleFuture(), new
SimpleFuture());
+ SimpleFuture combinedFuture = SimpleFuture.anyOf(futures);
+
+ futures.get(0).complete();
+ Assert.assertTrue(
+ "Combined future should be completed when any future completes",
combinedFuture.isDone());
+ }
+
+ // Test for when the future is already completed before calling complete()
+ @Test
+ public void testCompleteAlreadyCompleted() {
+ simpleFuture.complete();
+ try {
+ simpleFuture.complete();
+ } catch (Throwable throwable) {
+ Assert.fail(throwable.getMessage());
+ }
+ }
+
+ // Test for when the future is already completed before calling join()
+ @Test
+ public void testJoinAlreadyCompleted() {
+ simpleFuture.complete();
+ try {
+ simpleFuture.join();
+ } catch (Throwable throwable) {
+ Assert.fail(throwable.getMessage());
+ }
+ }
+}