This is an automated email from the ASF dual-hosted git repository.
nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 743e6f1cd [Improvement]: Refactor process API and table runtime
extension interfaces (#4097)
743e6f1cd is described below
commit 743e6f1cde34ec5b0dcc6b93503557c2daed2b12
Author: baiyangtx <[email protected]>
AuthorDate: Wed Feb 25 20:02:28 2026 +0800
[Improvement]: Refactor process API and table runtime extension interfaces
(#4097)
* improvement: refactor process API and table runtime extension interfaces
# Context
Split from upstream PR apache/amoro#4081 to make review easier.
This MR introduces the new process extension APIs in the common module, without
touching most AMS internals yet.
# Changes
- Promote `ProcessFactory` to a richer abstraction in `amoro-common`,
preparing for plugin-based table process extension.
- Adjust `Action`/`IcebergActions` and `TableRuntime` to align with the new
process model.
- Move `ActionCoordinator` into `amoro-common` so it can be shared as a
public abstraction.
- Introduce `ProcessTriggerStrategy` to describe trigger policies for
processes.
- Clean up legacy process state classes which will be replaced by the new
abstractions in follow-up MRs.
# Notes
- This commit only updates the common module and the shared
`ActionCoordinator` API; AMS-side wiring and runtime refactors will be done in
separate branches/MRs as discussed.
Co-Authored-By: Aime <[email protected]>
Change-Id: If84ada8fcae1cfb11577d56d3866db7ce0949102
* fix compile error
* fix compile error
* fix compile error3
* getTableConfig
* refactor
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: Aime <[email protected]>
---
.../persistence/mapper/ProcessStateMapper.java | 87 -------
.../server/process/ActionCoordinatorScheduler.java | 17 +-
.../amoro/server/process/ProcessService.java | 3 +-
.../amoro/server/process/TableProcessMeta.java | 20 --
.../scheduler/PeriodicExternalScheduler.java | 278 ---------------------
.../amoro/server/table/AbstractTableRuntime.java | 67 ++++-
.../amoro/server/table/DefaultTableRuntime.java | 74 +-----
.../server/table/DefaultTableRuntimeFactory.java | 11 +
.../server/table/SupportProcessManagement.java | 32 +--
.../server/process/MockActionCoordinator.java | 32 +--
.../src/main/java/org/apache/amoro/Action.java | 41 ++-
.../main/java/org/apache/amoro/IcebergActions.java | 10 +-
.../main/java/org/apache/amoro/TableRuntime.java | 11 +-
.../apache/amoro}/process/ActionCoordinator.java | 37 +--
.../org/apache/amoro/process/AmoroProcess.java | 2 +-
.../org/apache/amoro/process/OptimizingState.java | 80 ------
.../org/apache/amoro/process/ProcessFactory.java | 33 ++-
.../org/apache/amoro/process/ProcessState.java | 77 ------
.../amoro/process/ProcessTriggerStrategy.java | 57 +++++
.../RecoverProcessFailedException.java} | 24 +-
.../apache/amoro/process/TableProcessState.java | 224 -----------------
.../apache/amoro/table/TableRuntimeFactory.java | 6 +
22 files changed, 240 insertions(+), 983 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
deleted file mode 100644
index f7b755781..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.persistence.mapper;
-
-import org.apache.amoro.process.TableProcessState;
-import org.apache.ibatis.annotations.Insert;
-import org.apache.ibatis.annotations.Options;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Result;
-import org.apache.ibatis.annotations.ResultMap;
-import org.apache.ibatis.annotations.Results;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
-
-import java.util.Map;
-
-public interface ProcessStateMapper {
-
- @Insert(
- "INSERT INTO table_process_state "
- + "(process_id, action, table_id, retry_num, status, start_time,
end_time, fail_reason, summary) "
- + "VALUES "
- + "(#{id}, #{action}, #{tableId}, #{retryNumber}, #{status},
#{startTime}, #{endTime}, #{failedReason}, #{summary})")
- @Options(useGeneratedKeys = true, keyProperty = "id")
- void createProcessState(TableProcessState state);
-
- @Update(
- "UPDATE table_process_state "
- + "SET status = #{status}, start_time = #{startTime} "
- + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
- void updateProcessRunning(TableProcessState state);
-
- @Update(
- "UPDATE table_process_state "
- + "SET status = #{status}, end_time = #{endTime} "
- + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
- void updateProcessCompleted(TableProcessState state);
-
- @Update(
- "UPDATE table_process_state "
- + "SET status = #{status}, end_time = #{endTime}, fail_reason =
#{failedReason} "
- + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
- void updateProcessFailed(TableProcessState state);
-
- @Select(
- "SELECT process_id, action, table_id, retry_num, status, start_time,
end_time, fail_reason, summary "
- + "FROM table_process_state "
- + "WHERE process_id = #{processId}")
- @Results(
- id = "TableProcessStateResultMap",
- value = {
- @Result(property = "id", column = "process_id"),
- @Result(property = "action", column = "action"),
- @Result(property = "tableId", column = "table_id"),
- @Result(property = "retryNumber", column = "retry_num"),
- @Result(property = "status", column = "status"),
- @Result(property = "startTime", column = "start_time"),
- @Result(property = "endTime", column = "end_time"),
- @Result(property = "failedReason", column = "fail_reason"),
- @Result(property = "summary", column = "summary", javaType = Map.class)
- })
- TableProcessState getProcessStateById(@Param("processId") long processId);
-
- /** Query TableProcessState by table_id */
- @Select(
- "SELECT process_id, action, table_id, retry_num, status, start_time,
end_time, fail_reason, summary "
- + "FROM table_process_state "
- + "WHERE table_id = #{tableId}")
- @ResultMap("TableProcessStateResultMap")
- TableProcessState getProcessStateByTableId(@Param("tableId") long tableId);
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
index 83574c663..daf586941 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.process;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
@@ -27,6 +28,8 @@ import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
+
/**
* Periodic scheduler that delegates scheduling decisions to an {@link
ActionCoordinator}. It
* creates, recovers and retries table processes via {@link ProcessService}.
@@ -95,8 +98,8 @@ public class ActionCoordinatorScheduler extends
PeriodicTableScheduler {
*/
@Override
protected void execute(TableRuntime tableRuntime) {
- TableProcess process = coordinator.createTableProcess(tableRuntime);
- processService.register(tableRuntime, process);
+ Optional<TableProcess> process = coordinator.trigger(tableRuntime);
+ process.ifPresent(p -> processService.register(tableRuntime, p));
}
/**
@@ -110,16 +113,6 @@ public class ActionCoordinatorScheduler extends
PeriodicTableScheduler {
processService.recover(tableRuntime, process);
}
- /**
- * Retry a failed table process.
- *
- * @param process process to retry
- */
- protected void retry(TableProcess process) {
- process = coordinator.retryTableProcess(process);
- processService.retry(process);
- }
-
/**
* Get executor delay from coordinator.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index ee9f136dc..252d1574b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -25,6 +25,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessEvent;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcess;
@@ -242,7 +243,7 @@ public class ProcessService extends PersistentBase {
"Regular Retry.",
process.getProcessParameters(),
process.getSummary());
- scheduler.retry(process);
+ executeOrTraceProcess(process);
} else {
untrackTableProcessInstance(
process.getTableRuntime().getTableIdentifier(),
process.getId());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
index 311b60e03..6890c25c1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -19,7 +19,6 @@
package org.apache.amoro.server.process;
import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcessState;
import org.apache.amoro.process.TableProcessStore;
import java.util.HashMap;
@@ -189,25 +188,6 @@ public class TableProcessMeta {
return tableProcessMeta;
}
- @Deprecated
- public static TableProcessMeta fromTableProcessState(TableProcessState
tableProcessState) {
- TableProcessMeta tableProcessMeta = new TableProcessMeta();
- tableProcessMeta.setProcessId(tableProcessState.getId());
-
tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId());
-
tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier());
- tableProcessMeta.setStatus(tableProcessState.getStatus());
- tableProcessMeta.setProcessType(tableProcessState.getAction().getName());
- tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc());
-
tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine());
- tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber());
- tableProcessMeta.setCreateTime(tableProcessState.getStartTime());
- tableProcessMeta.setFinishTime(tableProcessState.getEndTime());
- tableProcessMeta.setFailMessage(tableProcessState.getFailedReason());
-
tableProcessMeta.setProcessParameters(tableProcessState.getProcessParameters());
- tableProcessMeta.setSummary(tableProcessState.getSummary());
- return tableProcessMeta;
- }
-
public static TableProcessMeta of(
long processId,
long tableId,
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
deleted file mode 100644
index 346f02772..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.scheduler;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
-import org.apache.amoro.process.AmoroProcess;
-import org.apache.amoro.process.ManagedProcess;
-import org.apache.amoro.process.ProcessFactory;
-import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.SimpleFuture;
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessState;
-import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.resource.ExternalResourceContainer;
-import org.apache.amoro.resource.Resource;
-import org.apache.amoro.resource.ResourceManager;
-import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.process.DefaultTableProcessStore;
-import org.apache.amoro.server.process.TableProcessMeta;
-import org.apache.amoro.server.table.TableService;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.Optional;
-
-public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler
{
-
- private final ExternalResourceContainer resourceContainer;
- private final ResourceManager resourceManager;
- private final ProcessFactory processFactory;
-
- public PeriodicExternalScheduler(
- ResourceManager resourceManager,
- ExternalResourceContainer resourceContainer,
- Action action,
- TableService tableService,
- int poolSize) {
- super(action, tableService, poolSize);
- this.resourceContainer = resourceContainer;
- this.resourceManager = resourceManager;
- this.processFactory = generateProcessFactory();
- }
-
- @Override
- protected void initHandler(List<TableRuntime> tableRuntimeList) {
- tableRuntimeList.stream()
- .filter(t -> t instanceof SupportsProcessPlugins)
- .map(t -> (SupportsProcessPlugins) t)
- .forEach(tableRuntime -> tableRuntime.install(getAction(),
processFactory));
- super.initHandler(tableRuntimeList);
- }
-
- @Override
- protected boolean enabled(TableRuntime tableRuntime) {
- return Optional.of(tableRuntime)
- .filter(t -> t instanceof SupportsProcessPlugins)
- .map(t -> (SupportsProcessPlugins) t)
- .map(t -> t.enabled(getAction()))
- .orElse(false);
- }
-
- @Override
- protected void execute(TableRuntime tableRuntime) {
- Preconditions.checkArgument(tableRuntime instanceof
SupportsProcessPlugins);
- SupportsProcessPlugins runtimeSupportProcessPlugin =
(SupportsProcessPlugins) tableRuntime;
- // Trigger a table process and check conflicts by table runtime
- // Update process state after process completed, the callback must be
register first
- AmoroProcess process = runtimeSupportProcessPlugin.trigger(getAction());
- process.getCompleteFuture().whenCompleted(() ->
persistTableProcess(process));
- ManagedProcess managedProcess = new ManagedTableProcess(process);
-
- // Submit the table process to resource manager, this is a sync operation
- // update process completed and delete related resources
- managedProcess.submit();
-
- // Trace the table process by async framework so that process can be
called back when completed
- trace(process);
- }
-
- protected int getMaxRetryNumber() {
- return 1;
- }
-
- protected abstract void trace(AmoroProcess process);
-
- protected ProcessFactory generateProcessFactory() {
- return new ExternalProcessFactory();
- }
-
- protected void persistTableProcess(AmoroProcess process) {
- TableProcessStore store = process.store();
- if (store.getStatus() == ProcessStatus.SUBMITTED) {
- new PersistencyHelper().createProcessState(store);
- } else {
- new PersistencyHelper().updateProcessStatus(store);
- }
- }
-
- private static class PersistencyHelper extends PersistentBase {
-
- void createProcessState(TableProcessStore store) {
- TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store);
- doAs(
- TableProcessMapper.class,
- mapper ->
- mapper.updateProcess(
- meta.getTableId(),
- meta.getProcessId(),
- meta.getExternalProcessIdentifier(),
- meta.getStatus(),
- meta.getProcessStage(),
- meta.getRetryNumber(),
- meta.getFinishTime(),
- meta.getFailMessage(),
- meta.getProcessParameters(),
- meta.getSummary()));
- }
-
- void updateProcessStatus(TableProcessStore store) {
- TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store);
- doAs(
- TableProcessMapper.class,
- mapper ->
- mapper.updateProcess(
- meta.getTableId(),
- meta.getProcessId(),
- meta.getExternalProcessIdentifier(),
- meta.getStatus(),
- meta.getProcessStage(),
- meta.getRetryNumber(),
- meta.getFinishTime(),
- meta.getFailMessage(),
- meta.getProcessParameters(),
- meta.getSummary()));
- }
- }
-
- private class ExternalTableProcess extends TableProcess {
-
- ExternalTableProcess(TableRuntime tableRuntime) {
- super(
- tableRuntime,
- new DefaultTableProcessStore(
- tableRuntime, new TableProcessMeta(),
PeriodicExternalScheduler.this.getAction()));
- }
-
- ExternalTableProcess(TableRuntime tableRuntime, TableProcessState state) {
- super(
- tableRuntime,
- new DefaultTableProcessStore(
- tableRuntime,
- TableProcessMeta.fromTableProcessState(state),
- PeriodicExternalScheduler.this.getAction()));
- }
-
- @Override
- protected void closeInternal() {}
- }
-
- private class ExternalProcessFactory implements ProcessFactory {
-
- @Override
- public AmoroProcess create(TableRuntime tableRuntime, Action action) {
- return new ExternalTableProcess(tableRuntime);
- }
-
- @Override
- public AmoroProcess recover(TableRuntime tableRuntime, TableProcessState
state) {
- return new ExternalTableProcess(tableRuntime, state);
- }
- }
-
- protected class ManagedTableProcess implements ManagedProcess {
-
- private final AmoroProcess process;
-
- ManagedTableProcess(AmoroProcess process) {
- this.process = process;
- }
-
- @Override
- public void submit() {
- Resource resource = resourceContainer.submit(this);
- if (resource == null) {
- throw new IllegalStateException("Submit table process can not return
null resource");
- }
- persistTableProcess(this);
- resourceManager.createResource(resource);
- getCompleteFuture()
- .whenCompleted(
- () -> {
- resourceManager.deleteResource(resource.getResourceId());
- if (store().getStatus() == ProcessStatus.FAILED
- && store().getRetryNumber() < getMaxRetryNumber()) {
- retry();
- }
- });
-
store().begin().updateTableProcessStatus(ProcessStatus.SUBMITTED).commit();
- getSubmitFuture().complete();
- }
-
- @Override
- public void complete() {
- store()
- .begin()
- .updateTableProcessStatus(ProcessStatus.SUCCESS)
- .updateFinishTime(System.currentTimeMillis())
- .commit();
- process.getCompleteFuture().complete();
- }
-
- @Override
- public void complete(String failedReason) {
- store()
- .begin()
- .updateTableProcessStatus(ProcessStatus.FAILED)
- .updateTableProcessFailMessage(failedReason)
- .updateFinishTime(System.currentTimeMillis())
- .commit();
- process.getCompleteFuture().complete();
- }
-
- @Override
- public void retry() {
- store()
- .begin()
- .updateTableProcessStatus(ProcessStatus.PENDING)
- .updateRetryNumber(store().getRetryNumber() + 1)
- .updateExternalProcessIdentifier("")
- .commit();
- submit();
- }
-
- @Override
- public void kill() {
- store()
- .begin()
- .updateTableProcessStatus(ProcessStatus.KILLED)
- .updateFinishTime(System.currentTimeMillis())
- .commit();
- process.getCompleteFuture().complete();
- }
-
- @Override
- public SimpleFuture getSubmitFuture() {
- return process.getSubmitFuture();
- }
-
- @Override
- public SimpleFuture getCompleteFuture() {
- return process.getCompleteFuture();
- }
-
- @Override
- public TableProcessStore store() {
- return process.store();
- }
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
index e07cd4432..05e17adc0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
@@ -18,17 +18,26 @@
package org.apache.amoro.server.table;
+import org.apache.amoro.Action;
import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.amoro.table.TableRuntimeStore;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
public abstract class AbstractTableRuntime extends PersistentBase
- implements TableRuntime, SupportsProcessPlugins {
+ implements SupportProcessManagement {
private final TableRuntimeStore store;
+ private final Map<Action, TableProcessContainer> processContainerMap =
Maps.newConcurrentMap();
protected AbstractTableRuntime(TableRuntimeStore store) {
this.store = store;
@@ -48,6 +57,49 @@ public abstract class AbstractTableRuntime extends
PersistentBase
return TableConfigurations.parseTableConfig(store().getTableConfig());
}
+ @Override
+ public Map<String, String> getTableConfig() {
+ return store().getTableConfig();
+ }
+
+ @Override
+ public List<TableProcessStore> getProcessStates() {
+ return processContainerMap.values().stream()
+ .flatMap(container -> container.getProcessStates().stream())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<TableProcessStore> getProcessStates(Action action) {
+ return processContainerMap.get(action).getProcessStates();
+ }
+
+ @Override
+ public void registerProcess(TableProcessStore processStore) {
+ processContainerMap
+ .computeIfAbsent(processStore.getAction(), k -> new
TableProcessContainer())
+ .processLock
+ .lock();
+ try {
+ processContainerMap
+ .get(processStore.getAction())
+ .processMap
+ .put(processStore.getProcessId(), processStore);
+ } finally {
+ processContainerMap.get(processStore.getAction()).processLock.unlock();
+ }
+ }
+
+ @Override
+ public void removeProcess(TableProcessStore processStore) {
+ processContainerMap.computeIfPresent(
+ processStore.getAction(),
+ (action, container) -> {
+ container.processMap.remove(processStore.getProcessId());
+ return container;
+ });
+ }
+
@Override
public String getGroupName() {
return store().getGroupName();
@@ -61,4 +113,13 @@ public abstract class AbstractTableRuntime extends
PersistentBase
public void dispose() {
store().dispose();
}
+
+ private static class TableProcessContainer {
+ private final Lock processLock = new ReentrantLock();
+ private final Map<Long, TableProcessStore> processMap =
Maps.newConcurrentMap();
+
+ public List<TableProcessStore> getProcessStates() {
+ return Lists.newArrayList(processMap.values());
+ }
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 11c1a21cf..8a337451c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -18,10 +18,7 @@
package org.apache.amoro.server.table;
-import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
-import org.apache.amoro.SupportsProcessPlugins;
-import org.apache.amoro.TableRuntime;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
@@ -30,10 +27,7 @@ import org.apache.amoro.metrics.MetricRegistry;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.TableRuntimeOptimizingState;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
-import org.apache.amoro.process.AmoroProcess;
-import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
@@ -48,7 +42,6 @@ import
org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
-import
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.amoro.table.BaseTable;
import org.apache.amoro.table.ChangeTable;
import org.apache.amoro.table.MixedTable;
@@ -64,13 +57,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
/** Default table runtime implementation. */
-public class DefaultTableRuntime extends AbstractTableRuntime
- implements TableRuntime, SupportsProcessPlugins {
+public class DefaultTableRuntime extends AbstractTableRuntime {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultTableRuntime.class);
@@ -95,8 +84,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime
public static final List<StateKey<?>> REQUIRED_STATES =
Lists.newArrayList(
OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY,
CLEANUP_STATE_KEY);
-
- private final Map<Action, TableProcessContainer> processContainerMap =
Maps.newConcurrentMap();
private final TableOptimizingMetrics optimizingMetrics;
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
@@ -132,39 +119,6 @@ public class DefaultTableRuntime extends
AbstractTableRuntime
this.tableSummaryMetrics.register(metricRegistry);
}
- @Override
- public AmoroProcess trigger(Action action) {
- return Optional.ofNullable(processContainerMap.get(action))
- .map(container -> container.trigger(action))
- // Define a related exception
- .orElseThrow(() -> new IllegalArgumentException("No ProcessFactory for
action " + action));
- }
-
- @Override
- public void install(Action action, ProcessFactory processFactory) {
- if (processContainerMap.putIfAbsent(action, new
TableProcessContainer(processFactory))
- != null) {
- throw new IllegalStateException("ProcessFactory for action " + action +
" already exists");
- }
- }
-
- @Override
- public boolean enabled(Action action) {
- return processContainerMap.get(action) != null;
- }
-
- @Override
- public List<TableProcessStore> getProcessStates() {
- return processContainerMap.values().stream()
- .flatMap(container -> container.getProcessStates().stream())
- .collect(Collectors.toList());
- }
-
- @Override
- public List<TableProcessStore> getProcessStates(Action action) {
- return processContainerMap.get(action).getProcessStates();
- }
-
public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() {
return orphanFilesCleaningMetrics;
}
@@ -586,30 +540,4 @@ public class DefaultTableRuntime extends
AbstractTableRuntime
return currentSnapshotId;
}
-
- private class TableProcessContainer {
- private final Lock processLock = new ReentrantLock();
- private final ProcessFactory processFactory;
- private final Map<Long, AmoroProcess> processMap = Maps.newConcurrentMap();
-
- TableProcessContainer(ProcessFactory processFactory) {
- this.processFactory = processFactory;
- }
-
- public AmoroProcess trigger(Action action) {
- processLock.lock();
- try {
- AmoroProcess process = processFactory.create(DefaultTableRuntime.this,
action);
- process.getCompleteFuture().whenCompleted(() ->
processMap.remove(process.getId()));
- processMap.put(process.getId(), process);
- return process;
- } finally {
- processLock.unlock();
- }
- }
-
- public List<TableProcessStore> getProcessStates() {
- return
processMap.values().stream().map(AmoroProcess::store).collect(Collectors.toList());
- }
- }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 86e748cc5..41c8041f2 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -21,6 +21,9 @@ package org.apache.amoro.server.table;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.StateKey;
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.table.TableRuntimeStore;
@@ -41,6 +44,14 @@ public class DefaultTableRuntimeFactory implements
TableRuntimeFactory {
return "default";
}
+ @Override
+ public List<ActionCoordinator> supportedCoordinators() {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public void initialize(List<ProcessFactory> factories) {}
+
@Override
public Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties) {
diff --git
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
similarity index 60%
copy from
amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
copy to
amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
index 7654af710..b6ff971b8 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java
@@ -16,25 +16,25 @@
* limitations under the License.
*/
-package org.apache.amoro.table;
+package org.apache.amoro.server.table;
-import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.TableProcessStore;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+/** Interface for AMS internal use only. */
+public interface SupportProcessManagement extends TableRuntime {
-/** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
+ /**
+ * Register a process store to the table runtime.
+ *
+ * @param processStore the process store to register
+ */
+ void registerProcess(TableProcessStore processStore);
- Optional<TableRuntimeCreator> accept(
- ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties);
-
- interface TableRuntimeCreator {
- List<StateKey<?>> requiredStateKeys();
-
- TableRuntime create(TableRuntimeStore store);
- }
+ /**
+ * Remove a process store from the table runtime.
+ *
+ * @param processStore the process store to remove
+ */
+ void removeProcess(TableProcessStore processStore);
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
index da3617a14..9d0f89b6b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
@@ -21,18 +21,20 @@ package org.apache.amoro.server.process;
import org.apache.amoro.Action;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
/** Mock implementation of {@link ActionCoordinator} used in tests. */
public class MockActionCoordinator implements ActionCoordinator {
public static final int PROCESS_MAX_POOL_SIZE = 1000;
- private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[]
{TableFormat.PAIMON};
-
- public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0,
"default_action");
+ public static final Action DEFAULT_ACTION =
Action.register("default_action");
+ public static final SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new
SnowflakeIdGenerator();
/**
* Whether the format is supported.
@@ -61,12 +63,6 @@ public class MockActionCoordinator implements
ActionCoordinator {
return DEFAULT_ACTION;
}
- /** Get execution engine name. */
- @Override
- public String executionEngine() {
- return "default";
- }
-
/** Next executing time. */
@Override
public long getNextExecutingTime(TableRuntime tableRuntime) {
@@ -92,19 +88,19 @@ public class MockActionCoordinator implements
ActionCoordinator {
* @return mock process
*/
@Override
- public TableProcess createTableProcess(TableRuntime tableRuntime) {
+ public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
TableProcessMeta tableProcessMeta =
TableProcessMeta.of(
SNOWFLAKE_ID_GENERATOR.generateId(),
tableRuntime.getTableIdentifier().getId(),
action().getName(),
- executionEngine(),
+ "default",
new HashMap<>());
TableProcessStore tableProcessStore =
new DefaultTableProcessStore(
tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta,
action(), 3);
MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime,
tableProcessStore);
- return mockTableProcess;
+ return Optional.of(mockTableProcess);
}
/**
@@ -120,18 +116,6 @@ public class MockActionCoordinator implements
ActionCoordinator {
return new MockTableProcess(tableRuntime, processStore);
}
- /** Return same process to cancel. */
- @Override
- public TableProcess cancelTableProcess(TableRuntime tableRuntime,
TableProcess process) {
- return process;
- }
-
- /** Return same process to retry. */
- @Override
- public TableProcess retryTableProcess(TableProcess process) {
- return process;
- }
-
/** Open plugin. */
@Override
public void open(Map<String, String> properties) {}
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 42671a6e1..1d5f81b3b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -20,38 +20,33 @@ package org.apache.amoro;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
public final class Action {
private static final int MAX_NAME_LENGTH = 16;
-
- /** supported table formats of this action */
- private final TableFormat[] formats;
+ private static final Map<String, Action> registeredActions = new
ConcurrentHashMap<>();
private final String name;
- /**
- * the weight number of this action, the bigger the weight number, the
higher positions of
- * schedulers or front pages
- */
- private final int weight;
- public Action(TableFormat[] formats, int weight, String name) {
- Preconditions.checkArgument(
- name.length() <= MAX_NAME_LENGTH,
- "Action name length should be less than " + MAX_NAME_LENGTH);
- this.formats = formats;
- this.name = name;
- this.weight = weight;
+ public static Action register(String name) {
+ final String regularName = name.trim().toUpperCase(Locale.ROOT);
+ return registeredActions.computeIfAbsent(regularName, s -> new
Action(regularName));
}
- public int getWeight() {
- return weight;
+ public static Action valueOf(String name) {
+ final String regularName = name.trim().toUpperCase(Locale.ROOT);
+ return registeredActions.get(regularName);
}
- public TableFormat[] supportedFormats() {
- return formats;
+ private Action(String name) {
+ Preconditions.checkArgument(
+ name.length() <= MAX_NAME_LENGTH,
+ "Action name length should be less than " + MAX_NAME_LENGTH);
+ this.name = name;
}
public String getName() {
@@ -67,13 +62,11 @@ public final class Action {
return false;
}
Action action = (Action) o;
- return Objects.equals(name, action.name) && Arrays.equals(formats,
action.formats);
+ return Objects.equals(name, action.name);
}
@Override
public int hashCode() {
- int result = Objects.hash(name);
- result = 31 * result + Arrays.hashCode(formats);
- return result;
+ return Objects.hash(name);
}
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index 76f470d98..c75c5ac8d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -23,9 +23,9 @@ public class IcebergActions {
private static final TableFormat[] DEFAULT_FORMATS =
new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE};
- public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system");
- public static final Action REWRITE = new Action(DEFAULT_FORMATS, 10,
"rewrite");
- public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2,
"delete-orphans");
- public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3,
"sync-hive");
- public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1,
"expire-data");
+ public static final Action SYSTEM = Action.register("system");
+ public static final Action REWRITE = Action.register("rewrite");
+ public static final Action DELETE_ORPHANS =
Action.register("delete-orphans");
+ public static final Action SYNC_HIVE = Action.register("sync-hive");
+ public static final Action EXPIRE_DATA = Action.register("expire-data");
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 31ea74f1f..ae2f610cc 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -24,6 +24,7 @@ import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessStore;
import java.util.List;
+import java.util.Map;
/**
* TableRuntime is the key interface for the AMS framework to interact with
the table. Typically, it
@@ -59,12 +60,20 @@ public interface TableRuntime {
ServerTableIdentifier getTableIdentifier();
/**
- * Get the table configuration.
+ * Get the table configuration. Deprecated: use {@link #getTableConfig()}
instead.
*
* @return the table configuration
*/
+ @Deprecated
TableConfiguration getTableConfiguration();
+ /**
+ * Get the table configuration as a string-to-string map.
+ *
+ * @return the table configuration
+ */
+ Map<String, String> getTableConfig();
+
/**
* Register the metric of the table runtime.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
similarity index 72%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
rename to
amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
index 5e55154cb..1f00a1c4f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java
@@ -16,15 +16,14 @@
* limitations under the License.
*/
-package org.apache.amoro.server.process;
+package org.apache.amoro.process;
import org.apache.amoro.Action;
import org.apache.amoro.ActivePlugin;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.server.utils.SnowflakeIdGenerator;
+
+import java.util.Optional;
/**
* Coordinator for a specific {@link org.apache.amoro.Action} to manage table
processes. Provides
@@ -32,10 +31,6 @@ import org.apache.amoro.server.utils.SnowflakeIdGenerator;
*/
public interface ActionCoordinator extends ActivePlugin {
- String PROPERTY_PARALLELISM = "parallelism";
-
- SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator();
-
/**
* Check whether the given table format is supported by this coordinator.
*
@@ -58,13 +53,6 @@ public interface ActionCoordinator extends ActivePlugin {
*/
Action action();
- /**
- * Get the execution engine name used by this coordinator.
- *
- * @return execution engine name
- */
- String executionEngine();
-
/**
* Calculate the next executing time for the given table runtime.
*
@@ -94,7 +82,7 @@ public interface ActionCoordinator extends ActivePlugin {
* @param tableRuntime table runtime
* @return a new table process
*/
- TableProcess createTableProcess(TableRuntime tableRuntime);
+ Optional<TableProcess> trigger(TableRuntime tableRuntime);
/**
* Recover a {@link TableProcess} from persisted store.
@@ -104,21 +92,4 @@ public interface ActionCoordinator extends ActivePlugin {
* @return recovered table process
*/
TableProcess recoverTableProcess(TableRuntime tableRuntime,
TableProcessStore processStore);
-
- /**
- * Prepare a {@link TableProcess} for cancellation.
- *
- * @param tableRuntime table runtime
- * @param process table process to cancel
- * @return the process instance to be canceled
- */
- TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess
process);
-
- /**
- * Prepare a {@link TableProcess} for retrying.
- *
- * @param process table process to retry
- * @return the process instance to be retried
- */
- TableProcess retryTableProcess(TableProcess process);
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
index cf0ffd50b..6ff5b6cfb 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
@@ -47,7 +47,7 @@ public interface AmoroProcess {
SimpleFuture getCompleteFuture();
/**
- * Get {@link ProcessState} of the process
+ * Get {@link TableProcessStore} of the process
*
* @return the state of the process
*/
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
deleted file mode 100644
index c4a900dca..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.StateField;
-
-/** The state of the optimizing process. */
-public abstract class OptimizingState extends TableProcessState {
-
- @StateField private volatile long targetSnapshotId;
- @StateField private volatile long watermark;
- @StateField private volatile ProcessStage stage;
- @StateField private volatile long currentStageStartTime;
-
- public OptimizingState(Action action, ServerTableIdentifier tableIdentifier)
{
- super(action, tableIdentifier);
- }
-
- public OptimizingState(long id, Action action, ServerTableIdentifier
tableIdentifier) {
- super(id, action, tableIdentifier);
- }
-
- protected void setStage(ProcessStage stage) {
- this.stage = stage;
- this.currentStageStartTime = System.currentTimeMillis();
- }
-
- protected void setStage(ProcessStage stage, long stageStartTime) {
- this.stage = stage;
- this.currentStageStartTime = stageStartTime;
- }
-
- protected void setTargetSnapshotId(long targetSnapshotId) {
- this.targetSnapshotId = targetSnapshotId;
- }
-
- protected void setWatermark(long watermark) {
- this.watermark = watermark;
- }
-
- public long getWatermark() {
- return watermark;
- }
-
- @Override
- public ProcessStage getStage() {
- return stage;
- }
-
- public long getTargetSnapshotId() {
- return targetSnapshotId;
- }
-
- public long getCurrentStageStartTime() {
- return currentStageStartTime;
- }
-
- @Override
- public String getName() {
- return stage.getDesc();
- }
-}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
index a2df0a99f..c2e5c838a 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
@@ -19,30 +19,53 @@
package org.apache.amoro.process;
import org.apache.amoro.Action;
+import org.apache.amoro.ActivePlugin;
+import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.StateKey;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
/**
* A factory to create a process. Normally, There will be default
ProcessFactories for each action
* and used by default scheduler. Meanwhile, user could extend external
ProcessFactory to run jobs
* on external resources like Yarn.
*/
-public interface ProcessFactory {
+public interface ProcessFactory extends ActivePlugin {
+
+ default List<StateKey<?>> requiredStates() {
+ return Lists.newArrayList();
+ }
+
+ /** Get supported actions for each table format. */
+ Map<TableFormat, Set<Action>> supportedActions();
+
+ /** How to trigger a process for the action. */
+ default ProcessTriggerStrategy triggerStrategy(TableFormat format, Action
action) {
+ return ProcessTriggerStrategy.METADATA_TRIGGER;
+ }
/**
- * Create a process for the action.
+ * Try to trigger a process for the action.
*
* @param tableRuntime table runtime
* @param action action type
* @return target process which has not been submitted yet.
*/
- AmoroProcess create(TableRuntime tableRuntime, Action action);
+ Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action);
/**
* Recover a process for the action from a state.
*
* @param tableRuntime table runtime
- * @param state state of the process
+ * @param store storage of the process
* @return target process which has not been submitted yet.
+ * @throws RecoverProcessFailedException if recovery fails
*/
- AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state);
+ TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
+ throws RecoverProcessFailedException;
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java
deleted file mode 100644
index 91f76fcae..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-
-import java.util.Map;
-
-/**
- * ProcessState contains information in any {@link AmoroProcess} which must be
persistent and {@link
- * ProcessFactory} will use to recover {@link AmoroProcess}.
- */
-public interface ProcessState {
-
- /** @return unique identifier of the process. */
- long getId();
-
- /**
- * @return the name of the state. If multiple stages are involved, it should
be the name of the
- * current stage.
- */
- String getName();
-
- /** @return start time of the process. */
- long getStartTime();
-
- /** @return the action of the process. */
- Action getAction();
-
- /** @return the status of the process. */
- ProcessStatus getStatus();
-
- /**
- * Get the string encoded summary of the process, this could be a simple
description or a POJO
- * encoded by JSON
- *
- * @return the summary of the process
- */
- Map<String, String> getSummary();
-
- /** @return the reason of process failure, null if the process has not
failed yet. */
- String getFailedReason();
-
- /**
- * Total millisecond running time of all tasks in the process.
- *
- * @return actual quota runtime of the process.
- */
- long getQuotaRuntime();
-
- /**
- * Quota value is calculated by the total millisecond running time of all
tasks in the process
- * divided by the total millisecond from the start time to the current time.
It is used to
- * evaluate the actual runtime concurrence of the process.
- *
- * @return the quota value of the process.
- */
- default double getQuotaValue() {
- return (double) getQuotaRuntime() / (System.currentTimeMillis() -
getStartTime());
- }
-}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
new file mode 100644
index 000000000..43bf5aed8
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import java.time.Duration;
+
+/** Process trigger strategy. */
+public final class ProcessTriggerStrategy {
+
+ public static final ProcessTriggerStrategy METADATA_TRIGGER =
+ new ProcessTriggerStrategy(Duration.ofDays(1), true, 1);
+
+ private final Duration triggerInterval;
+
+ private final boolean triggerOnNewSnapshot;
+
+ private final int triggerParallelism;
+
+ public ProcessTriggerStrategy(
+ Duration triggerInterval, boolean triggerOnNewSnapshot, int
triggerParallelism) {
+ this.triggerInterval = triggerInterval;
+ this.triggerOnNewSnapshot = triggerOnNewSnapshot;
+ this.triggerParallelism = triggerParallelism;
+ }
+
+ public static ProcessTriggerStrategy triggerAtFixRate(Duration
triggerInterval) {
+ return new ProcessTriggerStrategy(triggerInterval, false, 1);
+ }
+
+ public Duration getTriggerInterval() {
+ return triggerInterval;
+ }
+
+ public boolean isTriggerOnNewSnapshot() {
+ return triggerOnNewSnapshot;
+ }
+
+ public int getTriggerParallelism() {
+ return triggerParallelism;
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
similarity index 58%
copy from
amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
copy to
amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
index 7654af710..5e9c0c64d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++
b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java
@@ -16,25 +16,11 @@
* limitations under the License.
*/
-package org.apache.amoro.table;
+package org.apache.amoro.process;
-import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.TableRuntime;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Table runtime factory. */
-public interface TableRuntimeFactory extends ActivePlugin {
-
- Optional<TableRuntimeCreator> accept(
- ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties);
-
- interface TableRuntimeCreator {
- List<StateKey<?>> requiredStateKeys();
-
- TableRuntime create(TableRuntimeStore store);
+/** Exception thrown when a process recovery fails. */
+public class RecoverProcessFailedException extends RuntimeException {
+ public RecoverProcessFailedException(String message) {
+ super(message);
}
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
deleted file mode 100644
index c4e800ed1..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-import org.apache.amoro.Action;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.StateField;
-
-import java.util.Map;
-
-/** A common state of a table process. */
-public class TableProcessState implements ProcessState {
-
- @StateField private volatile long id;
- @StateField private volatile String externalProcessIdentifier;
- private final Action action;
- private final ServerTableIdentifier tableIdentifier;
- private String executionEngine;
- @StateField private int retryNumber = 0;
- @StateField private long startTime = -1L;
- @StateField private long endTime = -1L;
- @StateField private ProcessStatus status = ProcessStatus.PENDING;
- @StateField private volatile String failedReason;
- private volatile Map<String, String> processParameters;
- private volatile Map<String, String> summary;
-
- public TableProcessState(Action action, ServerTableIdentifier
tableIdentifier) {
- this.action = action;
- this.tableIdentifier = tableIdentifier;
- }
-
- public TableProcessState(long id, Action action, ServerTableIdentifier
tableIdentifier) {
- this.id = id;
- this.action = action;
- this.tableIdentifier = tableIdentifier;
- }
-
- public TableProcessState(
- long id, Action action, ServerTableIdentifier tableIdentifier, String
executionEngine) {
- this.id = id;
- this.action = action;
- this.tableIdentifier = tableIdentifier;
- this.executionEngine = executionEngine;
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- public String getExternalProcessIdentifier() {
- return externalProcessIdentifier;
- }
-
- public String getName() {
- return action.getName();
- }
-
- public Action getAction() {
- return action;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public ProcessStatus getStatus() {
- return status;
- }
-
- public String getExecutionEngine() {
- return executionEngine;
- }
-
- @Override
- public Map<String, String> getSummary() {
- return summary;
- }
-
- @Override
- public long getQuotaRuntime() {
- return getDuration();
- }
-
- @Override
- public double getQuotaValue() {
- return 1;
- }
-
- public long getDuration() {
- return endTime > 0 ? endTime - startTime : System.currentTimeMillis() -
startTime;
- }
-
- public ServerTableIdentifier getTableIdentifier() {
- return tableIdentifier;
- }
-
- public void setExternalProcessIdentifier(String externalProcessIdentifier) {
- this.externalProcessIdentifier = externalProcessIdentifier;
- }
-
- public void setExecutionEngine(String executionEngine) {
- this.executionEngine = executionEngine;
- }
-
- protected void setSummary(Map<String, String> summary) {
- this.summary = summary;
- }
-
- protected void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public void setStatus(ProcessStatus status) {
- if (status == ProcessStatus.SUCCESS
- || status == ProcessStatus.FAILED
- || status == ProcessStatus.KILLED) {
- endTime = System.currentTimeMillis();
- } else if (this.status != ProcessStatus.SUBMITTED && status ==
ProcessStatus.SUBMITTED) {
- endTime = -1L;
- failedReason = null;
- summary = null;
- }
- this.status = status;
- }
-
- public String getFailedReason() {
- return failedReason;
- }
-
- public ProcessStage getStage() {
- return status.toStage();
- }
-
- protected void setId(long processId) {
- this.id = processId;
- }
-
- public Map<String, String> getProcessParameters() {
- return processParameters;
- }
-
- public void setProcessParameters(Map<String, String> processParameters) {
- this.processParameters = processParameters;
- }
-
- public void setSubmitted(String externalProcessIdentifier) {
- this.status = ProcessStatus.SUBMITTED;
- setExternalProcessIdentifier(externalProcessIdentifier);
- this.startTime = System.currentTimeMillis();
- }
-
- public void setSubmitted() {
- this.status = ProcessStatus.SUBMITTED;
- this.startTime = System.currentTimeMillis();
- }
-
- public void setRunning() {
- this.status = ProcessStatus.RUNNING;
- }
-
- public void setCanceling() {
- this.status = ProcessStatus.CANCELING;
- }
-
- public void addRetryNumber() {
- this.retryNumber += 1;
- this.status = ProcessStatus.PENDING;
- this.externalProcessIdentifier = "";
- this.failedReason = null;
- }
-
- public void resetRetryNumber() {
- this.retryNumber = 0;
- }
-
- public void setCanceled() {
- this.status = ProcessStatus.CANCELED;
- }
-
- public void setCompleted() {
- this.status = ProcessStatus.SUCCESS;
- this.endTime = System.currentTimeMillis();
- }
-
- public void setKilled() {
- this.status = ProcessStatus.KILLED;
- this.endTime = System.currentTimeMillis();
- }
-
- public void setCompleted(String failedReason) {
- this.status = ProcessStatus.FAILED;
- this.failedReason = failedReason;
- this.endTime = System.currentTimeMillis();
- }
-
- public int getRetryNumber() {
- return retryNumber;
- }
-
- public void setRetryNumber(int retryNumber) {
- this.retryNumber = retryNumber;
- }
-}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 7654af710..90e26bd4b 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -21,6 +21,8 @@ package org.apache.amoro.table;
import org.apache.amoro.ActivePlugin;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ProcessFactory;
import java.util.List;
import java.util.Map;
@@ -29,6 +31,10 @@ import java.util.Optional;
/** Table runtime factory. */
public interface TableRuntimeFactory extends ActivePlugin {
+ List<ActionCoordinator> supportedCoordinators();
+
+ void initialize(List<ProcessFactory> factories);
+
Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties);