This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9a74dcc [Improvement]: Refactor code to replace DefaultTableRuntime
with TableRuntime interface (#3644)
2b9a74dcc is described below
commit 2b9a74dcc9d8f02847ca262977484fb12387162a
Author: baiyangtx <[email protected]>
AuthorDate: Tue Aug 5 15:31:46 2025 +0800
[Improvement]: Refactor code to replace DefaultTableRuntime with
TableRuntime interface (#3644)
* [Improvement]: Refactor make TableService return TableRuntime interface
* [Improvement]: Refactor make RuntimeHandlerChain depends on TableRuntime
interface
* c
* remove table runtime from table maintainer
* rename
* rename
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../amoro/server/DefaultOptimizingService.java | 31 ++++++++------
.../maintainer/IcebergTableMaintainer.java | 35 +++++++++-------
.../maintainer/MixedTableMaintainer.java | 49 ++++++++++++----------
.../optimizing/maintainer/TableMaintainer.java | 29 +++----------
...{TableMaintainer.java => TableMaintainers.java} | 48 ++++++---------------
.../scheduler/PeriodicExternalScheduler.java | 26 ++++++++----
.../server/scheduler/PeriodicTableScheduler.java | 29 ++++++-------
.../scheduler/inline/BlockerExpiringExecutor.java | 10 ++---
.../DanglingDeleteFilesCleaningExecutor.java | 18 ++++----
.../scheduler/inline/DataExpiringExecutor.java | 16 +++----
.../scheduler/inline/HiveCommitSyncExecutor.java | 8 ++--
.../scheduler/inline/OptimizingCommitExecutor.java | 25 ++++++-----
.../inline/OptimizingExpiringExecutor.java | 10 ++---
.../inline/OrphanFilesCleaningExecutor.java | 16 +++----
.../inline/SnapshotsExpiringExecutor.java | 16 +++----
.../inline/TableRuntimeRefreshExecutor.java | 30 ++++++++-----
.../scheduler/inline/TagsAutoCreatingExecutor.java | 16 +++----
.../amoro/server/table/DefaultTableRuntime.java | 1 +
.../amoro/server/table/DefaultTableService.java | 22 +++-------
.../amoro/server/table/RuntimeHandlerChain.java | 24 +++++------
.../apache/amoro/server/table/TableService.java | 3 +-
.../apache/amoro/server/AMSServiceTestBase.java | 5 +++
.../amoro/server/RestCatalogServiceTestBase.java | 6 +--
.../amoro/server/TestDefaultOptimizingService.java | 16 +++----
.../server/dashboard/TestOverviewManager.java | 4 +-
.../optimizing/maintainer/TestDataExpire.java | 16 +++----
.../optimizing/maintainer/TestOrphanFileClean.java | 16 +++----
.../maintainer/TestOrphanFileCleanHive.java | 2 +-
.../maintainer/TestOrphanFileCleanIceberg.java | 5 ++-
.../optimizing/maintainer/TestSnapshotExpire.java | 48 +++++++++++----------
.../maintainer/TestSnapshotExpireHive.java | 2 +-
.../amoro/server/table/AMSTableTestBase.java | 3 +-
.../table/TestDefaultTableRuntimeHandler.java | 37 +++++++---------
.../table/TestDefaultTableRuntimeManager.java | 2 +-
.../table/TestSyncTableOfExternalCatalog.java | 2 +-
.../amoro/server/table/TestTableManager.java | 3 +-
.../server/table/TestTableSummaryMetrics.java | 4 +-
.../main/java/org/apache/amoro/TableRuntime.java | 3 ++
38 files changed, 314 insertions(+), 322 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index c69e9194e..ffdfd72b8 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.OptimizerProperties;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.api.OptimizerRegisterInfo;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.api.OptimizingTask;
@@ -290,7 +291,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
return false;
}
long tableId = processMeta.getTableId();
- DefaultTableRuntime tableRuntime = tableService.getRuntime(tableId);
+ DefaultTableRuntime tableRuntime = (DefaultTableRuntime)
tableService.getRuntime(tableId);
if (tableRuntime == null) {
return false;
}
@@ -379,17 +380,17 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {
@Override
- public void handleStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
- if
(!tableRuntime.getOptimizingState().getOptimizingStatus().isProcessing()) {
-
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
- .ifPresent(q -> q.refreshTable(tableRuntime));
+ public void handleStatusChanged(TableRuntime tableRuntime,
OptimizingStatus originalStatus) {
+ DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
+ if
(!defaultTableRuntime.getOptimizingState().getOptimizingStatus().isProcessing())
{
+
getOptionalQueueByGroup(defaultTableRuntime.getOptimizingState().getOptimizerGroup())
+ .ifPresent(q -> q.refreshTable(defaultTableRuntime));
}
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime runtime, TableConfiguration
originalConfig) {
+ DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
String originalGroup =
originalConfig.getOptimizingConfig().getOptimizerGroup();
if
(!tableRuntime.getOptimizingState().getOptimizerGroup().equals(originalGroup)) {
getOptionalQueueByGroup(originalGroup).ifPresent(q ->
q.releaseTable(tableRuntime));
@@ -399,21 +400,27 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
@Override
- public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime
tableRuntime) {
+ public void handleTableAdded(AmoroTable<?> table, TableRuntime runtime) {
+ DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(q -> q.refreshTable(tableRuntime));
}
@Override
- public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+ public void handleTableRemoved(TableRuntime runtime) {
+ DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(queue -> queue.releaseTable(tableRuntime));
}
@Override
- protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+ protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
- loadOptimizingQueues(tableRuntimeList);
+ loadOptimizingQueues(
+ tableRuntimeList.stream()
+ .filter(t -> t instanceof DefaultTableRuntime)
+ .map(t -> (DefaultTableRuntime) t)
+ .collect(Collectors.toList()));
optimizerKeeper.start();
optimizingConfigWatcher.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 42d5428b5..76e4a190c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -23,6 +23,7 @@ import static
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.config.TagConfiguration;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.io.PathInfo;
@@ -37,6 +38,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.base.Strings;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
@@ -119,13 +121,18 @@ public class IcebergTableMaintainer implements
TableMaintainer {
CommitMetaProducer.CLEAN_DANGLING_DELETE.name());
protected Table table;
+ private final TableIdentifier tableIdentifier;
+ private final DefaultTableRuntime tableRuntime;
- public IcebergTableMaintainer(Table table) {
+ public IcebergTableMaintainer(
+ Table table, TableIdentifier tableIdentifier, DefaultTableRuntime
tableRuntime) {
this.table = table;
+ this.tableIdentifier = tableIdentifier;
+ this.tableRuntime = tableRuntime;
}
@Override
- public void cleanOrphanFiles(DefaultTableRuntime tableRuntime) {
+ public void cleanOrphanFiles() {
TableConfiguration tableConfiguration =
tableRuntime.getTableConfiguration();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
tableRuntime.getOptimizingState().getOrphanFilesCleaningMetrics();
@@ -146,9 +153,8 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
@Override
- public void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
+ public void cleanDanglingDeleteFiles() {
TableConfiguration tableConfiguration =
tableRuntime.getTableConfiguration();
-
if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
return;
}
@@ -161,7 +167,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
Optional.ofNullable(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DELETE_FILES_PROP));
if (totalDeleteFiles.isPresent() && Long.parseLong(totalDeleteFiles.get())
> 0) {
// clear dangling delete files
- cleanDanglingDeleteFiles();
+ doCleanDanglingDeleteFiles();
} else {
LOG.debug(
"There are no delete files here, so there is no need to clean
dangling delete file for table {}",
@@ -170,7 +176,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
@Override
- public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+ public void expireSnapshots() {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
@@ -247,10 +253,10 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
@Override
- public void expireData(DefaultTableRuntime tableRuntime) {
+ public void expireData() {
+ DataExpirationConfig expirationConfig =
+ tableRuntime.getTableConfiguration().getExpiringDataConfig();
try {
- DataExpirationConfig expirationConfig =
- tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
table.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(expirationConfig,
field, table.name())) {
return;
@@ -258,7 +264,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig,
field));
} catch (Throwable t) {
- LOG.error("Unexpected purge error for table {} ",
tableRuntime.getTableIdentifier(), t);
+ LOG.error("Unexpected purge error for table {} ", tableIdentifier, t);
}
}
@@ -311,10 +317,9 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
@Override
- public void autoCreateTags(DefaultTableRuntime tableRuntime) {
- new AutoCreateIcebergTagAction(
- table, tableRuntime.getTableConfiguration().getTagConfiguration(),
LocalDateTime.now())
- .execute();
+ public void autoCreateTags() {
+ TagConfiguration tagConfiguration =
tableRuntime.getTableConfiguration().getTagConfiguration();
+ new AutoCreateIcebergTagAction(table, tagConfiguration,
LocalDateTime.now()).execute();
}
protected void cleanContentFiles(
@@ -339,7 +344,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
clearInternalTableMetadata(lastTime, orphanFilesCleaningMetrics);
}
- protected void cleanDanglingDeleteFiles() {
+ protected void doCleanDanglingDeleteFiles() {
LOG.info("Starting cleaning dangling delete files for table {}",
table.name());
int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
runWithCondition(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
index a6e998c8a..987192f89 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
@@ -81,31 +81,34 @@ public class MixedTableMaintainer implements
TableMaintainer {
private ChangeTableMaintainer changeMaintainer;
private final BaseTableMaintainer baseMaintainer;
+ private final DefaultTableRuntime tableRuntime;
- public MixedTableMaintainer(MixedTable mixedTable) {
+ public MixedTableMaintainer(MixedTable mixedTable, DefaultTableRuntime
tableRuntime) {
this.mixedTable = mixedTable;
+ this.tableRuntime = tableRuntime;
if (mixedTable.isKeyedTable()) {
- changeMaintainer = new
ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable());
- baseMaintainer = new
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable());
+ changeMaintainer =
+ new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(),
tableRuntime);
+ baseMaintainer = new
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), tableRuntime);
} else {
- baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable());
+ baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(),
tableRuntime);
}
}
@Override
- public void cleanOrphanFiles(DefaultTableRuntime tableRuntime) {
+ public void cleanOrphanFiles() {
if (changeMaintainer != null) {
- changeMaintainer.cleanOrphanFiles(tableRuntime);
+ changeMaintainer.cleanOrphanFiles();
}
- baseMaintainer.cleanOrphanFiles(tableRuntime);
+ baseMaintainer.cleanOrphanFiles();
}
@Override
- public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+ public void expireSnapshots() {
if (changeMaintainer != null) {
- changeMaintainer.expireSnapshots(tableRuntime);
+ changeMaintainer.expireSnapshots();
}
- baseMaintainer.expireSnapshots(tableRuntime);
+ baseMaintainer.expireSnapshots();
}
@VisibleForTesting
@@ -117,10 +120,10 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
@Override
- public void expireData(DefaultTableRuntime tableRuntime) {
+ public void expireData() {
+ DataExpirationConfig expirationConfig =
+ tableRuntime.getTableConfiguration().getExpiringDataConfig();
try {
- DataExpirationConfig expirationConfig =
- tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
mixedTable.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(
@@ -130,7 +133,7 @@ public class MixedTableMaintainer implements
TableMaintainer {
expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig,
field));
} catch (Throwable t) {
- LOG.error("Unexpected purge error for table {} ",
tableRuntime.getTableIdentifier(), t);
+ LOG.error("Unexpected purge error for table {} ", mixedTable.id(), t);
}
}
@@ -243,7 +246,7 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
@Override
- public void autoCreateTags(DefaultTableRuntime tableRuntime) {
+ public void autoCreateTags() {
throw new UnsupportedOperationException("Mixed table doesn't support auto
create tags");
}
@@ -263,11 +266,11 @@ public class MixedTableMaintainer implements
TableMaintainer {
baseMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
}
- protected void cleanDanglingDeleteFiles() {
+ protected void doCleanDanglingDeleteFiles() {
if (changeMaintainer != null) {
- changeMaintainer.cleanDanglingDeleteFiles();
+ changeMaintainer.doCleanDanglingDeleteFiles();
}
- baseMaintainer.cleanDanglingDeleteFiles();
+ baseMaintainer.doCleanDanglingDeleteFiles();
}
public ChangeTableMaintainer getChangeMaintainer() {
@@ -284,8 +287,8 @@ public class MixedTableMaintainer implements
TableMaintainer {
private final UnkeyedTable unkeyedTable;
- public ChangeTableMaintainer(UnkeyedTable unkeyedTable) {
- super(unkeyedTable);
+ public ChangeTableMaintainer(UnkeyedTable unkeyedTable,
DefaultTableRuntime tableRuntime) {
+ super(unkeyedTable, mixedTable.id(), tableRuntime);
this.unkeyedTable = unkeyedTable;
}
@@ -297,7 +300,7 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
@Override
- public void expireSnapshots(DefaultTableRuntime tableRuntime) {
+ public void expireSnapshots() {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
@@ -440,8 +443,8 @@ public class MixedTableMaintainer implements
TableMaintainer {
public class BaseTableMaintainer extends IcebergTableMaintainer {
private final Set<String> hiveFiles = Sets.newHashSet();
- public BaseTableMaintainer(UnkeyedTable unkeyedTable) {
- super(unkeyedTable);
+ public BaseTableMaintainer(UnkeyedTable unkeyedTable, DefaultTableRuntime
tableRuntime) {
+ super(unkeyedTable, mixedTable.id(), tableRuntime);
if (unkeyedTable.format() == TableFormat.MIXED_HIVE) {
hiveFiles.addAll(HiveLocationUtil.getHiveLocation(mixedTable));
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
index 4db6917a8..9474102b2 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
@@ -18,12 +18,6 @@
package org.apache.amoro.server.optimizing.maintainer;
-import org.apache.amoro.AmoroTable;
-import org.apache.amoro.TableFormat;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.table.MixedTable;
-import org.apache.iceberg.Table;
-
/**
* API for maintaining table.
*
@@ -34,10 +28,10 @@ import org.apache.iceberg.Table;
public interface TableMaintainer {
/** Clean table orphan files. Includes: data files, metadata files. */
- void cleanOrphanFiles(DefaultTableRuntime tableRuntime);
+ void cleanOrphanFiles();
/** Clean table dangling delete files. */
- default void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
+ default void cleanDanglingDeleteFiles() {
// DO nothing by default
}
@@ -45,27 +39,14 @@ public interface TableMaintainer {
* Expire snapshots. The optimizing based on the snapshot that the current
table relies on will
* not expire according to TableRuntime.
*/
- void expireSnapshots(DefaultTableRuntime tableRuntime);
+ void expireSnapshots();
/**
* Expire historical data based on the expiration field, and data that
exceeds the retention
* period will be purged
- *
- * @param tableRuntime TableRuntime
*/
- void expireData(DefaultTableRuntime tableRuntime);
+ void expireData();
/** Auto create tags for table. */
- void autoCreateTags(DefaultTableRuntime tableRuntime);
-
- static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
- TableFormat format = amoroTable.format();
- if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
- return new MixedTableMaintainer((MixedTable) amoroTable.originalTable());
- } else if (TableFormat.ICEBERG.equals(format)) {
- return new IcebergTableMaintainer((Table) amoroTable.originalTable());
- } else {
- throw new RuntimeException("Unsupported table type" +
amoroTable.originalTable().getClass());
- }
- }
+ void autoCreateTags();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
similarity index 51%
copy from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
copy to
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
index 4db6917a8..41f49bb23 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
@@ -20,50 +20,26 @@ package org.apache.amoro.server.optimizing.maintainer;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.Table;
-/**
- * API for maintaining table.
- *
- * <p>Includes: clean content files, clean metadata, clean dangling delete
files, expire snapshots,
- * auto create tags.
- */
-// TODO TableMaintainer should not be in this optimizing.xxx package.
-public interface TableMaintainer {
-
- /** Clean table orphan files. Includes: data files, metadata files. */
- void cleanOrphanFiles(DefaultTableRuntime tableRuntime);
-
- /** Clean table dangling delete files. */
- default void cleanDanglingDeleteFiles(DefaultTableRuntime tableRuntime) {
- // DO nothing by default
- }
-
- /**
- * Expire snapshots. The optimizing based on the snapshot that the current
table relies on will
- * not expire according to TableRuntime.
- */
- void expireSnapshots(DefaultTableRuntime tableRuntime);
-
- /**
- * Expire historical data based on the expiration field, and data that
exceeds the retention
- * period will be purged
- *
- * @param tableRuntime TableRuntime
- */
- void expireData(DefaultTableRuntime tableRuntime);
-
- /** Auto create tags for table. */
- void autoCreateTags(DefaultTableRuntime tableRuntime);
+/** Factory for creating {@link TableMaintainer}. */
+public class TableMaintainers {
- static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
+ /** Create a {@link TableMaintainer} for the given table. */
+ public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime
tableRuntime) {
TableFormat format = amoroTable.format();
if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
- return new MixedTableMaintainer((MixedTable) amoroTable.originalTable());
+ Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ return new MixedTableMaintainer(
+ (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime)
tableRuntime);
} else if (TableFormat.ICEBERG.equals(format)) {
- return new IcebergTableMaintainer((Table) amoroTable.originalTable());
+ Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ return new IcebergTableMaintainer(
+ (Table) amoroTable.originalTable(), amoroTable.id(),
(DefaultTableRuntime) tableRuntime);
} else {
throw new RuntimeException("Unsupported table type" +
amoroTable.originalTable().getClass());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
index b9ad510d3..0b5141738 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java
@@ -19,6 +19,7 @@
package org.apache.amoro.server.scheduler;
import org.apache.amoro.Action;
+import org.apache.amoro.SupportsProcessPlugins;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.AmoroProcess;
import org.apache.amoro.process.ManagedProcess;
@@ -32,10 +33,11 @@ import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.ProcessStateMapper;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import java.util.List;
+import java.util.Optional;
public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler
{
@@ -56,21 +58,31 @@ public abstract class PeriodicExternalScheduler extends
PeriodicTableScheduler {
}
@Override
- protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
- tableRuntimeList.forEach(tableRuntime -> tableRuntime.install(getAction(),
processFactory));
+ protected void initHandler(List<TableRuntime> tableRuntimeList) {
+ tableRuntimeList.stream()
+ .filter(t -> t instanceof SupportsProcessPlugins)
+ .map(t -> (SupportsProcessPlugins) t)
+ .forEach(tableRuntime -> tableRuntime.install(getAction(),
processFactory));
super.initHandler(tableRuntimeList);
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
- return tableRuntime.enabled(getAction());
+ protected boolean enabled(TableRuntime tableRuntime) {
+ return Optional.of(tableRuntime)
+ .filter(t -> t instanceof SupportsProcessPlugins)
+ .map(t -> (SupportsProcessPlugins) t)
+ .map(t -> t.enabled(getAction()))
+ .orElse(false);
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
+ Preconditions.checkArgument(tableRuntime instanceof
SupportsProcessPlugins);
+ SupportsProcessPlugins runtimeSupportProcessPlugin =
(SupportsProcessPlugins) tableRuntime;
// Trigger a table process and check conflicts by table runtime
// Update process state after process completed, the callback must be
register first
- AmoroProcess<? extends TableProcessState> process =
tableRuntime.trigger(getAction());
+ AmoroProcess<? extends TableProcessState> process =
+ runtimeSupportProcessPlugin.trigger(getAction());
process.getCompleteFuture().whenCompleted(() ->
persistTableProcess(process));
ManagedProcess<? extends TableProcessState> managedProcess = new
ManagedTableProcess<>(process);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index c260cac53..f08b5110c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -22,9 +22,9 @@ import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.OptimizingStatus;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,7 +78,7 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
}
@Override
- protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+ protected void initHandler(List<TableRuntime> tableRuntimeList) {
tableRuntimeList.stream()
.filter(this::enabled)
.forEach(
@@ -92,7 +92,7 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
logger.info("Table executor {} initialized", getClass().getSimpleName());
}
- private void executeTask(DefaultTableRuntime tableRuntime) {
+ private void executeTask(TableRuntime tableRuntime) {
try {
if (isExecutable(tableRuntime)) {
execute(tableRuntime);
@@ -103,8 +103,7 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
}
}
- protected final void scheduleIfNecessary(
- DefaultTableRuntime tableRuntime, long millisecondsTime) {
+ protected final void scheduleIfNecessary(TableRuntime tableRuntime, long
millisecondsTime) {
if (isExecutable(tableRuntime)) {
if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
executor.schedule(() -> executeTask(tableRuntime), millisecondsTime,
TimeUnit.MILLISECONDS);
@@ -112,39 +111,37 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
}
}
- protected abstract long getNextExecutingTime(DefaultTableRuntime
tableRuntime);
+ protected abstract long getNextExecutingTime(TableRuntime tableRuntime);
- protected abstract boolean enabled(DefaultTableRuntime tableRuntime);
+ protected abstract boolean enabled(TableRuntime tableRuntime);
- protected abstract void execute(DefaultTableRuntime tableRuntime);
+ protected abstract void execute(TableRuntime tableRuntime);
protected String getThreadName() {
return String.join("-",
StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
.toLowerCase(Locale.ROOT);
}
- private boolean isExecutable(DefaultTableRuntime tableRuntime) {
+ private boolean isExecutable(TableRuntime tableRuntime) {
return tableService.contains(tableRuntime.getTableIdentifier().getId())
&& enabled(tableRuntime);
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
// DO nothing by default
}
@Override
- public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+ public void handleTableRemoved(TableRuntime tableRuntime) {
// DO nothing, handling would be canceled when calling executeTable
}
@Override
- public void handleStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {}
+ public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus
originalStatus) {}
@Override
- public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime
tableRuntime) {
+ public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime)
{
scheduleIfNecessary(tableRuntime, getStartDelay());
}
@@ -160,7 +157,7 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
return START_DELAY + getExecutorDelay();
}
- protected AmoroTable<?> loadTable(DefaultTableRuntime tableRuntime) {
+ protected AmoroTable<?> loadTable(TableRuntime tableRuntime) {
return tableService.loadTable(tableRuntime.getTableIdentifier());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
index ec38e4d25..781081f6d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/BlockerExpiringExecutor.java
@@ -18,10 +18,10 @@
package org.apache.amoro.server.scheduler.inline;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
public class BlockerExpiringExecutor extends PeriodicTableScheduler {
@@ -35,12 +35,12 @@ public class BlockerExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return true;
}
@@ -50,7 +50,7 @@ public class BlockerExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
try {
persistency.doExpiring(tableRuntime);
} catch (Throwable t) {
@@ -60,7 +60,7 @@ public class BlockerExpiringExecutor extends
PeriodicTableScheduler {
private static class Persistency extends PersistentBase {
- public void doExpiring(DefaultTableRuntime tableRuntime) {
+ public void doExpiring(TableRuntime tableRuntime) {
String catalog = tableRuntime.getTableIdentifier().getCatalog();
String database = tableRuntime.getTableIdentifier().getDatabase();
String table = tableRuntime.getTableIdentifier().getTableName();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index c19471535..16f80c9c0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -19,8 +19,10 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
@@ -42,18 +44,18 @@ public class DanglingDeleteFilesCleaningExecutor extends
PeriodicTableScheduler
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
- return
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
+ protected boolean enabled(TableRuntime tableRuntime) {
+ return tableRuntime instanceof DefaultTableRuntime
+ &&
tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
@@ -63,12 +65,12 @@ public class DanglingDeleteFilesCleaningExecutor extends
PeriodicTableScheduler
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
try {
LOG.info("{} start cleaning dangling delete files",
tableRuntime.getTableIdentifier());
AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
- tableMaintainer.cleanDanglingDeleteFiles(tableRuntime);
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.cleanDanglingDeleteFiles();
} catch (Throwable t) {
LOG.error("{} failed to clean dangling delete file",
tableRuntime.getTableIdentifier(), t);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 2f31b1660..4990b7409 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -19,10 +19,11 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,18 +43,17 @@ public class DataExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval.toMillis();
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return
tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
@@ -63,11 +63,11 @@ public class DataExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
try {
AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
- tableMaintainer.expireData(tableRuntime);
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.expireData();
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
index d5251905c..6bc46a382 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/HiveCommitSyncExecutor.java
@@ -19,11 +19,11 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.HiveMetaSynchronizer;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.table.MixedTable;
import org.slf4j.Logger;
@@ -42,12 +42,12 @@ public class HiveCommitSyncExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return true;
}
@@ -57,7 +57,7 @@ public class HiveCommitSyncExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
long startTime = System.currentTimeMillis();
ServerTableIdentifier tableIdentifier = tableRuntime.getTableIdentifier();
try {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
index 67c2e7670..056314273 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingCommitExecutor.java
@@ -18,7 +18,7 @@
package org.apache.amoro.server.scheduler.inline;
-import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
@@ -35,13 +35,17 @@ public class OptimizingCommitExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
- return tableRuntime.getOptimizingState().getOptimizingStatus() ==
OptimizingStatus.COMMITTING;
+ protected boolean enabled(TableRuntime tableRuntime) {
+ return Optional.of(tableRuntime)
+ .filter(t -> t instanceof DefaultTableRuntime)
+ .map(t -> (DefaultTableRuntime) t)
+ .map(t -> t.getOptimizingState().getOptimizingStatus() ==
OptimizingStatus.COMMITTING)
+ .orElse(false);
}
@Override
@@ -50,8 +54,11 @@ public class OptimizingCommitExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
-
Optional.ofNullable(tableRuntime.getOptimizingState().getOptimizingProcess())
+ protected void execute(TableRuntime tableRuntime) {
+ Optional.of(tableRuntime)
+ .filter(t -> t instanceof DefaultTableRuntime)
+ .map(t -> (DefaultTableRuntime) t)
+ .map(t -> t.getOptimizingState().getOptimizingProcess())
.orElseThrow(
() ->
new IllegalStateException(
@@ -60,14 +67,10 @@ public class OptimizingCommitExecutor extends
PeriodicTableScheduler {
}
@Override
- public void handleStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+ public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus
originalStatus) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
- @Override
- public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime
tableRuntime) {}
-
protected long getStartDelay() {
return 0;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
index ad8e71b83..77f610b7c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OptimizingExpiringExecutor.java
@@ -18,10 +18,10 @@
package org.apache.amoro.server.scheduler.inline;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.slf4j.Logger;
@@ -41,12 +41,12 @@ public class OptimizingExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return true;
}
@@ -56,7 +56,7 @@ public class OptimizingExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
try {
persistency.doExpiring(tableRuntime);
} catch (Throwable throwable) {
@@ -66,7 +66,7 @@ public class OptimizingExpiringExecutor extends
PeriodicTableScheduler {
}
private class Persistency extends PersistentBase {
- public void doExpiring(DefaultTableRuntime tableRuntime) {
+ public void doExpiring(TableRuntime tableRuntime) {
long expireTime = System.currentTimeMillis() - keepTime;
long minProcessId = SnowflakeIdGenerator.getMinSnowflakeId(expireTime);
doAsTransaction(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index e7512e4b5..332c51141 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -19,10 +19,11 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,18 +41,17 @@ public class OrphanFilesCleaningExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval.toMillis();
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
@@ -61,12 +61,12 @@ public class OrphanFilesCleaningExecutor extends
PeriodicTableScheduler {
}
@Override
- public void execute(DefaultTableRuntime tableRuntime) {
+ public void execute(TableRuntime tableRuntime) {
try {
LOG.info("{} start cleaning orphan files",
tableRuntime.getTableIdentifier());
AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
- tableMaintainer.cleanOrphanFiles(tableRuntime);
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.cleanOrphanFiles();
} catch (Throwable t) {
LOG.error("{} failed to clean orphan file",
tableRuntime.getTableIdentifier(), t);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index d465023bd..f7d0cb927 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -19,10 +19,11 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,18 +41,17 @@ public class SnapshotsExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().isExpireSnapshotEnabled();
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
@@ -61,11 +61,11 @@ public class SnapshotsExpiringExecutor extends
PeriodicTableScheduler {
}
@Override
- public void execute(DefaultTableRuntime tableRuntime) {
+ public void execute(TableRuntime tableRuntime) {
try {
AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
- tableMaintainer.expireSnapshots(tableRuntime);
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.expireSnapshots();
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index d9557aba0..195f14213 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -19,6 +19,7 @@
package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
@@ -28,6 +29,7 @@ import org.apache.amoro.server.table.DefaultOptimizingState;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.IcebergTableUtil;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
/** Executor that refreshes table runtimes and evaluates optimizing status
periodically. */
@@ -45,13 +47,17 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
- return true;
+ protected boolean enabled(TableRuntime tableRuntime) {
+ return tableRuntime instanceof DefaultTableRuntime;
}
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ @Override
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
+ DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
return Math.min(
-
tableRuntime.getOptimizingState().getOptimizingConfig().getMinorLeastInterval()
* 4L / 5,
+
defaultTableRuntime.getOptimizingState().getOptimizingConfig().getMinorLeastInterval()
+ * 4L
+ / 5,
interval);
}
@@ -77,13 +83,14 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
+ Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
// After disabling self-optimizing, close the currently running optimizing
process.
if (originalConfig.getOptimizingConfig().isEnabled()
&&
!tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
OptimizingProcess optimizingProcess =
- tableRuntime.getOptimizingState().getOptimizingProcess();
+ defaultTableRuntime.getOptimizingState().getOptimizingProcess();
if (optimizingProcess != null && optimizingProcess.getStatus() ==
ProcessStatus.RUNNING) {
optimizingProcess.close();
}
@@ -96,9 +103,12 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
}
@Override
- public void execute(DefaultTableRuntime tableRuntime) {
+ public void execute(TableRuntime tableRuntime) {
try {
- DefaultOptimizingState optimizingState =
tableRuntime.getOptimizingState();
+ Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
+
+ DefaultOptimizingState optimizingState =
defaultTableRuntime.getOptimizingState();
long lastOptimizedSnapshotId =
optimizingState.getLastOptimizedSnapshotId();
long lastOptimizedChangeSnapshotId =
optimizingState.getLastOptimizedChangeSnapshotId();
AmoroTable<?> table = loadTable(tableRuntime);
@@ -109,7 +119,7 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
|| lastOptimizedChangeSnapshotId !=
optimizingState.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId !=
optimizingState.getCurrentSnapshotId())) {
- tryEvaluatingPendingInput(tableRuntime, mixedTable);
+ tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
}
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.",
tableRuntime.getTableIdentifier(), throwable);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
index 99a06fb38..5207f105a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
@@ -20,10 +20,11 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +43,12 @@ public class TagsAutoCreatingExecutor extends
PeriodicTableScheduler {
}
@Override
- protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
+ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval;
}
@Override
- protected boolean enabled(DefaultTableRuntime tableRuntime) {
+ protected boolean enabled(TableRuntime tableRuntime) {
return
tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag()
&& tableRuntime.getFormat() == TableFormat.ICEBERG;
}
@@ -58,19 +59,18 @@ public class TagsAutoCreatingExecutor extends
PeriodicTableScheduler {
}
@Override
- protected void execute(DefaultTableRuntime tableRuntime) {
+ protected void execute(TableRuntime tableRuntime) {
try {
AmoroTable<?> amoroTable = loadTable(tableRuntime);
- TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable);
- tableMaintainer.autoCreateTags(tableRuntime);
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.autoCreateTags();
} catch (Throwable t) {
LOG.error("Failed to create tags on {}",
tableRuntime.getTableIdentifier(), t);
}
}
@Override
- public void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ public void handleConfigChanged(TableRuntime tableRuntime,
TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 5ab6b0207..c06665df6 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -87,6 +87,7 @@ public class DefaultTableRuntime extends StatedPersistentBase
optimizingState.registerMetric(metricRegistry);
}
+ @Override
public void dispose() {
optimizingState.dispose();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 2c6d3d014..bb44b785a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -22,10 +22,10 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ExternalCatalog;
@@ -39,7 +39,6 @@ import
org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -70,7 +69,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
public static final Logger LOG =
LoggerFactory.getLogger(DefaultTableService.class);
private final long externalCatalogRefreshingInterval;
- private final Map<Long, DefaultTableRuntime> tableRuntimeMap = new
ConcurrentHashMap<>();
+ private final Map<Long, TableRuntime> tableRuntimeMap = new
ConcurrentHashMap<>();
private final ScheduledExecutorService tableExplorerScheduler =
Executors.newSingleThreadScheduledExecutor(
@@ -149,7 +148,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
List<TableRuntimeMeta> tableRuntimeMetaList =
getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas);
- List<DefaultTableRuntime> tableRuntimes = new
ArrayList<>(tableRuntimeMetaList.size());
+ List<TableRuntime> tableRuntimes = new
ArrayList<>(tableRuntimeMetaList.size());
tableRuntimeMetaList.forEach(
tableRuntimeMeta -> {
DefaultTableRuntime tableRuntime = new
DefaultTableRuntime(tableRuntimeMeta, this);
@@ -184,17 +183,8 @@ public class DefaultTableService extends PersistentBase
implements TableService
initialized.complete(true);
}
- private DefaultTableRuntime getAndCheckExist(ServerTableIdentifier
tableIdentifier) {
- Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier
cannot be null");
- DefaultTableRuntime tableRuntime = getRuntime(tableIdentifier.getId());
- if (tableRuntime == null) {
- throw new ObjectNotExistsException(tableIdentifier);
- }
- return tableRuntime;
- }
-
@Override
- public DefaultTableRuntime getRuntime(Long tableId) {
+ public TableRuntime getRuntime(Long tableId) {
checkStarted();
return tableRuntimeMap.get(tableId);
}
@@ -261,7 +251,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
catalogManager.listCatalogMetas().stream()
.map(CatalogMeta::getCatalogName)
.collect(Collectors.toSet());
- for (DefaultTableRuntime tableRuntime : tableRuntimeMap.values()) {
+ for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
if
(!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
disposeTable(tableRuntime.getTableIdentifier());
}
@@ -476,7 +466,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
@VisibleForTesting
public void disposeTable(ServerTableIdentifier tableIdentifier) {
- DefaultTableRuntime existedTableRuntime =
tableRuntimeMap.get(tableIdentifier.getId());
+ TableRuntime existedTableRuntime =
tableRuntimeMap.get(tableIdentifier.getId());
try {
doAsTransaction(
() ->
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
index 0ab56c4a2..148d03d76 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.table;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -51,8 +52,8 @@ public abstract class RuntimeHandlerChain {
}
}
- public final void initialize(List<DefaultTableRuntime> tableRuntimes) {
- List<DefaultTableRuntime> supportedtableRuntimeList =
+ public final void initialize(List<TableRuntime> tableRuntimes) {
+ List<TableRuntime> supportedtableRuntimeList =
tableRuntimes.stream()
.filter(runtime -> formatSupported(runtime.getFormat()))
.collect(Collectors.toList());
@@ -63,8 +64,7 @@ public abstract class RuntimeHandlerChain {
}
}
- public final void fireStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+ public final void fireStatusChanged(TableRuntime tableRuntime,
OptimizingStatus originalStatus) {
if (!initialized) {
return;
}
@@ -77,7 +77,7 @@ public abstract class RuntimeHandlerChain {
}
public final void fireConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ TableRuntime tableRuntime, TableConfiguration originalConfig) {
if (!initialized) {
return;
}
@@ -90,7 +90,7 @@ public abstract class RuntimeHandlerChain {
}
}
- public final void fireTableAdded(AmoroTable<?> table, DefaultTableRuntime
tableRuntime) {
+ public final void fireTableAdded(AmoroTable<?> table, TableRuntime
tableRuntime) {
if (!initialized) {
return;
}
@@ -103,7 +103,7 @@ public abstract class RuntimeHandlerChain {
}
}
- public final void fireTableRemoved(DefaultTableRuntime tableRuntime) {
+ public final void fireTableRemoved(TableRuntime tableRuntime) {
if (!initialized) {
return;
}
@@ -138,16 +138,16 @@ public abstract class RuntimeHandlerChain {
}
protected abstract void handleStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus);
+ TableRuntime tableRuntime, OptimizingStatus originalStatus);
protected abstract void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig);
+ TableRuntime tableRuntime, TableConfiguration originalConfig);
- protected abstract void handleTableAdded(AmoroTable<?> table,
DefaultTableRuntime tableRuntime);
+ protected abstract void handleTableAdded(AmoroTable<?> table, TableRuntime
tableRuntime);
- protected abstract void handleTableRemoved(DefaultTableRuntime tableRuntime);
+ protected abstract void handleTableRemoved(TableRuntime tableRuntime);
- protected abstract void initHandler(List<DefaultTableRuntime>
tableRuntimeList);
+ protected abstract void initHandler(List<TableRuntime> tableRuntimeList);
protected abstract void doDispose();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
index 68c1ff8f7..ae711639f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server.table;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.catalog.InternalCatalog;
public interface TableService extends TableRuntimeHandler {
@@ -32,7 +33,7 @@ public interface TableService extends TableRuntimeHandler {
void onTableDropped(InternalCatalog catalog, ServerTableIdentifier
identifier);
- DefaultTableRuntime getRuntime(Long tableId);
+ TableRuntime getRuntime(Long tableId);
default boolean contains(Long tableId) {
return getRuntime(tableId) != null;
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 175772cb2..15f7009b0 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -22,6 +22,7 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.manager.MetricManager;
+import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.DefaultTableService;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -68,6 +69,10 @@ public abstract class AMSServiceTestBase extends
AMSManagerTestBase {
return TABLE_SERVICE;
}
+ protected DefaultTableRuntime getDefaultTableRuntime(long tableId) {
+ return (DefaultTableRuntime) tableService().getRuntime(tableId);
+ }
+
protected DefaultOptimizingService optimizingService() {
return OPTIMIZING_SERVICE;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
index 5e324a47c..5a3c4aa82 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
@@ -21,11 +21,11 @@ package org.apache.amoro.server;
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -118,13 +118,13 @@ public abstract class RestCatalogServiceTestBase {
return internalCatalog.loadTableMetadata(identifier.getDatabase(),
identifier.getTableName());
}
- protected DefaultTableRuntime getTableRuntime(TableIdentifier identifier) {
+ protected TableRuntime getTableRuntime(TableIdentifier identifier) {
ServerTableIdentifier serverTableIdentifier =
getServerTableIdentifier(identifier);
return tableService.getRuntime(serverTableIdentifier.getId());
}
protected void assertTableRuntime(TableIdentifier identifier, TableFormat
format) {
- DefaultTableRuntime runtime = getTableRuntime(identifier);
+ TableRuntime runtime = getTableRuntime(identifier);
Assertions.assertNotNull(runtime, "table runtime is not exists after
created");
Assertions.assertEquals(format, runtime.getFormat());
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 9c6051d44..2e1d0b8f4 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -128,7 +128,7 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
(MixedTable)
tableService().loadTable(serverTableIdentifier()).originalTable();
appendData(mixedTable.asUnkeyedTable(), 1);
appendData(mixedTable.asUnkeyedTable(), 2);
- DefaultTableRuntime runtime =
tableService().getRuntime(serverTableIdentifier().getId());
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
}
@@ -384,14 +384,12 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
reload();
// Committing process will be closed when reloading
Assertions.assertNull(
- tableService()
- .getRuntime(serverTableIdentifier().getId())
+ getDefaultTableRuntime(serverTableIdentifier().getId())
.getOptimizingState()
.getOptimizingProcess());
Assertions.assertEquals(
OptimizingStatus.IDLE,
- tableService()
- .getRuntime(serverTableIdentifier().getId())
+ getDefaultTableRuntime(serverTableIdentifier().getId())
.getOptimizingState()
.getOptimizingStatus());
}
@@ -748,15 +746,13 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
}
Assertions.assertEquals(
ProcessStatus.RUNNING,
- tableService()
- .getRuntime(serverTableIdentifier().getId())
+ getDefaultTableRuntime(serverTableIdentifier().getId())
.getOptimizingState()
.getOptimizingProcess()
.getStatus());
Assertions.assertEquals(
OptimizingStatus.COMMITTING,
- tableService()
- .getRuntime(serverTableIdentifier().getId())
+ getDefaultTableRuntime(serverTableIdentifier().getId())
.getOptimizingState()
.getOptimizingStatus());
}
@@ -783,7 +779,7 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
}
void refreshPending() {
- execute(tableService().getRuntime(serverTableIdentifier().getId()));
+ execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
index 1cf06a5d6..2bb77983a 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java
@@ -91,7 +91,7 @@ public class TestOverviewManager extends AMSTableTestBase {
.asUnkeyedTable();
appendData(table, 1);
appendData(table, 2);
- DefaultTableRuntime runtime =
tableService().getRuntime(serverTableIdentifier().getId());
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
}
@@ -109,7 +109,7 @@ public class TestOverviewManager extends AMSTableTestBase {
void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE,
Integer.MAX_VALUE);
-
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
+ refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index ebec3c8f9..9e10d94c3 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -242,7 +242,7 @@ public class TestDataExpire extends ExecutorTestBase {
// expire partitions that order than 2022-01-02 18:00:00.000
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(keyedTable);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(keyedTable, null);
tableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -325,7 +325,7 @@ public class TestDataExpire extends ExecutorTestBase {
// expire partitions that order than 2022-01-02 18:00:00.000
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
mixedTableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -394,14 +394,15 @@ public class TestDataExpire extends ExecutorTestBase {
if (getTestFormat().equals(TableFormat.ICEBERG)) {
Table table = getMixedTable().asUnkeyedTable();
- IcebergTableMaintainer icebergTableMaintainer = new
IcebergTableMaintainer(table);
+ IcebergTableMaintainer icebergTableMaintainer =
+ new IcebergTableMaintainer(table, getMixedTable().id(), null);
Types.NestedField field =
table.schema().findField(config.getExpirationField());
long lastSnapshotTime = table.currentSnapshot().timestampMillis();
long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config,
field).toEpochMilli();
Assert.assertEquals(lastSnapshotTime, lastCommitTime);
} else {
MixedTable mixedTable = getMixedTable();
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(mixedTable);
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(mixedTable, null);
Types.NestedField field =
getMixedTable().schema().findField(config.getExpirationField());
long lastSnapshotTime;
@@ -427,7 +428,8 @@ public class TestDataExpire extends ExecutorTestBase {
protected void getMaintainerAndExpire(DataExpirationConfig config, String
datetime) {
if (getTestFormat().equals(TableFormat.ICEBERG)) {
Table table = getMixedTable().asUnkeyedTable();
- IcebergTableMaintainer icebergTableMaintainer = new
IcebergTableMaintainer(table);
+ IcebergTableMaintainer icebergTableMaintainer =
+ new IcebergTableMaintainer(table, getMixedTable().id(), null);
Types.NestedField field =
table.schema().findField(config.getExpirationField());
icebergTableMaintainer.expireDataFrom(
config,
@@ -439,7 +441,7 @@ public class TestDataExpire extends ExecutorTestBase {
getMixedTable().schema().findField(config.getExpirationField())))
.toInstant());
} else {
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
Types.NestedField field =
getMixedTable().schema().findField(config.getExpirationField());
mixedTableMaintainer.expireDataFrom(
config,
@@ -501,7 +503,7 @@ public class TestDataExpire extends ExecutorTestBase {
assertScanResult(scan, 1, 0);
DataExpirationConfig config = parseDataExpirationConfig(testTable);
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
mixedTableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2024-01-01T00:00:00.000")
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
index 5df5b47b1..df472d4fd 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
@@ -120,7 +120,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
getTestFormat()));
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
maintainer.cleanContentFiles(
System.currentTimeMillis()
- TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 *
1000,
@@ -204,7 +204,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
}
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
@@ -297,7 +297,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
}
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
@@ -346,9 +346,9 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
- new MixedTableMaintainer(getMixedTable())
+ new MixedTableMaintainer(getMixedTable(), null)
.cleanContentFiles(System.currentTimeMillis() + 1,
orphanFilesCleaningMetrics);
- new MixedTableMaintainer(getMixedTable())
+ new MixedTableMaintainer(getMixedTable(), null)
.cleanMetadata(System.currentTimeMillis() + 1,
orphanFilesCleaningMetrics);
Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
@@ -411,8 +411,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
new TableOrphanFilesCleaningMetrics(
ServerTableIdentifier.of(baseTable.id(), getTestFormat())));
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable());
- maintainer.cleanOrphanFiles(tableRuntime);
+ MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ maintainer.cleanOrphanFiles();
Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir));
Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath));
@@ -420,7 +420,7 @@ public class TestOrphanFileClean extends ExecutorTestBase {
baseTable.updateProperties().set("gc.enabled", "true").commit();
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig((baseTable.properties())));
- maintainer.cleanOrphanFiles(tableRuntime);
+ maintainer.cleanOrphanFiles();
Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir));
Assert.assertFalse(getMixedTable().io().exists(baseOrphanFilePath));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
index 8a818a4d5..e55f33ee8 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
@@ -84,7 +84,7 @@ public class TestOrphanFileCleanHive extends
TestOrphanFileClean {
changeOrphanDataFile.createOrOverwrite().close();
Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath));
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
index baca500dc..60bbd4853 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
@@ -100,8 +100,9 @@ public class TestOrphanFileCleanIceberg extends
TestOrphanFileClean {
.commit();
assertDanglingDeleteFiles(testTable, 1);
- IcebergTableMaintainer tableMaintainer = new
IcebergTableMaintainer(testTable);
- tableMaintainer.cleanDanglingDeleteFiles();
+ IcebergTableMaintainer tableMaintainer =
+ new IcebergTableMaintainer(testTable, testTable.id(), null);
+ tableMaintainer.doCleanDanglingDeleteFiles();
assertDanglingDeleteFiles(testTable, 0);
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index d5326dfab..4d1c0a6d7 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -116,7 +116,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
file ->
Assert.assertTrue(testKeyedTable.changeTable().io().exists(file.path().toString())));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, null);
tableMaintainer.getChangeMaintainer().expireFiles(l + 1);
// In order to advance the snapshot
@@ -180,8 +180,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(5,
Iterables.size(testKeyedTable.changeTable().snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable);
- tableMaintainer.expireSnapshots(tableRuntime);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(2,
Iterables.size(testKeyedTable.changeTable().snapshots()));
List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -225,8 +225,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(4, Iterables.size(table.snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable());
- tableMaintainer.expireSnapshots(tableRuntime);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -268,8 +268,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(4, Iterables.size(table.snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable());
- tableMaintainer.expireSnapshots(tableRuntime);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
List<Snapshot> expectedSnapshots = new ArrayList<>();
@@ -299,7 +299,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig(table.properties()));
- new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+ new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
Assert.assertEquals(1, Iterables.size(table.snapshots()));
table.newAppend().commit();
@@ -320,7 +320,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());
- new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+ new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
Assert.assertEquals(3, Iterables.size(table.snapshots()));
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(),
table.snapshots().iterator()));
@@ -344,7 +344,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
List<DataFile> newDataFiles = writeAndCommitBaseStore(table);
Assert.assertEquals(3, Iterables.size(table.snapshots()));
- new
MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis(), 1);
+ new MixedTableMaintainer(table,
null).expireSnapshots(System.currentTimeMillis(), 1);
Assert.assertEquals(1, Iterables.size(table.snapshots()));
dataFiles.forEach(file ->
Assert.assertFalse(table.io().exists(file.path().toString())));
@@ -392,7 +392,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(12,
Iterables.size(testKeyedTable.changeTable().newScan().planFiles()));
Assert.assertEquals(3,
Iterables.size(testKeyedTable.changeTable().snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, null);
tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1);
tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime +
1, 1);
@@ -460,7 +460,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertTrue(baseTable.io().exists(file1.path()));
Assert.assertTrue(baseTable.io().exists(file2.path()));
Assert.assertTrue(baseTable.io().exists(file3.path()));
- new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime, 1);
+ new MixedTableMaintainer(testKeyedTable, null).expireSnapshots(expireTime,
1);
Assert.assertEquals(1, Iterables.size(baseTable.snapshots()));
Assert.assertFalse(baseTable.io().exists(file1.path()));
@@ -491,15 +491,16 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Mockito.when(optimizingState.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, tableRuntime);
testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL,
"0").commit();
- tableMaintainer.expireSnapshots(tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(2,
Iterables.size(testKeyedTable.changeTable().snapshots()));
testKeyedTable.updateProperties().set("gc.enabled", "true").commit();
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
- tableMaintainer.expireSnapshots(tableRuntime);
+ tableMaintainer = new MixedTableMaintainer(testKeyedTable, tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(1,
Iterables.size(testKeyedTable.changeTable().snapshots()));
}
@@ -521,21 +522,22 @@ public class TestSnapshotExpire extends ExecutorTestBase {
.thenReturn(ServerTableIdentifier.of(testUnkeyedTable.id(),
getTestFormat()));
Mockito.when(tableRuntime.getOptimizingState().getOptimizingStatus())
.thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
-
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testUnkeyedTable);
testUnkeyedTable
.updateProperties()
.set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0")
.commit();
- tableMaintainer.expireSnapshots(tableRuntime);
+ Mockito.when(tableRuntime.getTableConfiguration())
+
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
+
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
testUnkeyedTable.updateProperties().set("gc.enabled", "true").commit();
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
- tableMaintainer.expireSnapshots(tableRuntime);
+ tableMaintainer = new MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+ tableMaintainer.expireSnapshots();
Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots()));
}
@@ -563,7 +565,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Mockito.when(tableRuntime.getOptimizingState().getOptimizingStatus())
.thenReturn(OptimizingStatus.IDLE);
- new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+ new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
table.newAppend().commit();
@@ -571,7 +573,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());
- new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
+ new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
Assert.assertEquals(3, Iterables.size(table.snapshots()));
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(),
table.snapshots().iterator()));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
index dd9f198c8..ef1609d5b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
@@ -114,7 +114,7 @@ public class TestSnapshotExpireHive extends
TestSnapshotExpire {
isKeyedTable()
? getMixedTable().asKeyedTable().baseTable()
: getMixedTable().asUnkeyedTable();
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable());
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(),
1);
Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
index 67c666bd7..db8162494 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
@@ -21,6 +21,7 @@ package org.apache.amoro.server.table;
import org.apache.amoro.CommonUnifiedCatalog;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.UnifiedCatalog;
import org.apache.amoro.api.CatalogMeta;
@@ -300,7 +301,7 @@ public class AMSTableTestBase extends AMSServiceTestBase {
tableTestHelper().primaryKeySpec().primaryKeyExisted(),
mixedTable.isKeyedTable());
}
- protected void validateTableRuntime(DefaultTableRuntime tableRuntime) {
+ protected void validateTableRuntime(TableRuntime tableRuntime) {
Assert.assertEquals(serverTableIdentifier(),
tableRuntime.getTableIdentifier());
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
index 0a0dd445b..85d55baaf 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java
@@ -22,6 +22,7 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
@@ -99,8 +100,7 @@ public class TestDefaultTableRuntimeHandler extends
AMSTableTestBase {
MixedTable mixedTable = (MixedTable)
tableService().loadTable(createTableId).originalTable();
mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN,
"true").commit();
- tableService()
- .getRuntime(createTableId.getId())
+ getDefaultTableRuntime(createTableId.getId())
.getOptimizingState()
.refresh(tableService.loadTable(serverTableIdentifier()));
Assert.assertEquals(1, handler.getConfigChangedTables().size());
@@ -133,39 +133,38 @@ public class TestDefaultTableRuntimeHandler extends
AMSTableTestBase {
static class TestHandler extends RuntimeHandlerChain {
- private final List<DefaultTableRuntime> initTables = Lists.newArrayList();
- private final List<Pair<DefaultTableRuntime, OptimizingStatus>>
statusChangedTables =
+ private final List<TableRuntime> initTables = Lists.newArrayList();
+ private final List<Pair<TableRuntime, OptimizingStatus>>
statusChangedTables =
Lists.newArrayList();
- private final List<Pair<DefaultTableRuntime, TableConfiguration>>
configChangedTables =
+ private final List<Pair<TableRuntime, TableConfiguration>>
configChangedTables =
Lists.newArrayList();
- private final List<Pair<MixedTable, DefaultTableRuntime>> addedTables =
Lists.newArrayList();
- private final List<DefaultTableRuntime> removedTables =
Lists.newArrayList();
+ private final List<Pair<MixedTable, TableRuntime>> addedTables =
Lists.newArrayList();
+ private final List<TableRuntime> removedTables = Lists.newArrayList();
private boolean disposed = false;
@Override
- protected void handleStatusChanged(
- DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
+ protected void handleStatusChanged(TableRuntime tableRuntime,
OptimizingStatus originalStatus) {
statusChangedTables.add(Pair.of(tableRuntime, originalStatus));
}
@Override
protected void handleConfigChanged(
- DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
+ TableRuntime tableRuntime, TableConfiguration originalConfig) {
configChangedTables.add(Pair.of(tableRuntime, originalConfig));
}
@Override
- protected void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime
tableRuntime) {
+ protected void handleTableAdded(AmoroTable<?> table, TableRuntime
tableRuntime) {
addedTables.add(Pair.of((MixedTable) table.originalTable(),
tableRuntime));
}
@Override
- protected void handleTableRemoved(DefaultTableRuntime tableRuntime) {
+ protected void handleTableRemoved(TableRuntime tableRuntime) {
removedTables.add(tableRuntime);
}
@Override
- protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
+ protected void initHandler(List<TableRuntime> tableRuntimeList) {
initTables.addAll(tableRuntimeList);
}
@@ -174,23 +173,19 @@ public class TestDefaultTableRuntimeHandler extends
AMSTableTestBase {
disposed = true;
}
- public List<DefaultTableRuntime> getInitTables() {
+ public List<TableRuntime> getInitTables() {
return initTables;
}
- public List<Pair<DefaultTableRuntime, OptimizingStatus>>
getStatusChangedTables() {
- return statusChangedTables;
- }
-
- public List<Pair<DefaultTableRuntime, TableConfiguration>>
getConfigChangedTables() {
+ public List<Pair<TableRuntime, TableConfiguration>>
getConfigChangedTables() {
return configChangedTables;
}
- public List<Pair<MixedTable, DefaultTableRuntime>> getAddedTables() {
+ public List<Pair<MixedTable, TableRuntime>> getAddedTables() {
return addedTables;
}
- public List<DefaultTableRuntime> getRemovedTables() {
+ public List<TableRuntime> getRemovedTables() {
return removedTables;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
index 51e8829c7..17f0e6349 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeManager.java
@@ -71,7 +71,7 @@ public class TestDefaultTableRuntimeManager extends
AMSTableTestBase {
@Test
public void testTableRuntime() {
- DefaultTableRuntime tableRuntime =
tableService().getRuntime(serverTableIdentifier().getId());
+ DefaultTableRuntime tableRuntime =
getDefaultTableRuntime(serverTableIdentifier().getId());
validateTableRuntime(tableRuntime);
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
index c5dc72f2e..7985ce844 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
@@ -513,7 +513,7 @@ public class TestSyncTableOfExternalCatalog extends
AMSTableTestBase {
Assert.assertEquals(1,
tableRuntimeMetaListForOptimizerGroupAfterAddTable.size());
Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
- DefaultTableRuntime tableRuntime =
tableService().getRuntime(tableIdentifier.getId());
+ DefaultTableRuntime tableRuntime =
getDefaultTableRuntime(tableIdentifier.getId());
// create a tableIdentifier that throw exceptions when calling the
getTableName() method, which
// can lead to exceptions when deleting tableIdentifier in disposeTable().
try (AutoCloseable ignored = MockitoAnnotations.openMocks(this)) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
index 7332abd39..e792db0d7 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
@@ -346,8 +346,7 @@ public class TestTableManager extends AMSTableTestBase {
}
private boolean isTableRuntimeBlocked(BlockableOperation operation) {
- return tableService()
- .getRuntime(serverTableIdentifier().getId())
+ return getDefaultTableRuntime(serverTableIdentifier().getId())
.getOptimizingState()
.isBlocked(operation);
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
index 61454fff3..588f7f83c 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java
@@ -105,7 +105,7 @@ public class TestTableSummaryMetrics extends
AMSTableTestBase {
.asUnkeyedTable();
appendData(table);
appendPosDelete(table);
- DefaultTableRuntime runtime =
tableService().getRuntime(serverTableIdentifier().getId());
+ DefaultTableRuntime runtime =
getDefaultTableRuntime(serverTableIdentifier().getId());
runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
}
@@ -143,7 +143,7 @@ public class TestTableSummaryMetrics extends
AMSTableTestBase {
void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE,
Integer.MAX_VALUE);
-
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
+ refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 6509843ef..9ee7cf184 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -65,4 +65,7 @@ public interface TableRuntime {
default TableFormat getFormat() {
return getTableIdentifier().getFormat();
}
+
+ /** Dispose the table runtime. */
+ default void dispose() {}
}