This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new f6bad83ba [Improvement] Refactor table_optimizing_process into a more
generic structure (#3715)
f6bad83ba is described below
commit f6bad83bab802849730f6e0035179a4c8b8fee88
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 12 16:24:30 2025 +0800
[Improvement] Refactor table_optimizing_process into a more generic
structure (#3715)
* [Improvement] Refactor table_optimizing_process into a more generic
structure
* Fix COMMENTS
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
---
.../amoro/server/DefaultOptimizingService.java | 8 +-
.../dashboard/MixedAndIcebergTableDescriptor.java | 81 +++++-----
.../server/optimizing/OptimizingProcessMeta.java | 167 ---------------------
.../amoro/server/optimizing/OptimizingQueue.java | 45 +++---
.../amoro/server/optimizing/TaskRuntime.java | 4 +-
.../persistence/SqlSessionFactoryProvider.java | 6 +-
.../server/persistence/TaskFilesPersistence.java | 8 +-
...ingMapper.java => OptimizingProcessMapper.java} | 124 +++------------
.../server/persistence/mapper/TableMetaMapper.java | 27 ++--
.../persistence/mapper/TableProcessMapper.java | 116 ++++++++++++++
.../amoro/server/process/TableProcessMeta.java | 116 ++++++++++++++
.../inline/OptimizingExpiringExecutor.java | 16 +-
.../amoro/server/table/DefaultOptimizingState.java | 6 +-
.../amoro/server/table/DefaultTableManager.java | 8 +-
.../src/main/resources/derby/ams-derby-init.sql | 43 +++---
.../src/main/resources/mysql/ams-mysql-init.sql | 29 ++--
amoro-ams/src/main/resources/mysql/upgrade.sql | 54 ++++++-
.../main/resources/postgres/ams-postgres-init.sql | 83 +++++-----
amoro-ams/src/main/resources/postgres/upgrade.sql | 91 +++++++++++
.../TestIcebergServerTableDescriptor.java | 25 ++-
.../server/optimizing/BaseOptimizingChecker.java | 82 +++++-----
.../optimizing/TestIcebergHadoopOptimizing.java | 11 +-
.../server/optimizing/TestMixedHiveOptimizing.java | 5 +-
.../optimizing/TestMixedIcebergOptimizing.java | 11 +-
.../apache/amoro/optimizing/MetricsSummary.java | 37 +++++
25 files changed, 718 insertions(+), 485 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 397c473eb..099021cde 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -36,14 +36,14 @@ import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
@@ -288,8 +288,8 @@ public class DefaultOptimizingService extends
StatedPersistentBase
@Override
public boolean cancelProcess(long processId) throws TException {
- OptimizingProcessMeta processMeta =
- getAs(OptimizingMapper.class, m -> m.getOptimizingProcess(processId));
+ TableProcessMeta processMeta =
+ getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
if (processMeta == null) {
return false;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index c5c5c23a0..df035ee0b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -22,6 +22,7 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.data.DataFileType;
@@ -35,12 +36,14 @@ import
org.apache.amoro.server.dashboard.model.TableBasicInfo;
import org.apache.amoro.server.dashboard.model.TableStatistics;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.TableStatCollector;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -510,22 +513,27 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit,
int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
+ ServerTableIdentifier identifier =
+ getAs(
+ TableMetaMapper.class,
+ m ->
+ m.selectTableIdentifier(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName()));
+ if (identifier == null) {
+ return Pair.of(Collections.emptyList(), 0);
+ }
int total = 0;
// page helper is 1-based
int pageNumber = (offset / limit) + 1;
- List<OptimizingProcessMeta> processMetaList = Collections.emptyList();
+ List<TableProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
processMetaList =
getAs(
- OptimizingMapper.class,
- mapper ->
- mapper.selectOptimizingProcesses(
- tableIdentifier.getCatalog(),
- tableIdentifier.getDatabase(),
- tableIdentifier.getTableName(),
- type,
- status));
- PageInfo<OptimizingProcessMeta> pageInfo = new
PageInfo<>(processMetaList);
+ TableProcessMapper.class,
+ mapper -> mapper.listProcessMeta(identifier.getId(), type,
status));
+ PageInfo<TableProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
"Get optimizing processes total : {} , pageNumber:{}, limit:{},
offset:{}",
@@ -538,17 +546,19 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
}
}
List<Long> processIds =
- processMetaList.stream()
- .map(OptimizingProcessMeta::getProcessId)
- .collect(Collectors.toList());
+
processMetaList.stream().map(TableProcessMeta::getProcessId).collect(Collectors.toList());
Map<Long, List<OptimizingTaskMeta>> optimizingTasks =
- getAs(OptimizingMapper.class, mapper ->
mapper.selectOptimizeTaskMetas(processIds)).stream()
+ getAs(OptimizingProcessMapper.class, mapper ->
mapper.selectOptimizeTaskMetas(processIds))
+ .stream()
.collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));
LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
return Pair.of(
processMetaList.stream()
- .map(p -> buildOptimizingProcessInfo(p,
optimizingTasks.get(p.getProcessId())))
+ .map(
+ p ->
+ buildOptimizingProcessInfo(
+ identifier, p, optimizingTasks.get(p.getProcessId())))
.collect(Collectors.toList()),
total);
}
@@ -568,7 +578,7 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
long id = Long.parseLong(processId);
List<OptimizingTaskMeta> optimizingTaskMetaList =
getAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper ->
mapper.selectOptimizeTaskMetas(Collections.singletonList(id)));
if (CollectionUtils.isEmpty(optimizingTaskMetaList)) {
return Collections.emptyList();
@@ -833,7 +843,9 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
}
private static OptimizingProcessInfo buildOptimizingProcessInfo(
- OptimizingProcessMeta meta, List<OptimizingTaskMeta>
optimizingTaskStats) {
+ ServerTableIdentifier identifier,
+ TableProcessMeta meta,
+ List<OptimizingTaskMeta> optimizingTaskStats) {
if (meta == null) {
return null;
}
@@ -858,28 +870,25 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
result.setSuccessTasks(successTasks);
result.setRunningTasks(runningTasks);
}
- MetricsSummary summary = meta.getSummary();
- if (summary != null) {
- result.setInputFiles(summary.getInputFilesStatistics());
- result.setOutputFiles(summary.getOutputFilesStatistics());
- }
-
+ MetricsSummary summary = MetricsSummary.fromMap(meta.getSummary());
+ result.setInputFiles(summary.getInputFilesStatistics());
+ result.setOutputFiles(summary.getOutputFilesStatistics());
result.setTableId(meta.getTableId());
- result.setCatalogName(meta.getCatalogName());
- result.setDbName(meta.getDbName());
- result.setTableName(meta.getTableName());
+ result.setCatalogName(identifier.getCatalog());
+ result.setDbName(identifier.getDatabase());
+ result.setTableName(identifier.getTableName());
result.setProcessId(String.valueOf(meta.getProcessId()));
- result.setStartTime(meta.getPlanTime());
- result.setOptimizingType(meta.getOptimizingType().name());
+ result.setStartTime(meta.getCreateTime());
+ result.setOptimizingType(meta.getProcessType());
result.setStatus(ProcessStatus.valueOf(meta.getStatus().name()));
- result.setFailReason(meta.getFailReason());
+ result.setFailReason(meta.getFailMessage());
result.setDuration(
- meta.getEndTime() > 0
- ? meta.getEndTime() - meta.getPlanTime()
- : System.currentTimeMillis() - meta.getPlanTime());
- result.setFinishTime(meta.getEndTime());
- result.setSummary(meta.getSummary().summaryAsMap(true));
+ meta.getFinishTime() > 0
+ ? meta.getFinishTime() - meta.getCreateTime()
+ : System.currentTimeMillis() - meta.getCreateTime());
+ result.setFinishTime(meta.getFinishTime());
+ result.setSummary(meta.getSummary());
return result;
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
deleted file mode 100644
index 8fae30c41..000000000
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcessMeta.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.server.optimizing;
-
-import org.apache.amoro.optimizing.MetricsSummary;
-import org.apache.amoro.optimizing.OptimizingType;
-import org.apache.amoro.process.ProcessStatus;
-
-import java.util.Map;
-
-/** Meta of optimizing process. */
-public class OptimizingProcessMeta {
-
- private Long processId;
- private Long tableId;
- private String catalogName;
- private String dbName;
- private String tableName;
- private Long targetSnapshotId;
- private Long targetChangeSnapshotId;
- private ProcessStatus processStatus;
- private OptimizingType optimizingType;
- private long planTime;
- private long endTime;
- private String failReason;
- private MetricsSummary summary;
- private Map<String, Long> fromSequence;
- private Map<String, Long> toSequence;
-
- public OptimizingProcessMeta() {}
-
- public Long getProcessId() {
- return processId;
- }
-
- public void setProcessId(Long processId) {
- this.processId = processId;
- }
-
- public Long getTableId() {
- return tableId;
- }
-
- public void setTableId(Long tableId) {
- this.tableId = tableId;
- }
-
- public String getCatalogName() {
- return catalogName;
- }
-
- public void setCatalogName(String catalogName) {
- this.catalogName = catalogName;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public void setDbName(String dbName) {
- this.dbName = dbName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public Long getTargetSnapshotId() {
- return targetSnapshotId;
- }
-
- public void setTargetSnapshotId(Long targetSnapshotId) {
- this.targetSnapshotId = targetSnapshotId;
- }
-
- public ProcessStatus getStatus() {
- return processStatus;
- }
-
- public void setStatus(ProcessStatus processStatus) {
- this.processStatus = processStatus;
- }
-
- public OptimizingType getOptimizingType() {
- return optimizingType;
- }
-
- public void setOptimizingType(OptimizingType optimizingType) {
- this.optimizingType = optimizingType;
- }
-
- public long getPlanTime() {
- return planTime;
- }
-
- public void setPlanTime(long planTime) {
- this.planTime = planTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public String getFailReason() {
- return failReason;
- }
-
- public void setFailReason(String failReason) {
- this.failReason = failReason;
- }
-
- public MetricsSummary getSummary() {
- return summary;
- }
-
- public void setSummary(MetricsSummary summary) {
- this.summary = summary;
- }
-
- public Long getTargetChangeSnapshotId() {
- return targetChangeSnapshotId;
- }
-
- public void setTargetChangeSnapshotId(Long targetChangeSnapshotId) {
- this.targetChangeSnapshotId = targetChangeSnapshotId;
- }
-
- public Map<String, Long> getFromSequence() {
- return fromSequence;
- }
-
- public void setFromSequence(Map<String, Long> fromSequence) {
- this.fromSequence = fromSequence;
- }
-
- public Map<String, Long> getToSequence() {
- return toSequence;
- }
-
- public void setToSequence(Map<String, Long> toSequence) {
- this.toSequence = toSequence;
- }
-}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ed8464fe3..56888fb97 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -37,8 +37,9 @@ import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultOptimizingState;
@@ -190,10 +191,6 @@ public class OptimizingQueue extends PersistentBase {
tableRuntime.getTableIdentifier());
}
- public boolean containsTable(ServerTableIdentifier identifier) {
- return scheduler.getTableRuntime(identifier) != null;
- }
-
private void clearProcess(OptimizingProcess optimizingProcess) {
tableQueue.removeIf(process -> process.getProcessId() ==
optimizingProcess.getProcessId());
retryTaskQueue.removeIf(
@@ -741,22 +738,31 @@ public class OptimizingQueue extends PersistentBase {
doAsTransaction(
() ->
doAs(
- OptimizingMapper.class,
+ TableProcessMapper.class,
mapper ->
- mapper.insertOptimizingProcess(
- optimizingState.getTableIdentifier(),
+ mapper.insertProcess(
+ optimizingState.getTableIdentifier().getId(),
processId,
- targetSnapshotId,
- targetChangeSnapshotId,
status,
- optimizingType,
+ optimizingType.name().toUpperCase(),
+
optimizingState.getOptimizingStatus().name().toLowerCase(),
+ "AMORO",
planTime,
- getSummary(),
+ getSummary().summaryAsMap(false))),
+ () ->
+ doAs(
+ OptimizingProcessMapper.class,
+ mapper ->
+ mapper.insertInternalProcessState(
+ optimizingState.getTableIdentifier().getId(),
+ processId,
+ targetSnapshotId,
+ targetChangeSnapshotId,
fromSequence,
toSequence)),
() ->
doAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper ->
mapper.insertTaskRuntimes(Lists.newArrayList(taskMap.values()))),
() -> TaskFilesPersistence.persistTaskInputs(processId,
taskMap.values()),
() -> optimizingState.beginProcess(this));
@@ -771,15 +777,16 @@ public class OptimizingQueue extends PersistentBase {
},
() ->
doAs(
- OptimizingMapper.class,
+ TableProcessMapper.class,
mapper ->
- mapper.updateOptimizingProcess(
+ mapper.updateProcess(
optimizingState.getTableIdentifier().getId(),
processId,
status,
- endTime,
- getSummary(),
- getFailedReason())),
+
optimizingState.getOptimizingStatus().name().toLowerCase(),
+ System.currentTimeMillis(),
+ getFailedReason(),
+ getSummary().summaryAsMap(false))),
() -> optimizingState.completeProcess(success),
() -> clearProcess(this));
}
@@ -792,7 +799,7 @@ public class OptimizingQueue extends PersistentBase {
try {
List<TaskRuntime<RewriteStageTask>> taskRuntimes =
getAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper ->
mapper.selectTaskRuntimes(
optimizingState.getTableIdentifier().getId(),
processId));
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index 9f7071c91..f6db6e430 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -28,7 +28,7 @@ import org.apache.amoro.process.SimpleFuture;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.persistence.StatedPersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
@@ -259,7 +259,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?,
?, ?>> extends Stated
}
private void persistTaskRuntime() {
- doAs(OptimizingMapper.class, mapper -> mapper.updateTaskRuntime(this));
+ doAs(OptimizingProcessMapper.class, mapper ->
mapper.updateTaskRuntime(this));
}
public TaskQuota getCurrentQuota() {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 608472d16..74544e178 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -27,11 +27,12 @@ import
com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.PlatformFileMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.mapping.Environment;
@@ -63,13 +64,14 @@ public class SqlSessionFactoryProvider {
Environment environment = new Environment("develop", transactionFactory,
dataSource);
Configuration configuration = new Configuration(environment);
configuration.addMapper(TableMetaMapper.class);
- configuration.addMapper(OptimizingMapper.class);
+ configuration.addMapper(OptimizingProcessMapper.class);
configuration.addMapper(CatalogMetaMapper.class);
configuration.addMapper(OptimizerMapper.class);
configuration.addMapper(ApiTokensMapper.class);
configuration.addMapper(PlatformFileMapper.class);
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);
+ configuration.addMapper(TableProcessMapper.class);
PageInterceptor interceptor = new PageInterceptor();
Properties interceptorProperties = new Properties();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
index 92a07a5ab..1163960f5 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
@@ -22,7 +22,7 @@ import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.TaskRuntime;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.utils.CompressUtil;
import org.apache.amoro.utils.SerializationUtil;
@@ -49,7 +49,7 @@ public class TaskFilesPersistence {
public static Map<Integer, RewriteFilesInput> loadTaskInputs(long processId)
{
List<byte[]> bytes =
persistence.getAs(
- OptimizingMapper.class, mapper ->
mapper.selectProcessInputFiles(processId));
+ OptimizingProcessMapper.class, mapper ->
mapper.selectProcessInputFiles(processId));
if (bytes == null || bytes.isEmpty()) {
return Collections.emptyMap();
} else {
@@ -64,7 +64,9 @@ public class TaskFilesPersistence {
private static class DatabasePersistence extends PersistentBase {
public void persistTaskInputs(long processId, Map<Integer,
RewriteFilesInput> tasks) {
- doAs(OptimizingMapper.class, mapper ->
mapper.updateProcessInputFiles(processId, tasks));
+ doAs(
+ OptimizingProcessMapper.class,
+ mapper -> mapper.updateProcessInputFiles(processId, tasks));
}
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
similarity index 68%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
rename to
amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
index 3124a6670..abf6a30da 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java
@@ -18,20 +18,14 @@
package org.apache.amoro.server.persistence.mapper;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.optimizing.MetricsSummary;
-import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
-import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.StagedTaskDescriptor;
-import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
-import org.apache.amoro.server.persistence.converter.MapLong2StringConverter;
import org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert;
import
org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter;
import
org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver;
@@ -50,102 +44,36 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-public interface OptimizingMapper {
-
- /** OptimizingProcess operation below */
- @Delete(
- "DELETE FROM table_optimizing_process WHERE table_id = #{tableId} and
process_id < #{expireId}")
- void deleteOptimizingProcessBefore(
- @Param("tableId") long tableId, @Param("expireId") long expireId);
-
+public interface OptimizingProcessMapper {
@Insert(
- "INSERT INTO table_optimizing_process(table_id, catalog_name, db_name,
table_name ,process_id,"
- + " target_snapshot_id, target_change_snapshot_id, status,
optimizing_type, plan_time, summary, from_sequence,"
- + " to_sequence) VALUES (#{table.id}, #{table.catalog},"
- + " #{table.database}, #{table.tableName}, #{processId},
#{targetSnapshotId}, #{targetChangeSnapshotId},"
- + " #{status}, #{optimizingType},"
- + " #{planTime,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
- + " #{summary,
typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter},"
+ "INSERT INTO optimizing_process_state "
+ + "(process_id, table_id, target_snapshot_id,
target_change_snapshot_id, "
+ + "from_sequence, to_sequence) "
+ + "VALUES (#{processId}, #{tableId}, #{targetSnapshotId},
#{targetChangeSnapshotId}, "
+ " #{fromSequence,
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter},"
- + " #{toSequence,
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter}"
- + ")")
- void insertOptimizingProcess(
- @Param("table") ServerTableIdentifier tableIdentifier,
+ + " #{toSequence,
typeHandler=org.apache.amoro.server.persistence.converter.MapLong2StringConverter})")
+ void insertInternalProcessState(
+ @Param("tableId") long tableId,
@Param("processId") long processId,
@Param("targetSnapshotId") long targetSnapshotId,
@Param("targetChangeSnapshotId") long targetChangeSnapshotId,
- @Param("status") ProcessStatus status,
- @Param("optimizingType") OptimizingType optimizingType,
- @Param("planTime") long planTime,
- @Param("summary") MetricsSummary summary,
@Param("fromSequence") Map<String, Long> fromSequence,
@Param("toSequence") Map<String, Long> toSequence);
- @Update(
- "UPDATE table_optimizing_process SET status = #{optimizingStatus},"
- + " end_time = #{endTime,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
- + "summary = #{summary,
typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter},
"
- + "fail_reason = #{failedReason, jdbcType=VARCHAR}"
- + " WHERE table_id = #{tableId} AND process_id = #{processId}")
- void updateOptimizingProcess(
- @Param("tableId") long tableId,
- @Param("processId") long processId,
- @Param("optimizingStatus") ProcessStatus status,
- @Param("endTime") long endTime,
- @Param("summary") MetricsSummary summary,
- @Param("failedReason") String failedReason);
+ @Delete(
+ "DELETE FROM optimizing_process_state WHERE process_id <= #{processId}
AND table_id = #{tableId}")
+ void deleteProcessStateBefore(@Param("tableId") long tableId,
@Param("processId") long processId);
- @Select(
- "<script>"
- + "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name,
a.table_name, a.target_snapshot_id,"
- + " a.target_change_snapshot_id, a.status, a.optimizing_type,
a.plan_time, a.end_time,"
- + " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM
table_optimizing_process a"
- + " INNER JOIN table_identifier b ON a.table_id = b.table_id"
- + " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName}
AND a.table_name = #{tableName}"
- + " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName}
AND b.table_name = #{tableName}"
- + " <if test='optimizingType != null'> AND a.optimizing_type =
#{optimizingType}</if>"
- + " <if test='optimizingStatus != null'> AND a.status =
#{optimizingStatus}</if>"
- + " ORDER BY process_id desc"
- + "</script>")
- @Results(
- id = "processMeta",
- value = {
- @Result(property = "processId", column = "process_id"),
- @Result(property = "tableId", column = "table_id"),
- @Result(property = "catalogName", column = "catalog_name"),
- @Result(property = "dbName", column = "db_name"),
- @Result(property = "tableName", column = "table_name"),
- @Result(property = "targetSnapshotId", column = "target_snapshot_id"),
- @Result(property = "targetChangeSnapshotId", column =
"target_change_snapshot_id"),
- @Result(property = "status", column = "status"),
- @Result(property = "optimizingType", column = "optimizing_type"),
- @Result(property = "planTime", column = "plan_time", typeHandler =
Long2TsConverter.class),
- @Result(property = "endTime", column = "end_time", typeHandler =
Long2TsConverter.class),
- @Result(property = "failReason", column = "fail_reason"),
- @Result(property = "summary", column = "summary", typeHandler =
JsonObjectConverter.class),
- @Result(
- property = "fromSequence",
- column = "from_sequence",
- typeHandler = MapLong2StringConverter.class),
- @Result(
- property = "toSequence",
- column = "to_sequence",
- typeHandler = MapLong2StringConverter.class)
- })
- List<OptimizingProcessMeta> selectOptimizingProcesses(
- @Param("catalogName") String catalogName,
- @Param("dbName") String dbName,
- @Param("tableName") String tableName,
- @Param("optimizingType") String optimizingType,
- @Param("optimizingStatus") ProcessStatus optimizingStatus);
+ @Select("SELECT rewrite_input FROM optimizing_process_state WHERE process_id
= #{processId}")
+ @Results({@Result(column = "rewrite_input", jdbcType = JdbcType.BLOB)})
+ List<byte[]> selectProcessInputFiles(@Param("processId") long processId);
- @Select(
- "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name,
a.table_name, a.target_snapshot_id,"
- + " a.target_change_snapshot_id, a.status, a.optimizing_type,
a.plan_time, a.end_time,"
- + " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM
table_optimizing_process a "
- + " WHERE a.process_id = #{processId}")
- @ResultMap("processMeta")
- OptimizingProcessMeta getOptimizingProcess(@Param("processId") long
processId);
+ @Update(
+ "UPDATE optimizing_process_state SET rewrite_input = #{input,
jdbcType=BLOB,"
+ + "
typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}"
+ + " WHERE process_id = #{processId}")
+ void updateProcessInputFiles(
+ @Param("processId") long processId, @Param("input") Map<Integer,
RewriteFilesInput> input);
/** Optimizing TaskRuntime operation below */
@Insert({
@@ -260,18 +188,6 @@ public interface OptimizingMapper {
@Delete("DELETE FROM task_runtime WHERE table_id = #{tableId} AND process_id
< #{expireId}")
void deleteTaskRuntimesBefore(@Param("tableId") long tableId,
@Param("expireId") long expireId);
- /** Optimizing rewrite input and output operations below */
- @Update(
- "UPDATE table_optimizing_process SET rewrite_input = #{input,
jdbcType=BLOB,"
- + "
typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}"
- + " WHERE process_id = #{processId}")
- void updateProcessInputFiles(
- @Param("processId") long processId, @Param("input") Map<Integer,
RewriteFilesInput> input);
-
- @Select("SELECT rewrite_input FROM table_optimizing_process WHERE process_id
= #{processId}")
- @Results({@Result(column = "rewrite_input", jdbcType = JdbcType.BLOB)})
- List<byte[]> selectProcessInputFiles(@Param("processId") long processId);
-
/** Optimizing task quota operations below */
@Select(
"SELECT process_id, task_id, retry_num, table_id, start_time, end_time,
fail_reason "
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
index 1ae224672..202dcd491 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
@@ -337,7 +337,7 @@ public interface TableMetaMapper {
+ " last_full_optimizing_time = #{runtime.lastFullOptimizingTime,"
+ "
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
+ " optimizing_status_code = #{runtime.optimizingStatus,"
- +
"typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter},"
+ + "
typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter},"
+ " optimizing_status_start_time = #{runtime.currentStatusStartTime,"
+ "
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
+ " optimizing_process_id = #{runtime.processId},"
@@ -388,10 +388,13 @@ public interface TableMetaMapper {
+ " a.current_change_snapshotId, a.last_optimized_snapshotId,
a.last_optimized_change_snapshotId,"
+ " a.last_major_optimizing_time, a.last_minor_optimizing_time,
a.last_full_optimizing_time,"
+ " a.optimizing_status_code, a.optimizing_status_start_time,
a.optimizing_process_id,"
- + " a.optimizer_group, a.table_config, a.pending_input,
a.table_summary, b.optimizing_type, b.target_snapshot_id,"
- + " b.target_change_snapshot_id, b.plan_time, b.from_sequence,
b.to_sequence FROM table_runtime a"
+ + " a.optimizer_group, a.table_config, a.pending_input,
a.table_summary, "
+ + " p.process_type, s.target_snapshot_id,"
+ + " s.target_change_snapshot_id, p.create_time, s.from_sequence,
s.to_sequence "
+ + " FROM table_runtime a "
+ " INNER JOIN table_identifier i ON a.table_id = i.table_id "
- + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id
= b.process_id")
+ + " LEFT JOIN table_process p ON a.optimizing_process_id =
p.process_id "
+ + " LEFT JOIN optimizing_process_state s ON a.optimizing_process_id
= s.process_id")
@Results(
id = "tableRuntimeMeta",
value = {
@@ -440,10 +443,13 @@ public interface TableMetaMapper {
property = "tableSummary",
column = "table_summary",
typeHandler = JsonObjectConverter.class),
- @Result(property = "optimizingType", column = "optimizing_type"),
+ @Result(property = "optimizingType", column = "process_type"),
@Result(property = "targetSnapshotId", column = "target_snapshot_id"),
@Result(property = "targetChangeSnapshotId", column =
"target_change_snapshot_id"),
- @Result(property = "planTime", column = "plan_time", typeHandler =
Long2TsConverter.class),
+ @Result(
+ property = "planTime",
+ column = "create_time",
+ typeHandler = Long2TsConverter.class),
@Result(
property = "fromSequence",
column = "from_sequence",
@@ -460,10 +466,13 @@ public interface TableMetaMapper {
+ " a.current_change_snapshotId, a.last_optimized_snapshotId,
a.last_optimized_change_snapshotId,"
+ " a.last_major_optimizing_time, a.last_minor_optimizing_time,
a.last_full_optimizing_time,"
+ " a.optimizing_status_code, a.optimizing_status_start_time,
a.optimizing_process_id,"
- + " a.optimizer_group, a.table_config, a.pending_input,
a.table_summary, b.optimizing_type, b.target_snapshot_id,"
- + " b.target_change_snapshot_id, b.plan_time, b.from_sequence,
b.to_sequence FROM table_runtime a"
+ + " a.optimizer_group, a.table_config, a.pending_input,
a.table_summary, "
+ + " p.process_type, s.target_snapshot_id,"
+ + " s.target_change_snapshot_id, p.create_time, s.from_sequence,
s.to_sequence "
+ + " FROM table_runtime a "
+ " INNER JOIN table_identifier i ON a.table_id = i.table_id "
- + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id
= b.process_id "
+ + " LEFT JOIN table_process p ON a.optimizing_process_id =
p.process_id "
+ + " LEFT JOIN optimizing_process_state s ON a.optimizing_process_id
= s.process_id"
+ " WHERE a.table_id = #{tableId}")
@ResultMap("tableRuntimeMeta")
TableRuntimeMeta getTableRuntimeMeta(@Param("tableId") long tableId);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
new file mode 100644
index 000000000..0e52170d9
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.persistence.converter.Long2TsConverter;
+import org.apache.amoro.server.persistence.converter.Map2StringConverter;
+import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+import java.util.Map;
+
+public interface TableProcessMapper {
+
+ @Delete("DELETE FROM table_process WHERE process_id <= #{processId} AND
table_id = #{tableId}")
+ void deleteBefore(@Param("tableId") long tableId, @Param("processId") long
processId);
+
+ @Insert(
+ "INSERT INTO table_process "
+ + "(process_id, table_id, status, process_type, process_stage,
execution_engine, "
+ + "create_time, summary) "
+ + "VALUES (#{processId}, #{tableId}, #{status}, #{processType},
#{processStage}, "
+ + "#{executionEngine}, "
+ + "#{createTime,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+ + "#{summary,
typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})")
+ void insertProcess(
+ @Param("tableId") long tableId,
+ @Param("processId") long processId,
+ @Param("status") ProcessStatus status,
+ @Param("processType") String processType,
+ @Param("processStage") String processStage,
+ @Param("executionEngine") String executionEngine,
+ @Param("createTime") long createTime,
+ @Param("summary") Map<String, String> summary);
+
+ @Update(
+ "UPDATE table_process SET status = #{status}, "
+ + "process_stage = #{processStage}, "
+ + "finish_time = #{finishTime,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+ + "fail_message = #{failMessage, jdbcType=VARCHAR}, "
+ + "summary = #{summary,
typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter} "
+ + "WHERE process_id = #{processId} AND table_id = #{tableId}")
+ void updateProcess(
+ @Param("tableId") long tableId,
+ @Param("processId") long processId,
+ @Param("status") ProcessStatus status,
+ @Param("processStage") String processStage,
+ @Param("finishTime") long finishTime,
+ @Param("failMessage") String failMessage,
+ @Param("summary") Map<String, String> summary);
+
+ @Select(
+ "SELECT process_id, table_id, status, process_type, process_stage,
execution_engine, "
+ + "create_time, finish_time, fail_message, summary "
+ + "FROM table_process WHERE process_id = #{processId}")
+ @Results(
+ id = "tableProcessMap",
+ value = {
+ @Result(column = "process_id", property = "processId"),
+ @Result(column = "table_id", property = "tableId"),
+ @Result(column = "status", property = "status"),
+ @Result(column = "process_type", property = "processType"),
+ @Result(column = "process_stage", property = "processStage"),
+ @Result(column = "execution_engine", property = "executionEngine"),
+ @Result(
+ column = "create_time",
+ property = "createTime",
+ typeHandler = Long2TsConverter.class),
+ @Result(
+ column = "finish_time",
+ property = "finishTime",
+ typeHandler = Long2TsConverter.class),
+ @Result(column = "fail_message", property = "failMessage"),
+ @Result(column = "summary", property = "summary", typeHandler =
Map2StringConverter.class)
+ })
+ TableProcessMeta getProcessMeta(@Param("processId") long processId);
+
+ @Select(
+ "<script>"
+ + "SELECT process_id, table_id, status, process_type, process_stage,
execution_engine, "
+ + "create_time, finish_time, fail_message, summary "
+ + "FROM table_process WHERE table_id = #{tableId} "
+ + " <if test='processType != null'> AND process_type =
#{processType}</if>"
+ + " <if test='status != null'> AND status = #{status}</if>"
+ + " ORDER BY process_id desc"
+ + "</script>")
+ @ResultMap("tableProcessMap")
+ List<TableProcessMeta> listProcessMeta(
+ @Param("tableId") long tableId,
+ @Param("processType") String processType,
+ @Param("status") ProcessStatus optimizingStatus);
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
new file mode 100644
index 000000000..6e822b0f6
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process;
+
+import org.apache.amoro.process.ProcessStatus;
+
+import java.util.Map;
+
+public class TableProcessMeta {
+ private long processId;
+ private long tableId;
+ private ProcessStatus status;
+ private String processType;
+ private String processStage;
+ private String executionEngine;
+ private long createTime;
+ private long finishTime;
+ private String failMessage;
+ private Map<String, String> summary;
+
+ public long getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(long processId) {
+ this.processId = processId;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public void setTableId(long tableId) {
+ this.tableId = tableId;
+ }
+
+ public ProcessStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(ProcessStatus status) {
+ this.status = status;
+ }
+
+ public String getProcessType() {
+ return processType;
+ }
+
+ public void setProcessType(String processType) {
+ this.processType = processType;
+ }
+
+ public String getProcessStage() {
+ return processStage;
+ }
+
+ public void setProcessStage(String processStage) {
+ this.processStage = processStage;
+ }
+
+ public String getExecutionEngine() {
+ return executionEngine;
+ }
+
+ public void setExecutionEngine(String executionEngine) {
+ this.executionEngine = executionEngine;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(long createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getFailMessage() {
+ return failMessage;
+ }
+
+ public void setFailMessage(String failMessage) {
+ this.failMessage = failMessage;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public Map<String, String> getSummary() {
+ return summary;
+ }
+
+ public void setSummary(Map<String, String> summary) {
+ this.summary = summary;
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
index 77f610b7c..329afc8ef 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
@@ -20,7 +20,8 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
@@ -72,19 +73,24 @@ public class OptimizingExpiringExecutor extends
PeriodicTableScheduler {
doAsTransaction(
() ->
doAs(
- OptimizingMapper.class,
+ TableProcessMapper.class,
mapper ->
- mapper.deleteOptimizingProcessBefore(
+
mapper.deleteBefore(tableRuntime.getTableIdentifier().getId(), minProcessId)),
+ () ->
+ doAs(
+ OptimizingProcessMapper.class,
+ mapper ->
+ mapper.deleteProcessStateBefore(
tableRuntime.getTableIdentifier().getId(),
minProcessId)),
() ->
doAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper ->
mapper.deleteTaskRuntimesBefore(
tableRuntime.getTableIdentifier().getId(),
minProcessId)),
() ->
doAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper ->
mapper.deleteOptimizingQuotaBefore(
tableRuntime.getTableIdentifier().getId(),
minProcessId)));
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
index 68204e5eb..c34e82044 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
@@ -39,7 +39,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
@@ -332,7 +332,7 @@ public class DefaultOptimizingState extends
StatedPersistentBase implements Proc
taskQuotas.clear();
taskQuotas.addAll(
getAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
mapper -> mapper.selectTaskQuotasByTime(tableIdentifier.getId(),
minProcessId)));
} finally {
tableLock.unlock();
@@ -446,7 +446,7 @@ public class DefaultOptimizingState extends
StatedPersistentBase implements Proc
}
public void addTaskQuota(TaskRuntime.TaskQuota taskQuota) {
- doAsIgnoreError(OptimizingMapper.class, mapper ->
mapper.insertTaskQuota(taskQuota));
+ doAsIgnoreError(OptimizingProcessMapper.class, mapper ->
mapper.insertTaskQuota(taskQuota));
taskQuotas.add(taskQuota);
long validTime = System.currentTimeMillis() -
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
this.taskQuotas.removeIf(task -> task.checkExpired(validTime));
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
index 826d59b7b..b3d9a03d5 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
@@ -42,7 +42,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
@@ -271,7 +271,7 @@ public class DefaultTableManager extends PersistentBase
implements TableManager
List<OptimizingTaskMeta> taskMetas =
getAs(
- OptimizingMapper.class,
+ OptimizingProcessMapper.class,
m -> {
if (processIds.isEmpty()) {
return Lists.newArrayList();
@@ -318,7 +318,9 @@ public class DefaultTableManager extends PersistentBase
implements TableManager
long calculatingStartTime = calculatingEndTime -
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
long minProcessId =
SnowflakeIdGenerator.getMinSnowflakeId(calculatingStartTime);
List<TaskRuntime.TaskQuota> quotas =
- getAs(OptimizingMapper.class, mapper ->
mapper.selectTableQuotas(tableIds, minProcessId));
+ getAs(
+ OptimizingProcessMapper.class,
+ mapper -> mapper.selectTableQuotas(tableIds, minProcessId));
return quotas.stream()
.collect(Collectors.groupingBy(TaskRuntime.TaskQuota::getTableId,
Collectors.toList()));
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 0e4e507e3..d04589f10 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -123,25 +123,32 @@ CREATE TABLE table_runtime (
CONSTRAINT table_runtime_table_name_idx UNIQUE (catalog_name, db_name,
table_name)
);
-CREATE TABLE table_optimizing_process (
- process_id BIGINT NOT NULL,
- table_id BIGINT NOT NULL,
- catalog_name VARCHAR(64) NOT NULL,
- db_name VARCHAR(128) NOT NULL,
- table_name VARCHAR(256) NOT NULL,
- target_snapshot_id BIGINT NOT NULL,
- target_change_snapshot_id BIGINT NOT NULL,
- status VARCHAR(10) NOT NULL,
- optimizing_type VARCHAR(10) NOT NULL,
- plan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- end_time TIMESTAMP DEFAULT NULL,
- fail_reason VARCHAR(4096),
- rewrite_input BLOB(64m),
- summary CLOB(64m),
- from_sequence CLOB(64m),
- to_sequence CLOB(64m),
- CONSTRAINT table_optimizing_process_pk PRIMARY KEY (process_id)
+CREATE TABLE table_process (
+ process_id BIGINT NOT NULL PRIMARY KEY,
+ table_id BIGINT NOT NULL,
+ status VARCHAR(64) NOT NULL,
+ process_type VARCHAR(64) NOT NULL,
+ process_stage VARCHAR(64) NOT NULL,
+ execution_engine VARCHAR(64) NOT NULL,
+ create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ finish_time TIMESTAMP DEFAULT NULL,
+ fail_message CLOB,
+ summary CLOB(64m)
+);
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+CREATE TABLE optimizing_process_state (
+ process_id BIGINT NOT NULL PRIMARY KEY,
+ table_id BIGINT NOT NULL,
+ target_snapshot_id BIGINT NOT NULL,
+ target_change_snapshot_id BIGINT NOT NULL,
+ rewrite_input BLOB(64m),
+ from_sequence CLOB(64m),
+ to_sequence CLOB(64m)
);
+CREATE INDEX optimizing_process_state_table_idx
+ ON optimizing_process_state (table_id);
+
CREATE TABLE task_runtime (
process_id BIGINT NOT NULL,
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index ee815fcc6..f1b6c3743 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -135,26 +135,33 @@ CREATE TABLE `table_runtime`
INDEX idx_optimizer_status_and_time (optimizing_status_code,
optimizing_status_start_time DESC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize running information
of each table' ROW_FORMAT=DYNAMIC;
-CREATE TABLE `table_optimizing_process`
+CREATE TABLE `table_process`
+(
+ `process_id` bigint(20) NOT NULL COMMENT 'table process
id',
+ `table_id` bigint(20) NOT NULL COMMENT 'table id',
+ `status` varchar(64) NOT NULL COMMENT 'Table
optimizing status',
+ `process_type` varchar(64) NOT NULL COMMENT 'Process
action type',
+ `process_stage` varchar(64) NOT NULL COMMENT 'Process
current stage',
+ `execution_engine` varchar(64) NOT NULL COMMENT 'Execution
engine',
+ `create_time` timestamp DEFAULT CURRENT_TIMESTAMP
COMMENT 'First plan time',
+ `finish_time` timestamp NULL DEFAULT NULL COMMENT
'finish time or failed time',
+ `fail_message` mediumtext DEFAULT NULL COMMENT 'Error
message after task failed',
+  `summary`                       mediumtext COMMENT 'Process summary',
+ PRIMARY KEY (`process_id`),
+ KEY `table_index` (`table_id`, `create_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after
each commit';
+
+CREATE TABLE `optimizing_process_state`
(
`process_id` bigint(20) NOT NULL COMMENT
'optimizing_procedure UUID',
`table_id` bigint(20) NOT NULL,
- `catalog_name` varchar(64) NOT NULL COMMENT 'Catalog
name',
- `db_name` varchar(128) NOT NULL COMMENT 'Database
name',
- `table_name` varchar(256) NOT NULL COMMENT 'Table name',
`target_snapshot_id` bigint(20) NOT NULL,
`target_change_snapshot_id` bigint(20) NOT NULL,
- `status` varchar(10) NOT NULL COMMENT 'Direct to
TableOptimizingStatus',
- `optimizing_type` varchar(10) NOT NULL COMMENT 'Optimize
type: Major, Minor',
- `plan_time` timestamp DEFAULT CURRENT_TIMESTAMP
COMMENT 'First plan time',
- `end_time` timestamp NULL DEFAULT NULL COMMENT
'finish time or failed time',
- `fail_reason` varchar(4096) DEFAULT NULL COMMENT 'Error
message after task failed',
`rewrite_input` longblob DEFAULT NULL COMMENT 'rewrite
files input',
- `summary` mediumtext COMMENT 'Max change transaction
id of these tasks',
`from_sequence` mediumtext COMMENT 'from or min sequence
of each partition',
`to_sequence` mediumtext COMMENT 'to or max sequence of
each partition',
PRIMARY KEY (`process_id`),
- KEY `table_index` (`table_id`, `plan_time`)
+ KEY `table_index` (`table_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after
each commit';
CREATE TABLE `task_runtime`
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index 71bc576c5..ebbfc6010 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -23,4 +23,56 @@ ALTER TABLE `table_runtime` MODIFY COLUMN
`optimizing_status_start_time` TIMESTA
UPDATE `table_optimizing_process` SET `process_id` = `process_id` /10 << 13;
UPDATE `task_runtime` SET `process_id` = `process_id` /10 << 13;
UPDATE `optimizing_task_quota` SET `process_id` = `process_id` /10 << 13;
-UPDATE `table_runtime` SET `optimizing_process_id` = `optimizing_process_id`
/10 << 13;
\ No newline at end of file
+UPDATE `table_runtime` SET `optimizing_process_id` = `optimizing_process_id`
/10 << 13;
+
+CREATE TABLE `table_process`
+(
+ `process_id` bigint(20) NOT NULL COMMENT 'table process
id',
+ `table_id` bigint(20) NOT NULL COMMENT 'table id',
+ `status` varchar(64) NOT NULL COMMENT 'Table
optimizing status',
+ `process_type` varchar(64) NOT NULL COMMENT 'Process
action type',
+ `process_stage` varchar(64) NOT NULL COMMENT 'Process
current stage',
+ `execution_engine` varchar(64) NOT NULL COMMENT 'Execution
engine',
+ `create_time` timestamp DEFAULT CURRENT_TIMESTAMP
COMMENT 'First plan time',
+ `finish_time` timestamp NULL DEFAULT NULL COMMENT
'finish time or failed time',
+ `fail_message` mediumtext DEFAULT NULL COMMENT 'Error
message after task failed',
+  `summary`                       mediumtext COMMENT 'Process summary',
+ PRIMARY KEY (`process_id`),
+ KEY `table_index` (`table_id`, `create_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after
each commit';
+
+CREATE TABLE `optimizing_process_state`
+(
+ `process_id` bigint(20) NOT NULL COMMENT
'optimizing_procedure UUID',
+ `table_id` bigint(20) NOT NULL,
+ `target_snapshot_id` bigint(20) NOT NULL,
+ `target_change_snapshot_id` bigint(20) NOT NULL,
+ `rewrite_input` longblob DEFAULT NULL COMMENT 'rewrite
files input',
+ `from_sequence` mediumtext COMMENT 'from or min sequence
of each partition',
+ `to_sequence` mediumtext COMMENT 'to or max sequence of
each partition',
+  PRIMARY KEY (`process_id`),
+  KEY `table_index` (`table_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after
each commit';
+
+INSERT INTO `table_process`
+(`process_id`, `table_id`, `status`, `process_type`,
+`process_stage`, `execution_engine`, `create_time`, `finish_time`,
`fail_message`, `summary`)
+SELECT p.`process_id`, p.`table_id`, p.`status`, p.`optimizing_type`,
+CASE
+ WHEN t.`optimizing_status_code` = 700 THEN 'IDLE'
+ WHEN t.`optimizing_status_code` = 600 THEN 'PENDING'
+ WHEN t.`optimizing_status_code` = 500 THEN 'PLANNING'
+ WHEN t.`optimizing_status_code` = 400 THEN 'COMMITTING'
+ WHEN t.`optimizing_status_code` = 300 THEN 'MINOR_OPTIMIZING'
+ WHEN t.`optimizing_status_code` = 200 THEN 'MAJOR_OPTIMIZING'
+ WHEN t.`optimizing_status_code` = 100 THEN 'FULL_OPTIMIZING'
+END,
+ 'AMORO', p.`plan_time`, p.`end_time`, p.`fail_reason`, p.`summary`
+FROM `table_optimizing_process` p JOIN `table_runtime` t ON p.table_id =
t.table_id;
+
+INSERT INTO `optimizing_process_state`
+(`process_id`, `table_id`, `target_snapshot_id`, `target_change_snapshot_id`,
`rewrite_input`, `from_sequence`, `to_sequence`)
+SELECT `process_id`, `table_id`, `target_snapshot_id`,
`target_change_snapshot_id`, `rewrite_input`, `from_sequence`, `to_sequence`
+FROM `table_optimizing_process`;
+
+DROP TABLE IF EXISTS `table_optimizing_process`;
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 6f31d469b..ae0382530 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -215,45 +215,54 @@ COMMENT ON COLUMN table_runtime.pending_input IS 'Pending
input data';
COMMENT ON COLUMN table_runtime.table_summary IS 'Table summary data';
CREATE INDEX idx_optimizer_status_and_time ON
table_runtime(optimizing_status_code, optimizing_status_start_time DESC);
-CREATE TABLE table_optimizing_process
-(
- process_id BIGINT NOT NULL,
- table_id BIGINT NOT NULL,
- catalog_name VARCHAR(64) NOT NULL,
- db_name VARCHAR(128) NOT NULL,
- table_name VARCHAR(256) NOT NULL,
- target_snapshot_id BIGINT NOT NULL,
- target_change_snapshot_id BIGINT NOT NULL,
- status VARCHAR(10) NOT NULL,
- optimizing_type VARCHAR(10) NOT NULL,
- plan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- end_time TIMESTAMP,
- fail_reason VARCHAR(4096),
- rewrite_input BYTEA,
- summary TEXT,
- from_sequence TEXT,
- to_sequence TEXT,
- PRIMARY KEY (process_id)
+CREATE TABLE table_process (
+ process_id bigserial PRIMARY KEY,
+ table_id bigint NOT NULL,
+ status varchar(64) NOT NULL,
+ process_type varchar(64) NOT NULL,
+ process_stage varchar(64) NOT NULL,
+ execution_engine varchar(64) NOT NULL,
+ create_time timestamptz NOT NULL DEFAULT now(),
+ finish_time timestamptz,
+ fail_message text CHECK (length(fail_message) <= 4096),
+ summary text,
+ CONSTRAINT table_process_unique UNIQUE (process_id)
+);
+
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+COMMENT ON TABLE table_process IS 'History of optimizing after each commit';
+COMMENT ON COLUMN table_process.process_id IS 'table process id';
+COMMENT ON COLUMN table_process.table_id IS 'table id';
+COMMENT ON COLUMN table_process.status IS 'Table optimizing status';
+COMMENT ON COLUMN table_process.process_type IS 'Process action type';
+COMMENT ON COLUMN table_process.process_stage IS 'Process current stage';
+COMMENT ON COLUMN table_process.execution_engine IS 'Execution engine';
+COMMENT ON COLUMN table_process.create_time IS 'First plan time';
+COMMENT ON COLUMN table_process.finish_time IS 'finish time or failed
time';
+COMMENT ON COLUMN table_process.fail_message IS 'Error message after task
failed';
+COMMENT ON COLUMN table_process.summary IS 'Process summary';
+
+CREATE TABLE optimizing_process_state (
+ process_id bigint PRIMARY KEY,
+ table_id bigint NOT NULL,
+ target_snapshot_id bigint NOT NULL,
+ target_change_snapshot_id bigint NOT NULL,
+ rewrite_input bytea,
+ from_sequence text,
+ to_sequence text
);
-CREATE INDEX process_index ON table_optimizing_process (table_id, plan_time);
-COMMENT ON TABLE table_optimizing_process IS 'History of optimizing after each
commit';
-COMMENT ON COLUMN table_optimizing_process.process_id IS 'Optimizing procedure
UUID';
-COMMENT ON COLUMN table_optimizing_process.table_id IS 'Table ID';
-COMMENT ON COLUMN table_optimizing_process.catalog_name IS 'Catalog name';
-COMMENT ON COLUMN table_optimizing_process.db_name IS 'Database name';
-COMMENT ON COLUMN table_optimizing_process.table_name IS 'Table name';
-COMMENT ON COLUMN table_optimizing_process.target_snapshot_id IS 'Target
snapshot ID';
-COMMENT ON COLUMN table_optimizing_process.target_change_snapshot_id IS
'Target change snapshot ID';
-COMMENT ON COLUMN table_optimizing_process.status IS 'Optimizing status';
-COMMENT ON COLUMN table_optimizing_process.optimizing_type IS 'Optimizing
type: Major, Minor';
-COMMENT ON COLUMN table_optimizing_process.plan_time IS 'Plan time';
-COMMENT ON COLUMN table_optimizing_process.end_time IS 'Finish time or failed
time';
-COMMENT ON COLUMN table_optimizing_process.fail_reason IS 'Error message after
task failure';
-COMMENT ON COLUMN table_optimizing_process.rewrite_input IS 'Rewrite input
files';
-COMMENT ON COLUMN table_optimizing_process.summary IS 'Summary of optimizing
tasks';
-COMMENT ON COLUMN table_optimizing_process.from_sequence IS 'From or min
sequence of each partition';
-COMMENT ON COLUMN table_optimizing_process.to_sequence IS 'To or max sequence
of each partition';
+CREATE INDEX optimizing_process_state_table_idx ON optimizing_process_state
(table_id);
+
+COMMENT ON TABLE optimizing_process_state IS 'History of optimizing after
each commit';
+COMMENT ON COLUMN optimizing_process_state.process_id IS
'optimizing_procedure UUID';
+COMMENT ON COLUMN optimizing_process_state.table_id IS 'table
id';
+COMMENT ON COLUMN optimizing_process_state.target_snapshot_id IS
'target snapshot id';
+COMMENT ON COLUMN optimizing_process_state.target_change_snapshot_id IS
'target change snapshot id';
+COMMENT ON COLUMN optimizing_process_state.rewrite_input IS
'rewrite files input';
+COMMENT ON COLUMN optimizing_process_state.from_sequence IS 'from
or min sequence of each partition';
+COMMENT ON COLUMN optimizing_process_state.to_sequence IS 'to or
max sequence of each partition';
CREATE TABLE task_runtime
(
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index ec89d292b..eba85a18d 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -26,3 +26,94 @@ UPDATE table_optimizing_process SET process_id = process_id
/10 << 13;
UPDATE task_runtime SET process_id = process_id /10 << 13;
UPDATE optimizing_task_quota SET process_id = process_id /10 << 13;
UPDATE table_runtime SET optimizing_process_id = optimizing_process_id /10 <<
13;
+
+CREATE TABLE table_process (
+ process_id bigserial PRIMARY KEY,
+ table_id bigint NOT NULL,
+ status varchar(64) NOT NULL,
+ process_type varchar(64) NOT NULL,
+ process_stage varchar(64) NOT NULL,
+ execution_engine varchar(64) NOT NULL,
+ create_time timestamptz NOT NULL DEFAULT now(),
+ finish_time timestamptz,
+ fail_message text CHECK (length(fail_message) <= 4096),
+ summary text,
+ CONSTRAINT table_process_unique UNIQUE (process_id)
+);
+
+CREATE INDEX table_process_table_idx ON table_process (table_id, create_time);
+
+COMMENT ON TABLE table_process IS 'History of table processes';
+COMMENT ON COLUMN table_process.process_id IS 'table process id';
+COMMENT ON COLUMN table_process.table_id IS 'table id';
+COMMENT ON COLUMN table_process.status IS 'Table process status';
+COMMENT ON COLUMN table_process.process_type IS 'Process action type';
+COMMENT ON COLUMN table_process.process_stage IS 'Process current stage';
+COMMENT ON COLUMN table_process.execution_engine IS 'Execution engine';
+COMMENT ON COLUMN table_process.create_time IS 'First plan time';
+COMMENT ON COLUMN table_process.finish_time IS 'Finish time or failed
time';
+COMMENT ON COLUMN table_process.fail_message IS 'Error message after task
failed';
+COMMENT ON COLUMN table_process.summary IS 'Summary of the table
process';
+
+CREATE TABLE optimizing_process_state (
+ process_id bigint PRIMARY KEY,
+ table_id bigint NOT NULL,
+ target_snapshot_id bigint NOT NULL,
+ target_change_snapshot_id bigint NOT NULL,
+ rewrite_input bytea,
+ from_sequence text,
+ to_sequence text
+);
+
+CREATE INDEX optimizing_process_state_table_idx ON optimizing_process_state
(table_id);
+
+COMMENT ON TABLE optimizing_process_state IS 'History of optimizing after
each commit';
+COMMENT ON COLUMN optimizing_process_state.process_id IS
'optimizing process id';
+COMMENT ON COLUMN optimizing_process_state.table_id IS 'table
id';
+COMMENT ON COLUMN optimizing_process_state.target_snapshot_id IS
'target snapshot id';
+COMMENT ON COLUMN optimizing_process_state.target_change_snapshot_id IS
'target change snapshot id';
+COMMENT ON COLUMN optimizing_process_state.rewrite_input IS
'rewrite files input';
+COMMENT ON COLUMN optimizing_process_state.from_sequence IS 'from
or min sequence of each partition';
+COMMENT ON COLUMN optimizing_process_state.to_sequence IS 'to or
max sequence of each partition';
+
+INSERT INTO table_process
+(process_id, table_id, status, process_type,
+ process_stage, execution_engine, create_time, finish_time, fail_message,
summary)
+SELECT
+ p.process_id,
+ p.table_id,
+ p.status,
+ p.optimizing_type,
+ CASE t.optimizing_status_code
+ WHEN 700 THEN 'IDLE'
+ WHEN 600 THEN 'PENDING'
+ WHEN 500 THEN 'PLANNING'
+ WHEN 400 THEN 'COMMITTING'
+ WHEN 300 THEN 'MINOR_OPTIMIZING'
+ WHEN 200 THEN 'MAJOR_OPTIMIZING'
+ WHEN 100 THEN 'FULL_OPTIMIZING'
+ END,
+ 'AMORO',
+ p.plan_time,
+ p.end_time,
+ p.fail_reason,
+ p.summary
+FROM table_optimizing_process AS p
+JOIN table_runtime AS t
+ ON p.table_id = t.table_id;
+
+INSERT INTO optimizing_process_state
+(process_id, table_id,
+ target_snapshot_id, target_change_snapshot_id,
+ rewrite_input, from_sequence, to_sequence)
+SELECT
+ process_id,
+ table_id,
+ target_snapshot_id,
+ target_change_snapshot_id,
+ rewrite_input,
+ from_sequence,
+ to_sequence
+FROM table_optimizing_process;
+
+DROP TABLE IF EXISTS table_optimizing_process;
\ No newline at end of file
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 6a88748ad..b8ecf8467 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -31,8 +31,9 @@ import
org.apache.amoro.hive.formats.IcebergHiveCatalogTestHelper;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.table.DerbyPersistence;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.descriptor.FormatTableDescriptor;
@@ -306,17 +307,25 @@ public class TestIcebergServerTableDescriptor extends
TestServerTableDescriptor
Map<String, Long> fromSequence,
Map<String, Long> toSequence) {
doAs(
- OptimizingMapper.class,
+ TableProcessMapper.class,
mapper ->
- mapper.insertOptimizingProcess(
- identifier,
+ mapper.insertProcess(
+ identifier.getId(),
processId,
- targetSnapshotId,
- targetChangeSnapshotId,
status,
- type,
+ type.name(),
+ type.name(),
+ "AMORO",
planTime,
- summary,
+ summary.summaryAsMap(false)));
+ doAs(
+ OptimizingProcessMapper.class,
+ mapper ->
+ mapper.insertInternalProcessState(
+ identifier.getId(),
+ processId,
+ targetSnapshotId,
+ targetChangeSnapshotId,
fromSequence,
toSequence));
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
index a38842bf4..06ae381a5 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
@@ -18,10 +18,14 @@
package org.apache.amoro.server.optimizing;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableIdentifier;
import org.apache.iceberg.data.Record;
@@ -87,45 +91,45 @@ public class BaseOptimizingChecker extends PersistentBase {
}
protected void assertOptimizingProcess(
- OptimizingProcessMeta optimizingProcess,
+ TableProcessMeta optimizingProcess,
OptimizingType optimizeType,
int fileCntBefore,
int fileCntAfter) {
Assert.assertNotNull(optimizingProcess);
- Assert.assertEquals(optimizeType, optimizingProcess.getOptimizingType());
+ Assert.assertEquals(optimizeType.name(),
optimizingProcess.getProcessType());
+ MetricsSummary summary =
MetricsSummary.fromMap(optimizingProcess.getSummary());
Assert.assertEquals(
fileCntBefore,
- optimizingProcess.getSummary().getRewritePosDataFileCnt()
- + optimizingProcess.getSummary().getRewriteDataFileCnt()
- + optimizingProcess.getSummary().getEqDeleteFileCnt()
- + optimizingProcess.getSummary().getPosDeleteFileCnt());
- Assert.assertEquals(
- fileCntAfter,
- optimizingProcess.getSummary().getNewDataFileCnt()
- + optimizingProcess.getSummary().getNewDeleteFileCnt());
+ summary.getRewritePosDataFileCnt()
+ + summary.getRewriteDataFileCnt()
+ + summary.getEqDeleteFileCnt()
+ + summary.getPosDeleteFileCnt());
+ Assert.assertEquals(fileCntAfter, summary.getNewDataFileCnt() +
summary.getNewDeleteFileCnt());
}
- protected OptimizingProcessMeta waitOptimizeResult() {
+ protected TableProcessMeta waitOptimizeResult() {
boolean success;
+ ServerTableIdentifier identifier =
+ getAs(
+ TableMetaMapper.class,
+ m ->
+ m.selectTableIdentifier(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName()));
try {
success =
waitUntilFinish(
() -> {
- List<OptimizingProcessMeta> tableOptimizingProcesses =
+ List<TableProcessMeta> tableOptimizingProcesses =
getAs(
- OptimizingMapper.class,
- mapper ->
- mapper.selectOptimizingProcesses(
- tableIdentifier.getCatalog(),
- tableIdentifier.getDatabase(),
- tableIdentifier.getTableName(),
- null,
- null));
+ TableProcessMapper.class,
+ mapper -> mapper.listProcessMeta(identifier.getId(),
null, null));
if (tableOptimizingProcesses == null ||
tableOptimizingProcesses.isEmpty()) {
LOG.info("optimize history is empty");
return Status.RUNNING;
}
- Optional<OptimizingProcessMeta> any =
+ Optional<TableProcessMeta> any =
tableOptimizingProcesses.stream()
.filter(p -> p.getProcessId() > lastProcessId)
.filter(p ->
p.getStatus().equals(ProcessStatus.SUCCESS))
@@ -137,7 +141,7 @@ public class BaseOptimizingChecker extends PersistentBase {
LOG.info(
"optimize max process id {}",
tableOptimizingProcesses.stream()
- .map(OptimizingProcessMeta::getProcessId)
+ .map(TableProcessMeta::getProcessId)
.max(Comparator.naturalOrder())
.get());
return Status.RUNNING;
@@ -150,16 +154,10 @@ public class BaseOptimizingChecker extends PersistentBase
{
}
if (success) {
- List<OptimizingProcessMeta> result =
+ List<TableProcessMeta> result =
getAs(
- OptimizingMapper.class,
- mapper ->
- mapper.selectOptimizingProcesses(
- tableIdentifier.getCatalog(),
- tableIdentifier.getDatabase(),
- tableIdentifier.getTableName(),
- null,
- null))
+ TableProcessMapper.class,
+ mapper -> mapper.listProcessMeta(identifier.getId(), null,
null))
.stream()
.filter(p -> p.getProcessId() > lastProcessId)
.filter(p -> p.getStatus().equals(ProcessStatus.SUCCESS))
@@ -181,16 +179,18 @@ public class BaseOptimizingChecker extends PersistentBase
{
} catch (InterruptedException e) {
throw new IllegalStateException("waiting result was interrupted");
}
- List<OptimizingProcessMeta> tableOptimizingProcesses =
+ ServerTableIdentifier identifier =
+ getAs(
+ TableMetaMapper.class,
+ m ->
+ m.selectTableIdentifier(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName()));
+ List<TableProcessMeta> tableOptimizingProcesses =
getAs(
- OptimizingMapper.class,
- mapper ->
- mapper.selectOptimizingProcesses(
- tableIdentifier.getCatalog(),
- tableIdentifier.getDatabase(),
- tableIdentifier.getTableName(),
- null,
- null))
+ TableProcessMapper.class,
+ mapper -> mapper.listProcessMeta(identifier.getId(), null,
null))
.stream()
.filter(p -> p.getProcessId() > lastProcessId)
.collect(Collectors.toList());
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
index 29ab60d53..3b2bc3b68 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
@@ -24,6 +24,7 @@ import org.apache.amoro.server.AmsEnvironment;
import org.apache.amoro.server.RestCatalogService;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -103,7 +104,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
partitionData);
// wait Minor Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2,
1);
checker.assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
@@ -179,7 +180,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
partitionData);
// wait Minor Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2,
1);
assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
@@ -221,7 +222,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
partitionData);
// wait Minor Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2,
1);
assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
@@ -334,7 +335,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
updateProperties(table,
TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0");
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 9,
1);
assertIds(readRecords(table), 4, 5);
@@ -368,7 +369,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
updateProperties(table, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT, "4");
// wait Minor Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6,
3);
assertIds(readRecords(table), 1, 2, 3, 4, 5, 6);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
index 8f6dcd662..5555c4802 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedHiveOptimizing.java
@@ -22,6 +22,7 @@ import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.io.AuthenticatedHadoopFileIO;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Streams;
import org.apache.amoro.table.KeyedTable;
@@ -58,7 +59,7 @@ public class TestMixedHiveOptimizing extends
AbstractOptimizingTest {
updateProperties(table, TableProperties.BASE_FILE_INDEX_HASH_BUCKET, 1 +
"");
writeBase(table, rangeFromTo(1, 100, "aaa", quickDateWithZone(3)));
// wait Full Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 1,
1);
assertIdRange(readRecords(table), 1, 100);
// assert file are in hive location
@@ -98,7 +99,7 @@ public class TestMixedHiveOptimizing extends
AbstractOptimizingTest {
updateProperties(table,
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES, false + "");
writeBase(table, rangeFromTo(1, 100, "aaa", quickDateWithZone(3)));
// wait Full Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 1,
1);
assertIdRange(readRecords(table), 1, 100);
// assert file are in hive location
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
index 458b5444f..2b3f8e612 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestMixedIcebergOptimizing.java
@@ -19,6 +19,7 @@
package org.apache.amoro.server.optimizing;
import org.apache.amoro.optimizing.OptimizingType;
+import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.KeyedTable;
import org.apache.amoro.table.MixedTable;
@@ -58,7 +59,7 @@ public class TestMixedIcebergOptimizing extends
AbstractOptimizingTest {
null);
// wait Minor Optimize result, no major optimize because there is only 1
base file for each node
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4,
4);
assertIds(readRecords(table), 3, 4, 5, 6);
@@ -179,7 +180,7 @@ public class TestMixedIcebergOptimizing extends
AbstractOptimizingTest {
updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true");
updateProperties(table,
TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "1000");
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.FULL, 5,
4);
assertIds(
readRecords(table), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 21, 25, 29);
@@ -240,7 +241,7 @@ public class TestMixedIcebergOptimizing extends
AbstractOptimizingTest {
newRecord(9, "hhh", quickDateWithZone(4)),
newRecord(10, "iii", quickDateWithZone(4))));
// wait Major Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 4,
2);
assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10);
@@ -281,7 +282,7 @@ public class TestMixedIcebergOptimizing extends
AbstractOptimizingTest {
newRecord(9, "hhh", quickDateWithZone(4)),
newRecord(10, "iii", quickDateWithZone(4))));
// wait Major Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 2,
1);
assertIds(readRecords(table), 3, 4, 5, 6, 7, 8, 9, 10);
@@ -333,7 +334,7 @@ public class TestMixedIcebergOptimizing extends
AbstractOptimizingTest {
updateProperties(table, TableProperties.ENABLE_SELF_OPTIMIZING, "true");
// wait Optimize result
- OptimizingProcessMeta optimizeHistory = checker.waitOptimizeResult();
+ TableProcessMeta optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 1,
1);
optimizeHistory = checker.waitOptimizeResult();
checker.assertOptimizingProcess(optimizeHistory, OptimizingType.MINOR, 6,
1);
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
index 55cad9784..2bda80015 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MetricsSummary.java
@@ -74,6 +74,43 @@ public class MetricsSummary {
private int newDeleteFileCnt = 0;
private long newDeleteRecordCnt = 0;
+ public static MetricsSummary fromMap(Map<String, String> summary) {
+ MetricsSummary metricsSummary = new MetricsSummary();
+ metricsSummary.rewriteDataSize =
Long.parseLong(summary.getOrDefault(INPUT_DATA_SIZE, "0"));
+ metricsSummary.rewriteDataFileCnt =
+ Integer.parseInt(summary.getOrDefault(INPUT_DATA_FILES, "0"));
+ metricsSummary.rewriteDataRecordCnt =
+ Long.parseLong(summary.getOrDefault(INPUT_DATA_RECORDS, "0"));
+ metricsSummary.rewritePosDataSize =
+ Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_SIZE, "0"));
+ metricsSummary.rewritePosDataFileCnt =
+ Integer.parseInt(summary.getOrDefault(INPUT_POS_DELETE_FILES, "0"));
+ metricsSummary.rewritePosDataRecordCnt =
+ Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_RECORDS, "0"));
+ metricsSummary.equalityDeleteSize =
+ Long.parseLong(summary.getOrDefault(INPUT_EQ_DELETE_SIZE, "0"));
+ metricsSummary.eqDeleteFileCnt =
+ Integer.parseInt(summary.getOrDefault(INPUT_EQ_DELETE_FILES, "0"));
+ metricsSummary.eqDeleteRecordCnt =
+ Long.parseLong(summary.getOrDefault(INPUT_EQ_DELETE_RECORDS, "0"));
+ metricsSummary.positionDeleteSize =
+ Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_SIZE, "0"));
+ metricsSummary.posDeleteFileCnt =
+ Integer.parseInt(summary.getOrDefault(INPUT_POS_DELETE_FILES, "0"));
+ metricsSummary.posDeleteRecordCnt =
+ Long.parseLong(summary.getOrDefault(INPUT_POS_DELETE_RECORDS, "0"));
+ metricsSummary.newDataSize =
Long.parseLong(summary.getOrDefault(OUTPUT_DATA_SIZE, "0"));
+ metricsSummary.newDataFileCnt =
Integer.parseInt(summary.getOrDefault(OUTPUT_DATA_FILES, "0"));
+ metricsSummary.newDataRecordCnt =
+ Long.parseLong(summary.getOrDefault(OUTPUT_DATA_RECORDS, "0"));
+ metricsSummary.newDeleteSize =
Long.parseLong(summary.getOrDefault(OUTPUT_DELETE_SIZE, "0"));
+ metricsSummary.newDeleteFileCnt =
+ Integer.parseInt(summary.getOrDefault(OUTPUT_DELETE_FILES, "0"));
+ metricsSummary.newDeleteRecordCnt =
+ Long.parseLong(summary.getOrDefault(OUTPUT_DELETE_RECORDS, "0"));
+ return metricsSummary;
+ }
+
public MetricsSummary() {}
protected MetricsSummary(RewriteFilesInput input) {