baiyangtx commented on code in PR #3924: URL: https://github.com/apache/amoro/pull/3924#discussion_r2556291988
########## amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.manager.AbstractPluginManager; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.executor.EngineType; +import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.TableProcessExecutor; +import org.apache.amoro.server.table.RuntimeHandlerChain; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import 
org.apache.ibatis.annotations.Param; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ProcessService extends PersistentBase { + private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class); + private final TableService tableService; + + private final Map<String, ActionCoordinatorScheduler> actionCoordinators = + new ConcurrentHashMap<>(); + private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>(); + + private final ActionCoordinatorManager actionCoordinatorManager; + private final ExecuteEngineManager executeEngineManager; + private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler(); + private final ThreadPoolExecutor processExecutionPool = + new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + + private final TableProcessTracker tableProcessTracker = new TableProcessTracker(); + + public ProcessService(Configurations serviceConfig, TableService tableService) { + this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); + } + + public ProcessService( + Configurations serviceConfig, + TableService tableService, + ActionCoordinatorManager actionCoordinatorManager, + ExecuteEngineManager executeEngineManager) { + this.tableService = tableService; + this.actionCoordinatorManager = actionCoordinatorManager; + this.executeEngineManager = executeEngineManager; + this.tableProcessTracker.configure(serviceConfig.toMap()); + } + + public RuntimeHandlerChain getTableHandlerChain() { + return tableRuntimeHandler; + } + + public void register(TableRuntime tableRuntime, TableProcess process) { + synchronized (process) { + 
persistTableProcess(process); + tableProcessTracker.trackTableProcess(tableRuntime.getTableIdentifier(), process); + executeOrTraceProcess(process); + } + } + + public void recover(TableRuntime tableRuntime, TableProcess process) { + synchronized (process) { + // TODO: init some status + tableProcessTracker.trackTableProcess(tableRuntime.getTableIdentifier(), process); + executeOrTraceProcess(process); + } + } + + public void retry(TableProcess process) { + synchronized (process) { + // TODO: init some status + executeOrTraceProcess(process); + } + } + + public void cancel(TableProcess process) { + synchronized (process) { + // TODO: init some status + cancelProcess(process); + } + } + + public void dispose() { + actionCoordinatorManager.close(); + executeEngineManager.close(); + processExecutionPool.shutdown(); + tableProcessTracker.close(); + } + + private void initialize(List<TableRuntime> tableRuntimes) { + LOG.info("Initializing process service"); + actionCoordinatorManager.initialize(); + actionCoordinatorManager + .installedPlugins() + .forEach( + actionCoordinator -> { + actionCoordinators.put( + actionCoordinator.action().getName(), + new ActionCoordinatorScheduler( + actionCoordinator, tableService, ProcessService.this)); + }); + executeEngineManager.initialize(); + executeEngineManager + .installedPlugins() + .forEach( + executeEngine -> { + executeEngines.put(executeEngine.engineType(), executeEngine); + }); + recoverProcesses(tableRuntimes); + actionCoordinators.values().forEach(s -> s.initialize(tableRuntimes)); + } + + @VisibleForTesting + public void recoverProcesses(List<TableRuntime> tableRuntimes) { + Map<Long, TableRuntime> tableIdToRuntimes = + tableRuntimes.stream() + .collect(Collectors.toMap(t -> t.getTableIdentifier().getId(), t -> t)); + List<TableProcessMeta> activeProcesses = + getAs(TableProcessMapper.class, TableProcessMapper::selectAllActiveProcesses); + activeProcesses.forEach( + processMeta -> { + TableRuntime tableRuntime = 
tableIdToRuntimes.get(processMeta.getTableId()); + ActionCoordinatorScheduler scheduler = + actionCoordinators.get(processMeta.getProcessType()); + if (tableRuntime != null && scheduler != null) { + scheduler.recover(tableRuntime, processMeta); + } + }); + } + + private void executeOrTraceProcess(TableProcess process) { + + if (!isExecutable(process)) { + LOG.info( + "Table process {} with identifier {} may have been in canceling or canceled, cancel execute process.", + process.getId(), + process.getExternalProcessIdentifier()); + return; + } + + ExecuteEngine executeEngine = + executeEngines.get(EngineType.valueOf(process.store().getExecutionEngine())); + + TableProcessExecutor executor = new TableProcessExecutor(process, executeEngine); + executor.onProcessFinished( + () -> { + ActionCoordinatorScheduler scheduler = + actionCoordinators.get(process.store().getAction().getName()); + if (scheduler != null + && process.getStatus() == ProcessStatus.FAILED + && process.store().getRetryNumber() < scheduler.PROCESS_MAX_RETRY_NUMBER + && process.getTableRuntime() != null) { + process.updateTableProcessRetryTimes(process.store().getRetryNumber() + 1); + scheduler.retry(process.getTableRuntime(), process); + } else { + tableProcessTracker.untrackTableProcessInstance( + process.getTableRuntime().getTableIdentifier()); + } + }); + + processExecutionPool.submit(executor); + + LOG.info( + "Submit table process {} to engine {}, process id:{}", + process, + executeEngine.engineType(), + process.getId()); + } + + private void cancelProcess(TableProcess process) { + + process.updateTableProcessStatus(ProcessStatus.CANCELING); + tableProcessTracker.untrackTableProcessInstance(process.getTableRuntime().getTableIdentifier()); + + ExecuteEngine executeEngine = + executeEngines.get(EngineType.valueOf(process.store().getExecutionEngine())); + + executeEngine.tryCancelTableProcess(process, process.getExternalProcessIdentifier()); + + 
process.updateTableProcessStatus(ProcessStatus.CANCELED); + + LOG.info( + "Cancel table process {} in engine {}, process id:{}", + process, + executeEngine.engineType(), + process.getId()); + } + + private boolean isExecutable(TableProcess process) { + if (process.getStatus() == ProcessStatus.CANCELING + || process.getStatus() == ProcessStatus.CANCELED) { + return false; + } else { + return true; + } + } + + public TableProcessMeta persistTableProcess(TableProcess process) { + TableProcessMeta processMeta = TableProcessMeta.fromTableProcessStore(process.store()); + doAs( + TableProcessMapper.class, + mapper -> + mapper.insertProcess( + processMeta.getTableId(), + processMeta.getProcessId(), + processMeta.getStatus(), + processMeta.getProcessType(), + processMeta.getProcessStage(), + processMeta.getExecutionEngine(), + processMeta.getCreateTime(), + processMeta.getSummary())); + return processMeta; + } + + @VisibleForTesting + public Map<String, ActionCoordinatorScheduler> getActionCoordinators() { + return actionCoordinators; + } + + @VisibleForTesting + public Map<EngineType, ExecuteEngine> getExecuteEngines() { + return executeEngines; + } + + @VisibleForTesting + public TableProcessTracker getTableProcessTracker() { + return tableProcessTracker; + } + + public static class TableProcessTracker { + private final Map<ServerTableIdentifier, TableProcess> activeTableProcess = + new ConcurrentHashMap<>(); + private Map<String, String> properties; + Review Comment: Consider tracking processes with two maps instead: Map<ProcessId, TableProcess> and Map<TableIdentifier, Set<ProcessId>>. ########## amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActionCoordinatorScheduler extends PeriodicTableScheduler { + + public static final Logger LOG = LoggerFactory.getLogger(ActionCoordinatorScheduler.class); + public static final int PROCESS_MAX_RETRY_NUMBER = 3; + + private final ActionCoordinator coordinator; + private final ProcessService processService; + + public ActionCoordinatorScheduler( + ActionCoordinator coordinator, TableService tableService, ProcessService processService) { + super(coordinator.action(), tableService, coordinator.parallelism()); + this.coordinator = coordinator; + this.processService = processService; + } + + public ActionCoordinator getCoordinator() { + return coordinator; + } + + @Override + protected boolean formatSupported(TableFormat format) { + return coordinator.formatSupported(format); + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return coordinator.getNextExecutingTime(tableRuntime); + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) 
{ + return coordinator.enabled(tableRuntime); + } + + @Override + protected void execute(TableRuntime tableRuntime) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip schedule {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.createTableProcess(tableRuntime); + processService.register(tableRuntime, process); + } + } + + protected void recover(TableRuntime tableRuntime, TableProcessMeta processMeta) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip recover {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.recoverTableProcess(tableRuntime, processMeta); + processService.recover(tableRuntime, process); + } + } + + protected void retry(TableRuntime tableRuntime, TableProcess process) { + TableProcess existProcess = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + if (existProcess != null && existProcess.getId() == process.getId()) { + process = coordinator.retryTableProcess(process); + processService.retry(process); + } else { + LOG.warn( + "Detect no table process exists or exist table process id is different from retry process for table runtime: {}, skip retry {} action this time.", + tableRuntime.getTableIdentifier(), + getAction()); + } + } + + protected void cancel(TableRuntime tableRuntime, TableProcess process) { + process = 
coordinator.cancelTableProcess(tableRuntime, process); + processService.cancel(process); + } + + protected boolean hasAliveTableProcess(TableRuntime tableRuntime) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + if (process != null + && (process.getStatus() == ProcessStatus.RUNNING + || process.getStatus() == ProcessStatus.SUBMITTED + || process.getStatus() == ProcessStatus.PENDING)) { + return true; + } else { + return false; + } + } + + @Override + protected long getExecutorDelay() { + return coordinator.getExecutorDelay(); + } + + @Override + public void handleTableRemoved(TableRuntime tableRuntime) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + if (process != null) { + cancel(tableRuntime, process); + } Review Comment: ProcessService could do this. ########## amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java: ########## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.manager.AbstractPluginManager; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.executor.EngineType; +import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.TableProcessExecutor; +import org.apache.amoro.server.table.RuntimeHandlerChain; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.ibatis.annotations.Param; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ProcessService extends PersistentBase { + private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class); + private final TableService tableService; + + private final Map<String, ActionCoordinatorScheduler> actionCoordinators = + new ConcurrentHashMap<>(); + private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>(); + + private final ActionCoordinatorManager actionCoordinatorManager; + private final ExecuteEngineManager executeEngineManager; + private final ProcessRuntimeHandler 
tableRuntimeHandler = new ProcessRuntimeHandler(); + private final ThreadPoolExecutor processExecutionPool = + new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + + private final TableProcessTracker tableProcessTracker = new TableProcessTracker(); + + public ProcessService(Configurations serviceConfig, TableService tableService) { + this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); + } + + public ProcessService( + Configurations serviceConfig, + TableService tableService, + ActionCoordinatorManager actionCoordinatorManager, + ExecuteEngineManager executeEngineManager) { + this.tableService = tableService; + this.actionCoordinatorManager = actionCoordinatorManager; + this.executeEngineManager = executeEngineManager; + this.tableProcessTracker.configure(serviceConfig.toMap()); + } + + public RuntimeHandlerChain getTableHandlerChain() { + return tableRuntimeHandler; + } + + public void register(TableRuntime tableRuntime, TableProcess process) { + synchronized (process) { + persistTableProcess(process); + tableProcessTracker.trackTableProcess(tableRuntime.getTableIdentifier(), process); + executeOrTraceProcess(process); + } + } + + public void recover(TableRuntime tableRuntime, TableProcess process) { + synchronized (process) { + // TODO: init some status + tableProcessTracker.trackTableProcess(tableRuntime.getTableIdentifier(), process); + executeOrTraceProcess(process); + } + } Review Comment: private method. same with below. ########## amoro-ams/src/test/java/org/apache/amoro/server/process/TestExecuteEngine.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.process.executor.EngineType; +import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.ExecuteOption; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.parquet.Strings; + +import java.security.SecureRandom; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class TestExecuteEngine implements ExecuteEngine { Review Comment: Maybe MockExecuteEngine is better. ########## amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActionCoordinatorScheduler extends PeriodicTableScheduler { + + public static final Logger LOG = LoggerFactory.getLogger(ActionCoordinatorScheduler.class); + public static final int PROCESS_MAX_RETRY_NUMBER = 3; + + private final ActionCoordinator coordinator; + private final ProcessService processService; + + public ActionCoordinatorScheduler( + ActionCoordinator coordinator, TableService tableService, ProcessService processService) { + super(coordinator.action(), tableService, coordinator.parallelism()); + this.coordinator = coordinator; + this.processService = processService; + } + + public ActionCoordinator getCoordinator() { + return coordinator; + } + + @Override + protected boolean formatSupported(TableFormat format) { + return coordinator.formatSupported(format); + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return coordinator.getNextExecutingTime(tableRuntime); + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) 
{ + return coordinator.enabled(tableRuntime); + } + + @Override + protected void execute(TableRuntime tableRuntime) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip schedule {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.createTableProcess(tableRuntime); + processService.register(tableRuntime, process); + } + } + + protected void recover(TableRuntime tableRuntime, TableProcessMeta processMeta) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip recover {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.recoverTableProcess(tableRuntime, processMeta); Review Comment: processStore ########## amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActionCoordinatorScheduler extends PeriodicTableScheduler { + + public static final Logger LOG = LoggerFactory.getLogger(ActionCoordinatorScheduler.class); + public static final int PROCESS_MAX_RETRY_NUMBER = 3; + + private final ActionCoordinator coordinator; + private final ProcessService processService; + + public ActionCoordinatorScheduler( + ActionCoordinator coordinator, TableService tableService, ProcessService processService) { + super(coordinator.action(), tableService, coordinator.parallelism()); + this.coordinator = coordinator; + this.processService = processService; + } + + public ActionCoordinator getCoordinator() { + return coordinator; + } + + @Override + protected boolean formatSupported(TableFormat format) { + return coordinator.formatSupported(format); + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return coordinator.getNextExecutingTime(tableRuntime); + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) { + return coordinator.enabled(tableRuntime); + } + + @Override + protected void execute(TableRuntime tableRuntime) { + if (hasAliveTableProcess(tableRuntime)) { + 
TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip schedule {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.createTableProcess(tableRuntime); + processService.register(tableRuntime, process); + } + } + + protected void recover(TableRuntime tableRuntime, TableProcessMeta processMeta) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip recover {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.recoverTableProcess(tableRuntime, processMeta); + processService.recover(tableRuntime, process); + } + } + + protected void retry(TableRuntime tableRuntime, TableProcess process) { + TableProcess existProcess = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + if (existProcess != null && existProcess.getId() == process.getId()) { + process = coordinator.retryTableProcess(process); + processService.retry(process); + } else { + LOG.warn( + "Detect no table process exists or exist table process id is different from retry process for table runtime: {}, skip retry {} action this time.", + tableRuntime.getTableIdentifier(), + getAction()); + } Review Comment: No need to check here. 
########## amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.executor; + +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.shade.guava32.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TableProcessExecutor extends PersistentBase implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(TableProcessExecutor.class); + + private static final long DEFAULT_POLL_INTERVAL_MS = 5000L; + public ExecuteEngine executeEngine; + protected TableProcess tableProcess; + private Runnable finishedCallback; + + public TableProcessExecutor(TableProcess tableProcess, ExecuteEngine executeEngine) { + this.tableProcess = tableProcess; + this.executeEngine = executeEngine; + } + + @Override + public void run() { + String externalProcessIdentifier = null; + ProcessStatus status; + String message = ""; + + if (isTableProcessCanceling(tableProcess.getStatus())) { + 
LOG.info( + "Table process {} with identifier {} may have been in canceling, exit submit process.", + tableProcess.getId(), + externalProcessIdentifier); + return; + } + + try { + if (tableProcess.getStatus() == ProcessStatus.PENDING + || Strings.isNullOrEmpty(tableProcess.getExternalProcessIdentifier())) { + externalProcessIdentifier = executeEngine.submitTableProcess(tableProcess); + LOG.info( + "Submit table process {} to engine {} success, external process identifier is {}", + tableProcess.getId(), + executeEngine.engineType(), + externalProcessIdentifier); + } else { + externalProcessIdentifier = tableProcess.getExternalProcessIdentifier(); + } + + validateIdentifier(externalProcessIdentifier); + + status = executeEngine.getStatus(externalProcessIdentifier); + tableProcess.updateExternalProcessIdentifier(status, externalProcessIdentifier); + + while (isTableProcessExecuting(status)) { + if (isTableProcessCanceling(tableProcess.getStatus())) { + LOG.info( + "Table process {} with identifier {} may have been in canceling, exit submit process.", + tableProcess.getId(), + externalProcessIdentifier); + return; + } + try { + Thread.sleep(DEFAULT_POLL_INTERVAL_MS); + } catch (InterruptedException e) { + throw e; + } + status = executeEngine.getStatus(externalProcessIdentifier); + } + } catch (Throwable t) { + if (t instanceof InterruptedException) { + LOG.info( + "Table process {} with identifier {} may have been interrupted by canceling process, exit submit process.", + tableProcess.getId(), + externalProcessIdentifier); + return; + } + status = ProcessStatus.FAILED; + message = t.getMessage(); + } Review Comment: The thread **must** set the process status to a final status before it returns. ########## amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActionCoordinatorScheduler extends PeriodicTableScheduler { + + public static final Logger LOG = LoggerFactory.getLogger(ActionCoordinatorScheduler.class); + public static final int PROCESS_MAX_RETRY_NUMBER = 3; + + private final ActionCoordinator coordinator; + private final ProcessService processService; + + public ActionCoordinatorScheduler( + ActionCoordinator coordinator, TableService tableService, ProcessService processService) { + super(coordinator.action(), tableService, coordinator.parallelism()); + this.coordinator = coordinator; + this.processService = processService; + } + + public ActionCoordinator getCoordinator() { + return coordinator; + } + + @Override + protected boolean formatSupported(TableFormat format) { + return coordinator.formatSupported(format); + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return 
coordinator.getNextExecutingTime(tableRuntime); + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) { + return coordinator.enabled(tableRuntime); + } + + @Override + protected void execute(TableRuntime tableRuntime) { + if (hasAliveTableProcess(tableRuntime)) { + TableProcess process = + processService + .getTableProcessTracker() + .getTableProcessInstance(tableRuntime.getTableIdentifier()); + LOG.warn( + "Detect table process: {} with status: {} exists for table runtime: {}, skip schedule {} action this time.", + process.getId(), + process.getStatus(), + tableRuntime.getTableIdentifier(), + getAction()); + } else { + TableProcess process = coordinator.createTableProcess(tableRuntime); + processService.register(tableRuntime, process); + } + } + + protected void recover(TableRuntime tableRuntime, TableProcessMeta processMeta) { + if (hasAliveTableProcess(tableRuntime)) { Review Comment: All processMeta must be recovered. ########## amoro-ams/src/test/java/org/apache/amoro/server/process/TestExecuteEngine.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.process.executor.EngineType; +import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.ExecuteOption; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.parquet.Strings; + +import java.security.SecureRandom; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class TestExecuteEngine implements ExecuteEngine { + + public static final String CHARACTERS = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + private final ThreadPoolExecutor executionPool = + new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + + private final Map<String, Future<?>> activeInstances = new ConcurrentHashMap<>(); + + private final Map<String, Future<?>> cancelingInstances = new ConcurrentHashMap<>(); + + @Override + public EngineType engineType() { + return EngineType.DEFAULT; + } + + @Override + public ProcessStatus getStatus(String processIdentifier) { + if (Strings.isNullOrEmpty(processIdentifier)) { + return ProcessStatus.UNKNOWN; + } + + Map<String, Future<?>> instances = + cancelingInstances.containsKey(processIdentifier) ? 
cancelingInstances : activeInstances; + + Future<?> future = instances.get(processIdentifier); + if (future == null) { + return ProcessStatus.UNKNOWN; + } + if (future.isCancelled()) { + instances.remove(processIdentifier); + return ProcessStatus.CANCELED; + } else if (future.isDone()) { + instances.remove(processIdentifier); + try { + future.get(); + return ProcessStatus.SUCCESS; + } catch (Exception e) { + return ProcessStatus.FAILED; + } + } else { + if (cancelingInstances.containsKey(processIdentifier)) { + return ProcessStatus.CANCELING; + } else { + return ProcessStatus.RUNNING; + } + } + } + + @Override + public String submitTableProcess(TableProcess tableProcess) { + String identifier = generateUnique128String(); + Future<?> future = executionPool.submit(new TableProcessExecutor(tableProcess, this)); + activeInstances.put(identifier, future); + return identifier; + } + + @Override + public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String processIdentifier) { + Future<?> future = activeInstances.get(processIdentifier); + if (future == null) { + return ProcessStatus.CANCELED; + } + + activeInstances.remove(processIdentifier); + cancelingInstances.put(processIdentifier, future); + + if (future.isDone()) { + try { + future.get(); + return ProcessStatus.SUCCESS; + } catch (Exception e) { + return ProcessStatus.FAILED; + } + } else if (future.isCancelled()) { + return ProcessStatus.CANCELED; + } else { + future.cancel(true); + return ProcessStatus.CANCELING; + } + } + + @Override + public void open(Map<String, String> properties) {} + + @Override + public void close() {} + + @Override + public String name() { + return "default_execute_engine"; + } + + /** + * Generate a unique 128-length string + * + * @return Unique 128-length string + * @throws IllegalStateException If failed to generate unique string after max retries + */ + private String generateUnique128String() { + int maxRetryCount = 100; + for (int retry = 0; retry < maxRetryCount; 
retry++) { + // Generate a 128-length random string + String randomStr = generate128LengthRandomString(); + + // Check uniqueness: return directly if not exists in the collection (containsKey returns + // false) + if (!activeInstances.containsKey(randomStr)) { + System.out.printf("Attempt %d: Successfully generated unique string%n", retry + 1); + return randomStr; + } + + // Print log and retry if duplicate (can remove log in production to avoid redundancy) + System.out.printf("Attempt %d: String already exists, regenerating...%n", retry + 1); + } + + // Throw exception if failed to generate after max retries (theoretically, 62^128 combinations + // result in extremely low duplication probability) + throw new IllegalStateException( + "Failed to generate unique 128-length string after " + 100 + " max retries"); + } + + /** + * Generate a single 128-length random string (based on secure random number) + * + * @return 128-length random string (letters + numbers) + */ + private String generate128LengthRandomString() { + StringBuilder sb = new StringBuilder(128); + SecureRandom secureRandom = new SecureRandom(); + for (int i = 0; i < 128; i++) { + // Randomly select a character from the character set (index range: 0 ~ 61) + int randomIndex = secureRandom.nextInt(CHARACTERS.length()); + sb.append(CHARACTERS.charAt(randomIndex)); + } + return sb.toString(); + } + + @VisibleForTesting + public Map<String, Future<?>> getActiveInstances() { + return activeInstances; + } + + @VisibleForTesting + public Map<String, Future<?>> getCancelingInstances() { + return cancelingInstances; + } + + private class TableProcessExecutor extends PersistentBase implements Runnable { + public ExecuteEngine executeEngine; + public ExecuteOption + executeOption; // Note: This field is declared but not used in the current code + protected TableProcess tableProcess; + + public TableProcessExecutor(TableProcess tableProcess, ExecuteEngine executeEngine) { + this.tableProcess = tableProcess; + 
this.executeEngine = executeEngine; + } + + @Override + public void run() { + // 1. Generate a random integer between 1 and 100 (recommend ThreadLocalRandom for + // thread-safety and high performance) + int randomNum = ThreadLocalRandom.current().nextInt(1, 101); + System.out.println("Generated random number: " + randomNum); + System.out.println("Start loop: i from 0 to " + randomNum + ", sleep 5 seconds each time"); + System.out.println("----------------------------------------"); + Review Comment: Remove println, replace them with LOG.info ########## amoro-ams/src/test/java/org/apache/amoro/server/process/TestActionCoordinator.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.Action; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessState; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.process.executor.EngineType; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TestActionCoordinator implements ActionCoordinator { + public static final int PROCESS_MAX_POOL_SIZE = 1000; + private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.PAIMON}; + + public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0, "default_action"); Review Comment: MOCK_ACTION may be better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
