This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 23fb957da [AMORO-4022] [Improvement]: AMS Iceberg maintainer moved to
the amoro-iceberg module (#4024)
23fb957da is described below
commit 23fb957da40e3a88b955907adf15bd653fc1148f
Author: ConradJam <[email protected]>
AuthorDate: Wed Jan 7 17:12:17 2026 +0800
[AMORO-4022] [Improvement]: AMS Iceberg maintainer moved to the
amoro-iceberg module (#4024)
[AMORO-4022] AMS Iceberg maintainer moved to the amoro-iceberg module
(#4022)
---
.../maintainer/DefaultTableMaintainerContext.java | 109 +++++++++++
...aintainers.java => TableMaintainerFactory.java} | 42 ++++-
.../optimizing/maintainer/TableMaintainers.java | 30 ++-
.../DanglingDeleteFilesCleaningExecutor.java | 2 +-
.../scheduler/inline/DataExpiringExecutor.java | 2 +-
.../inline/OrphanFilesCleaningExecutor.java | 2 +-
.../inline/SnapshotsExpiringExecutor.java | 2 +-
.../scheduler/inline/TagsAutoCreatingExecutor.java | 2 +-
.../amoro/server/table/TableOptimizingMetrics.java | 6 +-
.../table/TableOrphanFilesCleaningMetrics.java | 14 +-
.../optimizing/maintainer/TestDataExpire.java | 30 ++-
.../optimizing/maintainer/TestOrphanFileClean.java | 41 ++---
.../maintainer/TestOrphanFileCleanHive.java | 6 +-
.../maintainer/TestOrphanFileCleanIceberg.java | 4 +-
.../optimizing/maintainer/TestSnapshotExpire.java | 129 +++++--------
.../maintainer/TestSnapshotExpireHive.java | 5 +-
.../maintainer/TestTableMaintainerContext.java | 98 ++++++++++
.../apache/amoro/maintainer/MaintainerMetrics.java | 52 ++++++
.../apache/amoro/maintainer/OptimizingInfo.java | 54 ++++++
.../apache/amoro}/maintainer/TableMaintainer.java | 3 +-
.../amoro/maintainer/TableMaintainerContext.java | 63 +++++++
.../maintainer/AutoCreateIcebergTagAction.java | 2 +-
.../maintainer/IcebergTableMaintainer.java | 205 ++++++++++++---------
.../iceberg}/maintainer/MixedTableMaintainer.java | 152 +++++++++------
.../formats/iceberg/utils/IcebergTableUtil.java | 196 ++++++++++++++++++++
.../formats/iceberg/utils/RollingFileCleaner.java | 139 ++++++++++++++
.../maintainer/TestAutoCreateIcebergTagAction.java | 60 +++++-
27 files changed, 1167 insertions(+), 283 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
new file mode 100644
index 000000000..ca47e2252
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainerContext;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
+import org.apache.amoro.server.utils.HiveLocationUtil;
+import org.apache.amoro.table.MixedTable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Default implementation of TableMaintainerContext for AMS. Adapts
DefaultTableRuntime to
+ * TableMaintainerContext interface.
+ */
+public class DefaultTableMaintainerContext implements TableMaintainerContext {
+
+ private final DefaultTableRuntime tableRuntime;
+ private final MixedTable mixedTable;
+
+ public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) {
+ this.tableRuntime = tableRuntime;
+ this.mixedTable = null;
+ }
+
+ public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime,
MixedTable mixedTable) {
+ this.tableRuntime = tableRuntime;
+ this.mixedTable = mixedTable;
+ }
+
+ @Override
+ public TableConfiguration getTableConfiguration() {
+ return tableRuntime.getTableConfiguration();
+ }
+
+ @Override
+ public MaintainerMetrics getMetrics() {
+ TableOrphanFilesCleaningMetrics metrics =
tableRuntime.getOrphanFilesCleaningMetrics();
+ return new MaintainerMetrics() {
+ @Override
+ public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
+ metrics.completeOrphanDataFiles(expected, cleaned);
+ }
+
+ @Override
+ public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
+ metrics.completeOrphanMetadataFiles(expected, cleaned);
+ }
+ };
+ }
+
+ @Override
+ public OptimizingInfo getOptimizingInfo() {
+ return new DefaultOptimizingInfo(tableRuntime);
+ }
+
+ @Override
+ public Set<String> getHiveLocationPaths() {
+ // Use HiveLocationUtil to get Hive location paths
+ if (mixedTable == null) {
+ return Collections.emptySet();
+ }
+ return HiveLocationUtil.getHiveLocation(mixedTable);
+ }
+
+ /** OptimizingInfo implementation based on DefaultTableRuntime. */
+ private static class DefaultOptimizingInfo implements OptimizingInfo {
+
+ private final DefaultTableRuntime tableRuntime;
+
+ DefaultOptimizingInfo(DefaultTableRuntime tableRuntime) {
+ this.tableRuntime = tableRuntime;
+ }
+
+ @Override
+ public boolean isProcessing() {
+ return tableRuntime.getOptimizingStatus().isProcessing();
+ }
+
+ @Override
+ public long getTargetSnapshotId() {
+ if (!isProcessing()) {
+ return Long.MAX_VALUE;
+ }
+ return tableRuntime.getOptimizingProcess().getTargetSnapshotId();
+ }
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
similarity index 53%
copy from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
copy to
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
index 41f49bb23..ec1583fa1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainerFactory.java
@@ -21,25 +21,53 @@ package org.apache.amoro.server.optimizing.maintainer;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.Table;
-/** Factory for creating {@link TableMaintainer}. */
-public class TableMaintainers {
+/** Factory for creating {@link TableMaintainer} instances. */
+public class TableMaintainerFactory {
- /** Create a {@link TableMaintainer} for the given table. */
+ /**
+ * Create an Iceberg table maintainer with AMS context.
+ *
+ * @param table the Iceberg table
+ * @param tableRuntime the AMS table runtime
+ * @return IcebergTableMaintainer instance
+ */
+ public static IcebergTableMaintainer createIcebergMaintainer(
+ Table table, DefaultTableRuntime tableRuntime) {
+ return new IcebergTableMaintainer(
+ table,
+ tableRuntime.getTableIdentifier().getIdentifier(),
+ new DefaultTableMaintainerContext(tableRuntime));
+ }
+
+ /**
+ * Create a {@link TableMaintainer} for the given table.
+ *
+ * @param amoroTable the Amoro table
+ * @param tableRuntime the table runtime
+ * @return TableMaintainer instance
+ */
public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime
tableRuntime) {
+ Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ DefaultTableRuntime runtime = (DefaultTableRuntime) tableRuntime;
TableFormat format = amoroTable.format();
+
if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
- Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
return new MixedTableMaintainer(
- (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime)
tableRuntime);
+ mixedTable, new DefaultTableMaintainerContext(runtime, mixedTable));
} else if (TableFormat.ICEBERG.equals(format)) {
- Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
return new IcebergTableMaintainer(
- (Table) amoroTable.originalTable(), amoroTable.id(),
(DefaultTableRuntime) tableRuntime);
+ (Table) amoroTable.originalTable(),
+ amoroTable.id(),
+ new DefaultTableMaintainerContext(runtime));
} else {
throw new RuntimeException("Unsupported table type" +
amoroTable.originalTable().getClass());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
index 41f49bb23..eb152bb09 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainers.java
@@ -21,25 +21,41 @@ package org.apache.amoro.server.optimizing.maintainer;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.Table;
/** Factory for creating {@link TableMaintainer}. */
+@Deprecated
public class TableMaintainers {
- /** Create a {@link TableMaintainer} for the given table. */
+ /**
+ * Create a {@link TableMaintainer} for the given table.
+ *
+ * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link
+ * TableMaintainerFactory#create(AmoroTable, TableRuntime)} instead.
+ */
public static TableMaintainer create(AmoroTable<?> amoroTable, TableRuntime
tableRuntime) {
+ return TableMaintainerFactory.create(amoroTable, tableRuntime);
+ }
+
+ /**
+ * Create a {@link TableMaintainer} for the given table with
DefaultTableRuntime.
+ *
+ * @deprecated since 0.9.0, will be removed in 0.10.0. Use {@link
+ * TableMaintainerFactory#createIcebergMaintainer(Table,
DefaultTableRuntime)} instead.
+ */
+ public static TableMaintainer create(AmoroTable<?> amoroTable,
DefaultTableRuntime tableRuntime) {
TableFormat format = amoroTable.format();
if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
- Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
+ MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
return new MixedTableMaintainer(
- (MixedTable) amoroTable.originalTable(), (DefaultTableRuntime)
tableRuntime);
+ mixedTable, new DefaultTableMaintainerContext(tableRuntime,
mixedTable));
} else if (TableFormat.ICEBERG.equals(format)) {
- Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
- return new IcebergTableMaintainer(
- (Table) amoroTable.originalTable(), amoroTable.id(),
(DefaultTableRuntime) tableRuntime);
+ return TableMaintainerFactory.createIcebergMaintainer(
+ (Table) amoroTable.originalTable(), tableRuntime);
} else {
throw new RuntimeException("Unsupported table type" +
amoroTable.originalTable().getClass());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
index d7d8801f1..337481439 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
index 61f45860b..05d72761f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
index 21d60cd10..19db0f192 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
index 15f2d49d9..d137b98a7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java
@@ -21,7 +21,7 @@ package org.apache.amoro.server.scheduler.inline;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
index 5207f105a..b70015e8e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TagsAutoCreatingExecutor.java
@@ -22,7 +22,7 @@ import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
-import org.apache.amoro.server.optimizing.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
index ef93356d5..a6b2a65df 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
@@ -31,7 +31,6 @@ import org.apache.amoro.metrics.MetricRegistry;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.optimizing.OptimizingStatus;
-import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.primitives.Longs;
import org.apache.iceberg.Snapshot;
@@ -350,7 +349,10 @@ public class TableOptimizingMetrics extends
AbstractTableMetrics {
}
// ignore snapshot which is created by amoro maintain commits or no files
added
if (snapshot.summary().values().stream()
- .anyMatch(IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS::contains)
+ .anyMatch(
+
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer
+ .AMORO_MAINTAIN_COMMITS
+ ::contains)
||
Long.parseLong(snapshot.summary().getOrDefault(SnapshotSummary.ADDED_FILES_PROP,
"0"))
== 0) {
return;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
index 9692b01e7..481eb175b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableOrphanFilesCleaningMetrics.java
@@ -21,12 +21,14 @@ package org.apache.amoro.server.table;
import static org.apache.amoro.metrics.MetricDefine.defineCounter;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.maintainer.MaintainerMetrics;
import org.apache.amoro.metrics.Counter;
import org.apache.amoro.metrics.MetricDefine;
import org.apache.amoro.metrics.MetricRegistry;
/** Table Orphan Files Cleaning metrics. */
-public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics {
+public class TableOrphanFilesCleaningMetrics extends AbstractTableMetrics
+ implements MaintainerMetrics {
private final Counter orphanDataFilesCount = new Counter();
private final Counter expectedOrphanDataFilesCount = new Counter();
@@ -89,4 +91,14 @@ public class TableOrphanFilesCleaningMetrics extends
AbstractTableMetrics {
expectedOrphanMetadataFilesCount.inc(expected);
orphanMetadataFilesCount.inc(cleaned);
}
+
+ @Override
+ public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
+ completeOrphanDataFiles(expected, cleaned);
+ }
+
+ @Override
+ public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
+ completeOrphanMetadataFiles(expected, cleaned);
+ }
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 75b3e9a60..737bd3878 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -31,6 +31,8 @@ import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.data.ChangeAction;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.scan.KeyedTableFileScanHelper;
import org.apache.amoro.optimizing.scan.TableFileScanHelper;
@@ -291,7 +293,8 @@ public class TestDataExpire extends ExecutorTestBase {
// expire partitions that order than 2022-01-02 18:00:00.000
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(keyedTable, null);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(keyedTable,
TestTableMaintainerContext.of(keyedTable));
tableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -380,7 +383,8 @@ public class TestDataExpire extends ExecutorTestBase {
// expire partitions that order than 2022-01-02 18:00:00.000
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer mixedTableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
mixedTableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-03T18:00:00.000")
@@ -454,14 +458,18 @@ public class TestDataExpire extends ExecutorTestBase {
if (getTestFormat().equals(TableFormat.ICEBERG)) {
Table table = getMixedTable().asUnkeyedTable();
IcebergTableMaintainer icebergTableMaintainer =
- new IcebergTableMaintainer(table, getMixedTable().id(), null);
+ new IcebergTableMaintainer(
+ table,
+ getMixedTable().id(),
+ TestTableMaintainerContext.of(getMixedTable().asUnkeyedTable()));
Types.NestedField field =
table.schema().findField(config.getExpirationField());
long lastSnapshotTime = table.currentSnapshot().timestampMillis();
long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config,
field).toEpochMilli();
Assert.assertEquals(lastSnapshotTime, lastCommitTime);
} else {
MixedTable mixedTable = getMixedTable();
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(mixedTable, null);
+ MixedTableMaintainer mixedTableMaintainer =
+ new MixedTableMaintainer(mixedTable,
TestTableMaintainerContext.of(mixedTable));
Types.NestedField field =
getMixedTable().schema().findField(config.getExpirationField());
long lastSnapshotTime;
@@ -488,7 +496,10 @@ public class TestDataExpire extends ExecutorTestBase {
if (getTestFormat().equals(TableFormat.ICEBERG)) {
Table table = getMixedTable().asUnkeyedTable();
IcebergTableMaintainer icebergTableMaintainer =
- new IcebergTableMaintainer(table, getMixedTable().id(), null);
+ new IcebergTableMaintainer(
+ table,
+ getMixedTable().id(),
+ TestTableMaintainerContext.of(getMixedTable().asUnkeyedTable()));
Types.NestedField field =
table.schema().findField(config.getExpirationField());
icebergTableMaintainer.expireDataFrom(
config,
@@ -500,7 +511,8 @@ public class TestDataExpire extends ExecutorTestBase {
getMixedTable().schema().findField(config.getExpirationField())))
.toInstant());
} else {
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer mixedTableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
Types.NestedField field =
getMixedTable().schema().findField(config.getExpirationField());
mixedTableMaintainer.expireDataFrom(
config,
@@ -562,7 +574,8 @@ public class TestDataExpire extends ExecutorTestBase {
assertScanResult(scan, 1, 0);
DataExpirationConfig config = parseDataExpirationConfig(testTable);
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer mixedTableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
mixedTableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2024-01-01T00:00:00.000")
@@ -829,7 +842,8 @@ public class TestDataExpire extends ExecutorTestBase {
.writeChangeStore(keyedTable, 1L, ChangeAction.INSERT,
changeRecords, false));
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(keyedTable, null);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(keyedTable,
TestTableMaintainerContext.of(keyedTable));
tableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-04T12:00:00.000")
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
index da78da32e..1326bacdd 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileClean.java
@@ -18,8 +18,8 @@
package org.apache.amoro.server.optimizing.maintainer;
-import static
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
-import static
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_JOB_ID;
+import static
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
+import static
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_JOB_ID;
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.ServerTableIdentifier;
@@ -27,9 +27,8 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.TableProperties;
@@ -47,7 +46,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -119,7 +117,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
getTestFormat()));
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer maintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
maintainer.cleanContentFiles(
System.currentTimeMillis()
- TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT * 60 *
1000,
@@ -203,7 +202,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
}
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer maintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
@@ -296,7 +296,8 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(getMixedTable().io().exists(changeInvalidMetadataJson));
}
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
@@ -345,9 +346,9 @@ public class TestOrphanFileClean extends ExecutorTestBase {
Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
- new MixedTableMaintainer(getMixedTable(), null)
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()))
.cleanContentFiles(System.currentTimeMillis() + 1,
orphanFilesCleaningMetrics);
- new MixedTableMaintainer(getMixedTable(), null)
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()))
.cleanMetadata(System.currentTimeMillis() + 1,
orphanFilesCleaningMetrics);
Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
@@ -395,28 +396,16 @@ public class TestOrphanFileClean extends ExecutorTestBase
{
Assert.assertTrue(getMixedTable().io().exists(changeOrphanFilePath));
}
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(baseTable.id(), getTestFormat()));
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(baseTable.properties()));
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(baseTable.properties()));
-
- Mockito.when(tableRuntime.getOrphanFilesCleaningMetrics())
- .thenReturn(
- new TableOrphanFilesCleaningMetrics(
- ServerTableIdentifier.of(baseTable.id(), getTestFormat())));
-
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ MixedTableMaintainer maintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
maintainer.cleanOrphanFiles();
Assert.assertTrue(getMixedTable().io().exists(baseOrphanFileDir));
Assert.assertTrue(getMixedTable().io().exists(baseOrphanFilePath));
baseTable.updateProperties().set("gc.enabled", "true").commit();
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig((baseTable.properties())));
+ maintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
maintainer.cleanOrphanFiles();
Assert.assertFalse(getMixedTable().io().exists(baseOrphanFileDir));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
index e55f33ee8..729999525 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanHive.java
@@ -18,12 +18,13 @@
package org.apache.amoro.server.optimizing.maintainer;
-import static
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
+import static
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.DATA_FOLDER_NAME;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.hive.TestHMS;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.hive.catalog.HiveTableTestHelper;
@@ -84,7 +85,8 @@ public class TestOrphanFileCleanHive extends
TestOrphanFileClean {
changeOrphanDataFile.createOrOverwrite().close();
Assert.assertTrue(getMixedTable().io().exists(hiveOrphanFilePath));
- MixedTableMaintainer maintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ MixedTableMaintainer maintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()));
TableIdentifier tableIdentifier = getMixedTable().id();
TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
index 60bbd4853..c882187af 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestOrphanFileCleanIceberg.java
@@ -23,6 +23,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
import org.apache.amoro.hive.io.writer.AdaptHiveGenericTaskWriterBuilder;
import org.apache.amoro.io.writer.SortedPosDeleteWriter;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -101,7 +102,8 @@ public class TestOrphanFileCleanIceberg extends
TestOrphanFileClean {
assertDanglingDeleteFiles(testTable, 1);
IcebergTableMaintainer tableMaintainer =
- new IcebergTableMaintainer(testTable, testTable.id(), null);
+ new IcebergTableMaintainer(
+ testTable, testTable.id(),
TestTableMaintainerContext.of(testTable));
tableMaintainer.doCleanDanglingDeleteFiles();
assertDanglingDeleteFiles(testTable, 0);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index 9f04c4ee2..1a391a404 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -18,20 +18,18 @@
package org.apache.amoro.server.optimizing.maintainer;
-import static
org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID;
+import static
org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID;
import static
org.apache.amoro.utils.MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
import org.apache.amoro.BasicTableTestHelper;
-import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.data.ChangeAction;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
-import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
+import org.apache.amoro.maintainer.OptimizingInfo;
import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
-import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterators;
@@ -55,7 +53,6 @@ import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashSet;
@@ -115,7 +112,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
file ->
Assert.assertTrue(testKeyedTable.changeTable().io().exists(file.path().toString())));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, null);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable));
tableMaintainer.getChangeMaintainer().expireFiles(l + 1);
// In order to advance the snapshot
@@ -166,18 +164,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Snapshot lastSnapshot = testKeyedTable.changeTable().currentSnapshot();
testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL,
"0").commit();
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(testKeyedTable.id(),
getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig((testKeyedTable.properties())));
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig((testKeyedTable.properties())));
Assert.assertEquals(5,
Iterables.size(testKeyedTable.changeTable().snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, tableRuntime);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable));
tableMaintainer.expireSnapshots();
Assert.assertEquals(2,
Iterables.size(testKeyedTable.changeTable().snapshots()));
@@ -210,16 +201,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Snapshot lastSnapshot = table.currentSnapshot();
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION,
"0").commit();
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
- .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
Assert.assertEquals(4, Iterables.size(table.snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(table));
tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
@@ -249,18 +235,11 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Snapshot lastSnapshot = table.currentSnapshot();
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION,
"0").commit();
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
- .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
- Mockito.when(tableRuntime.getTableConfiguration())
- .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
Assert.assertEquals(4, Iterables.size(table.snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(getMixedTable(), tableRuntime);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(table));
tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
@@ -281,24 +260,29 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.newAppend().commit();
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION,
"0").commit();
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
- .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
-
- new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+ TestTableMaintainerContext.Impl context =
+ new TestTableMaintainerContext.Impl(
+ TableConfigurations.parseTableConfig(table.properties()), table);
+ new MixedTableMaintainer(table, context).expireSnapshots();
Assert.assertEquals(1, Iterables.size(table.snapshots()));
table.newAppend().commit();
- // mock tableRuntime which has optimizing task not committed
+ // mock optimizing task not committed
long optimizeSnapshotId = table.currentSnapshot().snapshotId();
- OptimizingProcess optimizingProcess =
Mockito.mock(OptimizingProcess.class);
-
Mockito.when(optimizingProcess.getTargetSnapshotId()).thenReturn(optimizeSnapshotId);
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.COMMITTING);
-
Mockito.when(tableRuntime.getOptimizingProcess()).thenReturn(optimizingProcess);
+ OptimizingInfo optimizingInfo =
+ new OptimizingInfo() {
+ @Override
+ public boolean isProcessing() {
+ return true;
+ }
+
+ @Override
+ public long getTargetSnapshotId() {
+ return optimizeSnapshotId;
+ }
+ };
+ context.setOptimizingInfo(optimizingInfo);
List<Snapshot> expectedSnapshots = new ArrayList<>();
expectedSnapshots.add(table.currentSnapshot());
@@ -307,7 +291,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());
- new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+ new MixedTableMaintainer(table, context).expireSnapshots();
Assert.assertEquals(3, Iterables.size(table.snapshots()));
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(),
table.snapshots().iterator()));
@@ -331,7 +315,9 @@ public class TestSnapshotExpire extends ExecutorTestBase {
List<DataFile> newDataFiles = writeAndCommitBaseStore(table);
Assert.assertEquals(3, Iterables.size(table.snapshots()));
- new MixedTableMaintainer(table,
null).expireSnapshots(System.currentTimeMillis(), 1);
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(getMixedTable()))
+ .getBaseMaintainer()
+ .expireSnapshots(System.currentTimeMillis(), 1);
Assert.assertEquals(1, Iterables.size(table.snapshots()));
dataFiles.forEach(file ->
Assert.assertFalse(table.io().exists(file.path().toString())));
@@ -379,7 +365,8 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(12,
Iterables.size(testKeyedTable.changeTable().newScan().planFiles()));
Assert.assertEquals(3,
Iterables.size(testKeyedTable.changeTable().snapshots()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, null);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable));
tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1);
tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime +
1, 1);
@@ -447,7 +434,9 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertTrue(baseTable.io().exists(file1.path()));
Assert.assertTrue(baseTable.io().exists(file2.path()));
Assert.assertTrue(baseTable.io().exists(file3.path()));
- new MixedTableMaintainer(testKeyedTable, null).expireSnapshots(expireTime,
1);
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable))
+ .getBaseMaintainer()
+ .expireSnapshots(expireTime, 1);
Assert.assertEquals(1, Iterables.size(baseTable.snapshots()));
Assert.assertFalse(baseTable.io().exists(file1.path()));
@@ -467,24 +456,15 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(2,
Iterables.size(testKeyedTable.changeTable().snapshots()));
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(testKeyedTable.id(),
getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
-
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testKeyedTable, tableRuntime);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable));
testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL,
"0").commit();
tableMaintainer.expireSnapshots();
Assert.assertEquals(2,
Iterables.size(testKeyedTable.changeTable().snapshots()));
testKeyedTable.updateProperties().set("gc.enabled", "true").commit();
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testKeyedTable.properties()));
- tableMaintainer = new MixedTableMaintainer(testKeyedTable, tableRuntime);
+ tableMaintainer =
+ new MixedTableMaintainer(testKeyedTable,
TestTableMaintainerContext.of(testKeyedTable));
tableMaintainer.expireSnapshots();
Assert.assertEquals(1,
Iterables.size(testKeyedTable.changeTable().snapshots()));
}
@@ -500,22 +480,16 @@ public class TestSnapshotExpire extends ExecutorTestBase {
Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(testUnkeyedTable.id(),
getTestFormat()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
+ MixedTableMaintainer tableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(testUnkeyedTable));
testUnkeyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION,
"0").commit();
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
- MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(testUnkeyedTable, tableRuntime);
tableMaintainer.expireSnapshots();
Assert.assertEquals(2, Iterables.size(testUnkeyedTable.snapshots()));
testUnkeyedTable.updateProperties().set("gc.enabled", "true").commit();
- Mockito.when(tableRuntime.getTableConfiguration())
-
.thenReturn(TableConfigurations.parseTableConfig(testUnkeyedTable.properties()));
- tableMaintainer = new MixedTableMaintainer(testUnkeyedTable, tableRuntime);
+ tableMaintainer =
+ new MixedTableMaintainer(getMixedTable(),
TestTableMaintainerContext.of(testUnkeyedTable));
tableMaintainer.expireSnapshots();
Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots()));
}
@@ -534,14 +508,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION,
"0s").commit();
table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT,
"3").commit();
- DefaultTableRuntime tableRuntime = Mockito.mock(DefaultTableRuntime.class);
- Mockito.when(tableRuntime.getTableIdentifier())
- .thenReturn(ServerTableIdentifier.of(table.id(), getTestFormat()));
- Mockito.when(tableRuntime.getTableConfiguration())
- .thenReturn(TableConfigurations.parseTableConfig(table.properties()));
-
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
-
- new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+ new MixedTableMaintainer(table,
TestTableMaintainerContext.of(table)).expireSnapshots();
Assert.assertEquals(2, Iterables.size(table.snapshots()));
table.newAppend().commit();
@@ -549,7 +516,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());
- new MixedTableMaintainer(table, tableRuntime).expireSnapshots();
+ new MixedTableMaintainer(table,
TestTableMaintainerContext.of(table)).expireSnapshots();
Assert.assertEquals(3, Iterables.size(table.snapshots()));
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(),
table.snapshots().iterator()));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
index 16539fa01..b1628808c 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java
@@ -21,11 +21,13 @@ package org.apache.amoro.server.optimizing.maintainer;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.formats.iceberg.maintainer.MixedTableMaintainer;
import org.apache.amoro.hive.TestHMS;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.hive.catalog.HiveTableTestHelper;
import org.apache.amoro.hive.io.HiveDataTestHelpers;
import org.apache.amoro.hive.utils.HiveTableUtil;
+import org.apache.amoro.maintainer.TableMaintainerContext;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.MixedTable;
@@ -111,7 +113,8 @@ public class TestSnapshotExpireHive extends
TestSnapshotExpire {
isKeyedTable()
? getMixedTable().asKeyedTable().baseTable()
: getMixedTable().asUnkeyedTable();
- MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), null);
+ TableMaintainerContext context =
TestTableMaintainerContext.of(getMixedTable());
+ MixedTableMaintainer mixedTableMaintainer = new
MixedTableMaintainer(getMixedTable(), context);
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(),
1);
Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
new file mode 100644
index 000000000..ea2ab96d2
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestTableMaintainerContext.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainerContext;
+import org.apache.amoro.server.table.TableConfigurations;
+import org.apache.amoro.server.utils.HiveLocationUtil;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Utility class for creating test TableMaintainerContext instances. */
+public class TestTableMaintainerContext {
+
+ /** Create a test TableMaintainerContext for the given MixedTable. */
+ public static TableMaintainerContext of(MixedTable table) {
+ return new Impl(TableConfigurations.parseTableConfig(table.properties()),
table);
+ }
+
+ /** Create a test TableMaintainerContext for the given KeyedTable. */
+ public static TableMaintainerContext of(KeyedTable table) {
+ return new Impl(TableConfigurations.parseTableConfig(table.properties()));
+ }
+
+ /** Create a test TableMaintainerContext for the given UnkeyedTable. */
+ public static TableMaintainerContext of(UnkeyedTable table) {
+ return new Impl(TableConfigurations.parseTableConfig(table.properties()));
+ }
+
+ /** Test implementation of TableMaintainerContext. */
+ public static class Impl implements TableMaintainerContext {
+ private final TableConfiguration tableConfiguration;
+ private OptimizingInfo optimizingInfo;
+ private final MixedTable mixedTable;
+
+ public Impl(TableConfiguration tableConfiguration) {
+ this.tableConfiguration = tableConfiguration;
+ this.optimizingInfo = OptimizingInfo.EMPTY;
+ this.mixedTable = null;
+ }
+
+ public Impl(TableConfiguration tableConfiguration, MixedTable mixedTable) {
+ this.tableConfiguration = tableConfiguration;
+ this.optimizingInfo = OptimizingInfo.EMPTY;
+ this.mixedTable = mixedTable;
+ }
+
+ public void setOptimizingInfo(OptimizingInfo optimizingInfo) {
+ this.optimizingInfo = optimizingInfo;
+ }
+
+ @Override
+ public TableConfiguration getTableConfiguration() {
+ return tableConfiguration;
+ }
+
+ @Override
+ public MaintainerMetrics getMetrics() {
+ return MaintainerMetrics.NOOP;
+ }
+
+ @Override
+ public OptimizingInfo getOptimizingInfo() {
+ return optimizingInfo;
+ }
+
+ @Override
+ public Set<String> getHiveLocationPaths() {
+ // Use HiveLocationUtil to get Hive location paths
+ if (mixedTable == null) {
+ return Collections.emptySet();
+ }
+ return HiveLocationUtil.getHiveLocation(mixedTable);
+ }
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java
b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java
new file mode 100644
index 000000000..420c61e3b
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+/**
+ * Metrics collector interface for table maintenance operations.
Implementations can report metrics
+ * to different monitoring systems.
+ */
+public interface MaintainerMetrics {
+
+ /**
+ * Record orphan data files cleaning result.
+ *
+ * @param expected expected number of files to clean
+ * @param cleaned actual number of files cleaned
+ */
+ void recordOrphanDataFilesCleaned(int expected, int cleaned);
+
+ /**
+ * Record orphan metadata files cleaning result.
+ *
+ * @param expected expected number of files to clean
+ * @param cleaned actual number of files cleaned
+ */
+ void recordOrphanMetadataFilesCleaned(int expected, int cleaned);
+
+ /** No-op implementation that does nothing. */
+ MaintainerMetrics NOOP =
+ new MaintainerMetrics() {
+ @Override
+ public void recordOrphanDataFilesCleaned(int expected, int cleaned) {}
+
+ @Override
+ public void recordOrphanMetadataFilesCleaned(int expected, int
cleaned) {}
+ };
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java
b/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java
new file mode 100644
index 000000000..3056421ba
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/OptimizingInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+/**
+ * Optimizing process information interface. Provides information about
ongoing optimizing
+ * processes.
+ */
+public interface OptimizingInfo {
+
+ /**
+ * Check whether an optimizing process is currently in progress.
+ *
+ * @return true if an optimizing process is in progress
+ */
+ boolean isProcessing();
+
+ /**
+ * Get the target snapshot id that the optimizing process is based on.
+ *
+ * @return target snapshot id, or {@code Long.MAX_VALUE} if no process is
running
+ */
+ long getTargetSnapshotId();
+
+ /** Empty implementation indicating no optimizing process. */
+ OptimizingInfo EMPTY =
+ new OptimizingInfo() {
+ @Override
+ public boolean isProcessing() {
+ return false;
+ }
+
+ @Override
+ public long getTargetSnapshotId() {
+ return Long.MAX_VALUE;
+ }
+ };
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
similarity index 92%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
rename to
amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
index 9474102b2..6719e1507 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/TableMaintainer.java
+++
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.maintainer;
/**
* API for maintaining table.
@@ -24,7 +24,6 @@ package org.apache.amoro.server.optimizing.maintainer;
* <p>Includes: clean content files, clean metadata, clean dangling delete
files, expire snapshots,
* auto create tags.
*/
-// TODO TableMaintainer should not be in this optimizing.xxx package.
public interface TableMaintainer {
/** Clean table orphan files. Includes: data files, metadata files. */
diff --git
a/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
new file mode 100644
index 000000000..4ebdf7360
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/maintainer/TableMaintainerContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import org.apache.amoro.config.TableConfiguration;
+
+import java.util.Set;
+
+/**
+ * Context interface for table maintainer operations.
+ *
+ * <p>Provides necessary configuration and runtime information for table
maintenance without
+ * depending on AMS-specific implementations.
+ */
+public interface TableMaintainerContext {
+
+ /**
+ * Get the table configuration.
+ *
+ * @return table configuration containing all maintenance-related settings
+ */
+ TableConfiguration getTableConfiguration();
+
+ /**
+ * Get the metrics collector for maintenance operations.
+ *
+ * @return metrics collector, may return {@code MaintainerMetrics.NOOP} if metrics
are not supported
+ */
+ MaintainerMetrics getMetrics();
+
+ /**
+ * Get optimizing process information if available.
+ *
+ * @return optimizing information, may return {@code OptimizingInfo.EMPTY} if no
optimizing process is running
+ */
+ OptimizingInfo getOptimizingInfo();
+
+ /**
+ * Get Hive table/partition location paths if the table is a Hive-backed
table.
+ *
+ * <p>This is used to exclude Hive-managed files from being cleaned during
maintenance operations.
+ * For non-Hive tables, returns an empty set.
+ *
+ * @return set of Hive location paths, or empty set if not a Hive table
+ */
+ Set<String> getHiveLocationPaths();
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
similarity index 98%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
rename to
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
index 51ef16702..0253b8791 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/AutoCreateIcebergTagAction.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/AutoCreateIcebergTagAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
import org.apache.amoro.config.TagConfiguration;
import org.apache.iceberg.ManageSnapshots;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
similarity index 88%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
rename to
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index 83382e672..3d1d4bfb1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
import static
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.min;
@@ -24,15 +24,16 @@ import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.config.TagConfiguration;
+import org.apache.amoro.formats.iceberg.utils.IcebergTableUtil;
+import org.apache.amoro.formats.iceberg.utils.RollingFileCleaner;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.io.PathInfo;
import org.apache.amoro.io.SupportsFileSystemOperations;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
-import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
-import org.apache.amoro.server.utils.IcebergTableUtil;
-import org.apache.amoro.server.utils.RollingFileCleaner;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.OptimizingInfo;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainerContext;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
@@ -96,7 +97,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-/** Table maintainer for iceberg tables. */
+/** Table maintainer for Iceberg tables. */
public class IcebergTableMaintainer implements TableMaintainer {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergTableMaintainer.class);
@@ -118,22 +119,21 @@ public class IcebergTableMaintainer implements
TableMaintainer {
CommitMetaProducer.DATA_EXPIRATION.name(),
CommitMetaProducer.CLEAN_DANGLING_DELETE.name());
- protected Table table;
+ public Table table;
private final TableIdentifier tableIdentifier;
- private final DefaultTableRuntime tableRuntime;
+ private final TableMaintainerContext context;
public IcebergTableMaintainer(
- Table table, TableIdentifier tableIdentifier, DefaultTableRuntime
tableRuntime) {
+ Table table, TableIdentifier tableIdentifier, TableMaintainerContext
context) {
this.table = table;
this.tableIdentifier = tableIdentifier;
- this.tableRuntime = tableRuntime;
+ this.context = context;
}
@Override
public void cleanOrphanFiles() {
- TableConfiguration tableConfiguration =
tableRuntime.getTableConfiguration();
- TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics =
- tableRuntime.getOrphanFilesCleaningMetrics();
+ TableConfiguration tableConfiguration = context.getTableConfiguration();
+ MaintainerMetrics metrics = context.getMetrics();
if (!tableConfiguration.isCleanOrphanEnabled()) {
return;
@@ -141,18 +141,18 @@ public class IcebergTableMaintainer implements
TableMaintainer {
long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000;
- cleanContentFiles(System.currentTimeMillis() - keepTime,
orphanFilesCleaningMetrics);
+ cleanContentFiles(System.currentTimeMillis() - keepTime, metrics);
// refresh
table.refresh();
// clear metadata files
- cleanMetadata(System.currentTimeMillis() - keepTime,
orphanFilesCleaningMetrics);
+ cleanMetadata(System.currentTimeMillis() - keepTime, metrics);
}
@Override
public void cleanDanglingDeleteFiles() {
- TableConfiguration tableConfiguration =
tableRuntime.getTableConfiguration();
+ TableConfiguration tableConfiguration = context.getTableConfiguration();
if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
return;
}
@@ -175,21 +175,21 @@ public class IcebergTableMaintainer implements
TableMaintainer {
@Override
public void expireSnapshots() {
- if (!expireSnapshotEnabled(tableRuntime)) {
+ if (!expireSnapshotEnabled()) {
return;
}
expireSnapshots(
- mustOlderThan(tableRuntime, System.currentTimeMillis()),
- tableRuntime.getTableConfiguration().getSnapshotMinCount());
+ mustOlderThan(System.currentTimeMillis()),
+ context.getTableConfiguration().getSnapshotMinCount());
}
- protected boolean expireSnapshotEnabled(DefaultTableRuntime tableRuntime) {
- TableConfiguration tableConfiguration =
tableRuntime.getTableConfiguration();
+ public boolean expireSnapshotEnabled() {
+ TableConfiguration tableConfiguration = context.getTableConfiguration();
return tableConfiguration.isExpireSnapshotEnabled();
}
@VisibleForTesting
- void expireSnapshots(long mustOlderThan, int minCount) {
+ public void expireSnapshots(long mustOlderThan, int minCount) {
expireSnapshots(mustOlderThan, minCount,
expireSnapshotNeedToExcludeFiles());
}
@@ -229,11 +229,10 @@ public class IcebergTableMaintainer implements
TableMaintainer {
@Override
public void expireData() {
- DataExpirationConfig expirationConfig =
- tableRuntime.getTableConfiguration().getExpiringDataConfig();
+ DataExpirationConfig expirationConfig =
context.getTableConfiguration().getExpiringDataConfig();
try {
Types.NestedField field =
table.schema().findField(expirationConfig.getExpirationField());
- if (!TableConfigurations.isValidDataExpirationField(expirationConfig,
field, table.name())) {
+ if (!isValidDataExpirationField(expirationConfig, field, table.name())) {
return;
}
@@ -243,8 +242,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
- protected Instant expireBaseOnRule(
- DataExpirationConfig expirationConfig, Types.NestedField field) {
+ public Instant expireBaseOnRule(DataExpirationConfig expirationConfig,
Types.NestedField field) {
switch (expirationConfig.getBaseOnRule()) {
case CURRENT_TIME:
return Instant.now();
@@ -293,12 +291,11 @@ public class IcebergTableMaintainer implements
TableMaintainer {
@Override
public void autoCreateTags() {
- TagConfiguration tagConfiguration =
tableRuntime.getTableConfiguration().getTagConfiguration();
+ TagConfiguration tagConfiguration =
context.getTableConfiguration().getTagConfiguration();
new AutoCreateIcebergTagAction(table, tagConfiguration,
LocalDateTime.now()).execute();
}
- protected void cleanContentFiles(
- long lastTime, TableOrphanFilesCleaningMetrics
orphanFilesCleaningMetrics) {
+ public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) {
// For clean data files, should getRuntime valid files in the base store
and the change store,
// so acquire in advance
// to prevent repeated acquisition
@@ -307,19 +304,18 @@ public class IcebergTableMaintainer implements
TableMaintainer {
"Starting cleaning orphan content files for table {} before {}",
table.name(),
Instant.ofEpochMilli(lastTime));
- clearInternalTableContentsFiles(lastTime, validFiles,
orphanFilesCleaningMetrics);
+ clearInternalTableContentsFiles(lastTime, validFiles, metrics);
}
- protected void cleanMetadata(
- long lastTime, TableOrphanFilesCleaningMetrics
orphanFilesCleaningMetrics) {
+ public void cleanMetadata(long lastTime, MaintainerMetrics metrics) {
LOG.info(
"Starting cleaning metadata files for table {} before {}",
table.name(),
Instant.ofEpochMilli(lastTime));
- clearInternalTableMetadata(lastTime, orphanFilesCleaningMetrics);
+ clearInternalTableMetadata(lastTime, metrics);
}
- protected void doCleanDanglingDeleteFiles() {
+ public void doCleanDanglingDeleteFiles() {
LOG.info("Starting cleaning dangling delete files for table {}",
table.name());
int danglingDeleteFilesCnt = clearInternalTableDanglingDeleteFiles();
runWithCondition(
@@ -331,18 +327,18 @@ public class IcebergTableMaintainer implements
TableMaintainer {
table.name()));
}
- protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+ public long mustOlderThan(long now) {
long mustOlderThan =
min(
// The snapshots keep time
- now - snapshotsKeepTime(tableRuntime),
+ now - snapshotsKeepTime(),
// The snapshot optimizing plan based should not be expired for
committing
- fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+ fetchOptimizingPlanSnapshotTime(table),
// The latest non-optimized snapshot should not be expired for
data expiring
fetchLatestNonOptimizedSnapshotTime(table));
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
- long flinkCkRetainMillis =
tableRuntime.getTableConfiguration().getFlinkCheckpointRetention();
+ long flinkCkRetainMillis =
context.getTableConfiguration().getFlinkCheckpointRetention();
if ((now - latestFlinkCommitTime) > flinkCkRetainMillis) {
// exceed configured flink checkpoint retain time, no need to consider
flink committed
// snapshot
@@ -353,15 +349,15 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
- protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
- return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 *
1000;
+ public long snapshotsKeepTime() {
+ return context.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000;
}
protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Collections.emptySet();
}
- protected Set<String> orphanFileCleanNeedToExcludeFiles() {
+ public Set<String> orphanFileCleanNeedToExcludeFiles() {
return Sets.union(
IcebergTableUtil.getAllContentFilePath(table),
IcebergTableUtil.getAllStatisticsFilePath(table));
@@ -372,9 +368,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
private void clearInternalTableContentsFiles(
- long lastTime,
- Set<String> exclude,
- TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics) {
+ long lastTime, Set<String> exclude, MaintainerMetrics metrics) {
String dataLocation = table.location() + File.separator + DATA_FOLDER_NAME;
int expected = 0, deleted = 0;
@@ -412,12 +406,11 @@ public class IcebergTableMaintainer implements
TableMaintainer {
finalDeleted,
finalExpected,
table.name());
- orphanFilesCleaningMetrics.completeOrphanDataFiles(finalExpected,
finalDeleted);
+ metrics.recordOrphanDataFilesCleaned(finalExpected, finalDeleted);
});
}
- private void clearInternalTableMetadata(
- long lastTime, TableOrphanFilesCleaningMetrics
orphanFilesCleaningMetrics) {
+ private void clearInternalTableMetadata(long lastTime, MaintainerMetrics
metrics) {
Set<String> validFiles = getValidMetadataFiles(table);
LOG.info("Found {} valid metadata files for table {}", validFiles.size(),
table.name());
Pattern excludeFileNameRegex = getExcludeFileNameRegex(table);
@@ -444,7 +437,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
deleted,
filesToDelete.size(),
table.name());
-
orphanFilesCleaningMetrics.completeOrphanMetadataFiles(filesToDelete.size(),
deleted);
+ metrics.recordOrphanMetadataFilesCleaned(filesToDelete.size(),
deleted);
});
} else {
LOG.warn(
@@ -496,14 +489,13 @@ public class IcebergTableMaintainer implements
TableMaintainer {
* process planned based. This snapshot will be used when optimizing process
committing.
*
* @param table table
- * @param tableRuntime table runtime
* @return time of snapshot for optimizing process planned based, return
Long.MAX_VALUE if no
* optimizing process exists
*/
- public static long fetchOptimizingPlanSnapshotTime(
- Table table, DefaultTableRuntime tableRuntime) {
- if (tableRuntime.getOptimizingStatus().isProcessing()) {
- long fromSnapshotId =
tableRuntime.getOptimizingProcess().getTargetSnapshotId();
+ public long fetchOptimizingPlanSnapshotTime(Table table) {
+ OptimizingInfo optimizingInfo = context.getOptimizingInfo();
+ if (optimizingInfo.isProcessing()) {
+ long fromSnapshotId = optimizingInfo.getTargetSnapshotId();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.snapshotId() == fromSnapshotId) {
@@ -656,7 +648,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
return filesToDelete;
}
- CloseableIterable<FileEntry> fileScan(
+ public CloseableIterable<FileEntry> fileScan(
Table table,
Expression dataFilter,
DataExpirationConfig expirationConfig,
@@ -706,7 +698,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
});
}
- protected ExpireFiles expiredFileScan(
+ public ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
@@ -789,6 +781,52 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
+ /**
+ * Check if the given field is valid for data expiration.
+ *
+ * @param config data expiration config
+ * @param field table nested field
+ * @param name table name
+ * @return true if field is valid
+ */
+ protected boolean isValidDataExpirationField(
+ DataExpirationConfig config, Types.NestedField field, String name) {
+ return config.isEnabled()
+ && config.getRetentionTime() > 0
+ && validateExpirationField(field, name, config.getExpirationField());
+ }
+
+ public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
+ Sets.newHashSet(
+ Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG,
Type.TypeID.DATE);
+
+ private static boolean validateExpirationField(
+ Types.NestedField field, String name, String expirationField) {
+ if (Strings.isNullOrEmpty(expirationField) || null == field) {
+ LOG.warn(
+ String.format(
+ "Field(%s) used to determine data expiration is illegal for
table(%s)",
+ expirationField, name));
+ return false;
+ }
+ Type.TypeID typeID = field.type().typeId();
+ if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
+ LOG.warn(
+ String.format(
+ "Table(%s) field(%s) type(%s) is not supported for data
expiration, please use the "
+ + "following types: %s",
+ name,
+ expirationField,
+ typeID.name(),
+ DATA_EXPIRATION_FIELD_TYPES.stream()
+ .map(Enum::name)
+ .collect(Collectors.joining(", "))));
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Create a filter expression for expired files for the `FILE` level. For
the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is
obsolete, so we will not
@@ -797,7 +835,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
* @param expirationConfig expiration configuration
* @param expireTimestamp expired timestamp
*/
- protected static Expression getDataExpression(
+ public static Expression getDataExpression(
Schema schema, DataExpirationConfig expirationConfig, long
expireTimestamp) {
if
(expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION))
{
return Expressions.alwaysTrue();
@@ -831,7 +869,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
- void expireFiles(ExpireFiles expiredFiles, long expireTimestamp) {
+ public void expireFiles(ExpireFiles expiredFiles, long expireTimestamp) {
long snapshotId = IcebergTableUtil.getSnapshotId(table, false);
Queue<DataFile> dataFiles = expiredFiles.dataFiles;
Queue<DeleteFile> deleteFiles = expiredFiles.deleteFiles;
@@ -871,16 +909,35 @@ public class IcebergTableMaintainer implements
TableMaintainer {
table.name());
}
+ public Table getTable() {
+ return table;
+ }
+
+ public static ZoneId getDefaultZoneId(Types.NestedField expireField) {
+ Type type = expireField.type();
+ if (type.typeId() == Type.TypeID.STRING) {
+ return ZoneId.systemDefault();
+ }
+ return ZoneOffset.UTC;
+ }
+
+ private void runWithCondition(boolean condition, Runnable fun) {
+ if (condition) {
+ fun.run();
+ }
+ }
+
+ /** Internal class for expiring files. */
public static class ExpireFiles {
Queue<DataFile> dataFiles;
Queue<DeleteFile> deleteFiles;
- ExpireFiles() {
+ public ExpireFiles() {
this.dataFiles = new LinkedTransferQueue<>();
this.deleteFiles = new LinkedTransferQueue<>();
}
- void addFile(FileEntry entry) {
+ public void addFile(FileEntry entry) {
ContentFile<?> file = entry.getFile();
switch (file.content()) {
case DATA:
@@ -896,6 +953,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
+ /** Internal class for tracking data file freshness. */
public static class DataFileFreshness {
long latestExpiredSeq;
long latestUpdateMillis;
@@ -928,7 +986,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
}
- static boolean mayExpired(
+ public static boolean mayExpired(
FileEntry fileEntry,
Map<StructLike, DataFileFreshness> partitionFreshness,
Long expireTimestamp) {
@@ -961,7 +1019,7 @@ public class IcebergTableMaintainer implements
TableMaintainer {
return expired;
}
- static boolean willNotRetain(
+ public static boolean willNotRetain(
FileEntry fileEntry,
DataExpirationConfig expirationConfig,
Map<StructLike, DataFileFreshness> partitionFreshness) {
@@ -1074,23 +1132,12 @@ public class IcebergTableMaintainer implements
TableMaintainer {
return !compareResults.isEmpty() &&
compareResults.stream().allMatch(Boolean::booleanValue);
}
- public Table getTable() {
- return table;
- }
-
- public static ZoneId getDefaultZoneId(Types.NestedField expireField) {
- Type type = expireField.type();
- if (type.typeId() == Type.TypeID.STRING) {
- return ZoneId.systemDefault();
- }
- return ZoneOffset.UTC;
- }
-
+ /** Internal class for file entry with expiration info. */
public static class FileEntry {
private final ContentFile<?> file;
private final Literal<Long> tsBound;
- FileEntry(ContentFile<?> file, Literal<Long> tsBound) {
+ public FileEntry(ContentFile<?> file, Literal<Long> tsBound) {
this.file = file;
this.tsBound = tsBound;
}
@@ -1103,10 +1150,4 @@ public class IcebergTableMaintainer implements
TableMaintainer {
return tsBound;
}
}
-
- private void runWithCondition(boolean condition, Runnable fun) {
- if (condition) {
- fun.run();
- }
- }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
similarity index 80%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
rename to
amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
index 987192f89..72e4a6429 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java
@@ -16,21 +16,20 @@
* limitations under the License.
*/
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
import static
org.apache.amoro.shade.guava32.com.google.common.primitives.Longs.min;
import static
org.apache.amoro.utils.MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
import org.apache.amoro.IcebergFileEntry;
-import org.apache.amoro.TableFormat;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.data.FileNameRules;
+import org.apache.amoro.maintainer.MaintainerMetrics;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.maintainer.TableMaintainerContext;
import org.apache.amoro.scan.TableEntriesScan;
-import org.apache.amoro.server.table.DefaultTableRuntime;
-import org.apache.amoro.server.table.TableConfigurations;
-import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
-import org.apache.amoro.server.utils.HiveLocationUtil;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -54,6 +53,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
@@ -76,22 +76,25 @@ public class MixedTableMaintainer implements
TableMaintainer {
private static final Logger LOG =
LoggerFactory.getLogger(MixedTableMaintainer.class);
+ public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
+ Sets.newHashSet(
+ Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG,
Type.TypeID.DATE);
+
private final MixedTable mixedTable;
+ private final TableMaintainerContext context;
private ChangeTableMaintainer changeMaintainer;
-
private final BaseTableMaintainer baseMaintainer;
- private final DefaultTableRuntime tableRuntime;
- public MixedTableMaintainer(MixedTable mixedTable, DefaultTableRuntime
tableRuntime) {
+ public MixedTableMaintainer(MixedTable mixedTable, TableMaintainerContext
context) {
this.mixedTable = mixedTable;
- this.tableRuntime = tableRuntime;
+ this.context = context;
if (mixedTable.isKeyedTable()) {
changeMaintainer =
- new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(),
tableRuntime);
- baseMaintainer = new
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), tableRuntime);
+ new ChangeTableMaintainer(mixedTable.asKeyedTable().changeTable(),
context);
+ baseMaintainer = new
BaseTableMaintainer(mixedTable.asKeyedTable().baseTable(), context);
} else {
- baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(),
tableRuntime);
+ baseMaintainer = new BaseTableMaintainer(mixedTable.asUnkeyedTable(),
context);
}
}
@@ -103,6 +106,11 @@ public class MixedTableMaintainer implements
TableMaintainer {
baseMaintainer.cleanOrphanFiles();
}
+ @Override
+ public void cleanDanglingDeleteFiles() {
+ // Mixed table doesn't support clean dangling delete files
+ }
+
@Override
public void expireSnapshots() {
if (changeMaintainer != null) {
@@ -121,13 +129,11 @@ public class MixedTableMaintainer implements
TableMaintainer {
@Override
public void expireData() {
- DataExpirationConfig expirationConfig =
- tableRuntime.getTableConfiguration().getExpiringDataConfig();
+ DataExpirationConfig expirationConfig =
context.getTableConfiguration().getExpiringDataConfig();
try {
Types.NestedField field =
mixedTable.schema().findField(expirationConfig.getExpirationField());
- if (!TableConfigurations.isValidDataExpirationField(
- expirationConfig, field, mixedTable.name())) {
+ if (!isValidDataExpirationField(expirationConfig, field,
mixedTable.name())) {
return;
}
@@ -137,7 +143,7 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
}
- protected Instant expireMixedBaseOnRule(
+ public Instant expireMixedBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
Instant changeInstant =
Optional.ofNullable(changeMaintainer).isPresent()
@@ -250,23 +256,21 @@ public class MixedTableMaintainer implements
TableMaintainer {
throw new UnsupportedOperationException("Mixed table doesn't support auto
create tags");
}
- protected void cleanContentFiles(
- long lastTime, TableOrphanFilesCleaningMetrics
orphanFilesCleaningMetrics) {
+ public void cleanContentFiles(long lastTime, MaintainerMetrics metrics) {
if (changeMaintainer != null) {
- changeMaintainer.cleanContentFiles(lastTime, orphanFilesCleaningMetrics);
+ changeMaintainer.cleanContentFiles(lastTime, metrics);
}
- baseMaintainer.cleanContentFiles(lastTime, orphanFilesCleaningMetrics);
+ baseMaintainer.cleanContentFiles(lastTime, metrics);
}
- protected void cleanMetadata(
- long lastTime, TableOrphanFilesCleaningMetrics
orphanFilesCleaningMetrics) {
+ public void cleanMetadata(long lastTime, MaintainerMetrics metrics) {
if (changeMaintainer != null) {
- changeMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
+ changeMaintainer.cleanMetadata(lastTime, metrics);
}
- baseMaintainer.cleanMetadata(lastTime, orphanFilesCleaningMetrics);
+ baseMaintainer.cleanMetadata(lastTime, metrics);
}
- protected void doCleanDanglingDeleteFiles() {
+ public void doCleanDanglingDeleteFiles() {
if (changeMaintainer != null) {
changeMaintainer.doCleanDanglingDeleteFiles();
}
@@ -281,50 +285,89 @@ public class MixedTableMaintainer implements
TableMaintainer {
return baseMaintainer;
}
+ /**
+ * Check if the given field is valid for data expiration.
+ *
+ * @param config data expiration config
+ * @param field table nested field
+ * @param name table name
+ * @return true if field is valid
+ */
+ protected boolean isValidDataExpirationField(
+ DataExpirationConfig config, Types.NestedField field, String name) {
+ return config.isEnabled()
+ && config.getRetentionTime() > 0
+ && validateExpirationField(field, name, config.getExpirationField());
+ }
+
+ private static boolean validateExpirationField(
+ Types.NestedField field, String name, String expirationField) {
+ if (Strings.isNullOrEmpty(expirationField) || null == field) {
+ LOG.warn(
+ String.format(
+ "Field(%s) used to determine data expiration is illegal for
table(%s)",
+ expirationField, name));
+ return false;
+ }
+ Type.TypeID typeID = field.type().typeId();
+ if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
+ LOG.warn(
+ String.format(
+ "Table(%s) field(%s) type(%s) is not supported for data
expiration, please use the "
+ + "following types: %s",
+ name,
+ expirationField,
+ typeID.name(),
+ DATA_EXPIRATION_FIELD_TYPES.stream()
+ .map(Enum::name)
+ .collect(Collectors.joining(", "))));
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Inner class for maintaining change table of mixed table. */
public class ChangeTableMaintainer extends IcebergTableMaintainer {
private static final int DATA_FILE_LIST_SPLIT = 3000;
private final UnkeyedTable unkeyedTable;
- public ChangeTableMaintainer(UnkeyedTable unkeyedTable,
DefaultTableRuntime tableRuntime) {
- super(unkeyedTable, mixedTable.id(), tableRuntime);
+ public ChangeTableMaintainer(UnkeyedTable unkeyedTable,
TableMaintainerContext context) {
+ super(unkeyedTable, mixedTable.id(), context);
this.unkeyedTable = unkeyedTable;
}
@Override
@VisibleForTesting
- void expireSnapshots(long mustOlderThan, int minCount) {
+ public void expireSnapshots(long mustOlderThan, int minCount) {
expireFiles(mustOlderThan);
super.expireSnapshots(mustOlderThan, minCount);
}
@Override
public void expireSnapshots() {
- if (!expireSnapshotEnabled(tableRuntime)) {
+ if (!expireSnapshotEnabled()) {
return;
}
long now = System.currentTimeMillis();
- expireFiles(now - snapshotsKeepTime(tableRuntime));
- expireSnapshots(
- mustOlderThan(tableRuntime, now),
- tableRuntime.getTableConfiguration().getSnapshotMinCount());
+ expireFiles(now - getSnapshotsKeepTime());
+ expireSnapshots(getMustOlderThan(now),
context.getTableConfiguration().getSnapshotMinCount());
}
- @Override
- protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+ private long getSnapshotsKeepTime() {
+ return context.getTableConfiguration().getChangeDataTTLMinutes() * 60 *
1000;
+ }
+
+ private long getMustOlderThan(long now) {
return min(
// The change data ttl time
- now - snapshotsKeepTime(tableRuntime),
+ now - getSnapshotsKeepTime(),
// The latest flink committed snapshot should not be expired for
recovering flink job
fetchLatestFlinkCommittedSnapshotTime(table));
}
- @Override
- protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
- return tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() *
60 * 1000;
- }
-
public void expireFiles(long ttlPoint) {
List<IcebergFileEntry> expiredDataFileEntries =
getExpiredDataFileEntries(ttlPoint);
deleteChangeFile(expiredDataFileEntries);
@@ -440,14 +483,14 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
}
+ /** Inner class for maintaining base table of mixed table. */
public class BaseTableMaintainer extends IcebergTableMaintainer {
- private final Set<String> hiveFiles = Sets.newHashSet();
+ private final Set<String> hiveFiles;
- public BaseTableMaintainer(UnkeyedTable unkeyedTable, DefaultTableRuntime
tableRuntime) {
- super(unkeyedTable, mixedTable.id(), tableRuntime);
- if (unkeyedTable.format() == TableFormat.MIXED_HIVE) {
- hiveFiles.addAll(HiveLocationUtil.getHiveLocation(mixedTable));
- }
+ public BaseTableMaintainer(UnkeyedTable unkeyedTable,
TableMaintainerContext context) {
+ super(unkeyedTable, mixedTable.id(), context);
+ // Get Hive location paths from context, no longer depends on
HiveLocationUtil
+ this.hiveFiles = context.getHiveLocationPaths();
}
@Override
@@ -457,14 +500,14 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
@Override
- protected Set<String> expireSnapshotNeedToExcludeFiles() {
+ public Set<String> expireSnapshotNeedToExcludeFiles() {
return hiveFiles;
}
@Override
- protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
+ public long mustOlderThan(long now) {
DataExpirationConfig expiringDataConfig =
- tableRuntime.getTableConfiguration().getExpiringDataConfig();
+ context.getTableConfiguration().getExpiringDataConfig();
long dataExpiringSnapshotTime =
expiringDataConfig.isEnabled()
&& expiringDataConfig.getBaseOnRule()
@@ -474,9 +517,9 @@ public class MixedTableMaintainer implements
TableMaintainer {
return min(
// The snapshots keep time for base store
- now - snapshotsKeepTime(tableRuntime),
+ now - snapshotsKeepTime(),
// The snapshot optimizing plan based should not be expired for
committing
- fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+ fetchOptimizingPlanSnapshotTime(table),
// The latest non-optimized snapshot should not be expired for data
expiring
dataExpiringSnapshotTime,
// The latest flink committed snapshot should not be expired for
recovering flink job
@@ -505,6 +548,7 @@ public class MixedTableMaintainer implements
TableMaintainer {
}
}
+ /** Entry wrapper for mixed table files with expiration information. */
public static class MixedFileEntry extends IcebergTableMaintainer.FileEntry {
private final boolean isChange;
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
new file mode 100644
index 000000000..23a5b48a2
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/IcebergTableUtil.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.iceberg.utils;
+
+import org.apache.amoro.IcebergFileEntry;
+import org.apache.amoro.iceberg.Constants;
+import org.apache.amoro.scan.TableEntriesScan;
+import org.apache.amoro.shade.guava32.com.google.common.base.Predicate;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Static utilities for inspecting Iceberg tables in the format-iceberg module. */
+public class IcebergTableUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergTableUtil.class);
+
+  /** Non-instantiable utility class. */
+  private IcebergTableUtil() {}
+
+  /**
+   * Get the current snapshot id of the table.
+   *
+   * @param table the Iceberg table
+   * @param refresh whether to refresh the table metadata before reading
+   * @return current snapshot id, or {@link Constants#INVALID_SNAPSHOT_ID} when the table has no
+   *     snapshot
+   */
+  public static long getSnapshotId(Table table, boolean refresh) {
+    Snapshot currentSnapshot = getSnapshot(table, refresh);
+    return currentSnapshot == null ? Constants.INVALID_SNAPSHOT_ID : currentSnapshot.snapshotId();
+  }
+
+  /**
+   * Get the current snapshot of the table.
+   *
+   * @param table the Iceberg table
+   * @param refresh whether to refresh the table metadata before reading
+   * @return current snapshot, or null when the table has no snapshot
+   */
+  public static Snapshot getSnapshot(Table table, boolean refresh) {
+    if (refresh) {
+      table.refresh();
+    }
+    return table.currentSnapshot();
+  }
+
+  /**
+   * Find the first snapshot matching the predicate, searching from the newest to the oldest.
+   *
+   * @param table the Iceberg table
+   * @param predicate snapshot filter
+   * @return the newest matching snapshot, or empty if none match
+   */
+  public static Optional<Snapshot> findFirstMatchSnapshot(
+      Table table, Predicate<Snapshot> predicate) {
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    // table.snapshots() is oldest-first; reverse so the newest matching snapshot wins
+    Collections.reverse(snapshots);
+    return Optional.ofNullable(Iterables.tryFind(snapshots, predicate).orNull());
+  }
+
+  /**
+   * Find the latest optimizing snapshot in the table, i.e. the newest snapshot whose summary
+   * contains the value "OPTIMIZE" and whose operation is a replace.
+   *
+   * @param table the Iceberg table
+   * @return Optional snapshot
+   */
+  public static Optional<Snapshot> findLatestOptimizingSnapshot(Table table) {
+    return findFirstMatchSnapshot(
+        table,
+        snapshot ->
+            snapshot.summary().containsValue("OPTIMIZE")
+                && DataOperations.REPLACE.equals(snapshot.operation()));
+  }
+
+  /**
+   * Collect the URI paths of every content file (data, position delete, equality delete)
+   * referenced by any entry of the table.
+   *
+   * @param internalTable the Iceberg table
+   * @return set of URI paths of all content files
+   */
+  public static Set<String> getAllContentFilePath(Table internalTable) {
+    Set<String> validFilesPath = new HashSet<>();
+
+    TableEntriesScan entriesScan =
+        TableEntriesScan.builder(internalTable)
+            .includeFileContent(
+                FileContent.DATA, FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES)
+            .allEntries()
+            .build();
+    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+      for (IcebergFileEntry entry : entries) {
+        validFilesPath.add(TableFileUtil.getUriPath(entry.getFile().path().toString()));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return validFilesPath;
+  }
+
+  /**
+   * Collect the URI paths of all statistics files reachable from the table metadata.
+   *
+   * @param table the Iceberg table
+   * @return set of URI paths of statistics files
+   */
+  public static Set<String> getAllStatisticsFilePath(Table table) {
+    return ReachableFileUtil.statisticsFilesLocations(table).stream()
+        .map(TableFileUtil::getUriPath)
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Find delete files that appear in the current snapshot's entries but are no longer applied to
+   * any file scan task, i.e. dangling delete files.
+   *
+   * @param internalTable the Iceberg table
+   * @return set of dangling delete files; empty when the table has no snapshot or when scan
+   *     planning fails (best-effort)
+   */
+  public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
+    if (internalTable.currentSnapshot() == null) {
+      return Collections.emptySet();
+    }
+    long snapshotId = internalTable.currentSnapshot().snapshotId();
+    // First collect the delete files still applied to at least one data file
+    Set<String> deleteFilesPath = new HashSet<>();
+    TableScan tableScan = internalTable.newScan().useSnapshot(snapshotId);
+    try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        for (DeleteFile delete : fileScanTask.deletes()) {
+          deleteFilesPath.add(delete.path().toString());
+        }
+      }
+    } catch (IOException e) {
+      // Best-effort: without the plan we cannot safely identify dangling deletes
+      LOG.error("table scan plan files error", e);
+      return Collections.emptySet();
+    }
+
+    // Any delete entry of the snapshot that is not applied anywhere is dangling
+    Set<DeleteFile> danglingDeleteFiles = new HashSet<>();
+    TableEntriesScan entriesScan =
+        TableEntriesScan.builder(internalTable)
+            .useSnapshot(snapshotId)
+            .includeFileContent(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES)
+            .build();
+    try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+      for (IcebergFileEntry entry : entries) {
+        ContentFile<?> file = entry.getFile();
+        String path = file.path().toString();
+        if (!deleteFilesPath.contains(path)) {
+          danglingDeleteFiles.add((DeleteFile) file);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error when fetch iceberg entries", e);
+    }
+
+    return danglingDeleteFiles;
+  }
+
+  /**
+   * Fetch all manifest files of an Iceberg Table.
+   *
+   * @param table An iceberg table, or maybe base store or change store of mixed-iceberg format.
+   * @return Path set of all valid manifest files.
+   */
+  public static Set<String> getAllManifestFiles(Table table) {
+    TableOperations ops = ((HasTableOperations) table).operations();
+
+    Table allManifest =
+        MetadataTableUtils.createMetadataTableInstance(
+            ops,
+            table.name(),
+            table.name() + "#" + MetadataTableType.ALL_MANIFESTS.name(),
+            MetadataTableType.ALL_MANIFESTS);
+
+    // newKeySet() replaces the raw-typed ConcurrentHashMap while keeping the set thread-safe
+    Set<String> allManifestFiles = java.util.concurrent.ConcurrentHashMap.newKeySet();
+    TableScan scan = allManifest.newScan().select("path");
+
+    // Close the planned tasks as well as the concatenated rows to avoid leaking open readers
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles();
+        CloseableIterable<StructLike> rows =
+            CloseableIterable.concat(
+                CloseableIterable.transform(tasks, task -> task.asDataTask().rows()))) {
+      rows.forEach(r -> allManifestFiles.add(r.get(0, String.class)));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return allManifestFiles;
+  }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
new file mode 100644
index 000000000..e453af95e
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/utils/RollingFileCleaner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.iceberg.utils;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Rolling file cleaner for Iceberg table maintenance operations. */
+public class RollingFileCleaner {
+  private static final Logger LOG = LoggerFactory.getLogger(RollingFileCleaner.class);
+
+  /** Files are deleted in groups of this size to bound memory usage. */
+  private static final int CLEANED_FILE_GROUP_SIZE = 1_000;
+
+  // Files collected but not yet deleted; flushed every CLEANED_FILE_GROUP_SIZE additions
+  private final Set<String> collectedFiles = Sets.newConcurrentHashSet();
+  // Paths (full path, URI path, or parent directory) that must never be deleted
+  private final Set<String> excludeFiles;
+  // Parent directories of deleted files; candidates for empty-directory cleanup
+  private final Set<String> parentDirectories = Sets.newConcurrentHashSet();
+
+  private final AtomicInteger fileCounter = new AtomicInteger(0);
+  private final AtomicInteger cleanedFileCounter = new AtomicInteger(0);
+
+  private final AuthenticatedFileIO fileIO;
+
+  /**
+   * @param fileIO file IO used to delete files
+   * @param excludeFiles paths to skip when cleaning; may be null for no exclusions
+   */
+  public RollingFileCleaner(AuthenticatedFileIO fileIO, Set<String> excludeFiles) {
+    this.fileIO = fileIO;
+    this.excludeFiles =
+        excludeFiles != null
+            ? Sets.newConcurrentHashSet(excludeFiles)
+            : Sets.newConcurrentHashSet();
+  }
+
+  /**
+   * Queue a file for deletion; every {@link #CLEANED_FILE_GROUP_SIZE} queued files a batched
+   * delete is triggered. Excluded files are silently skipped.
+   *
+   * @param filePath path of the file to clean
+   */
+  public void addFile(String filePath) {
+    if (isFileExcluded(filePath)) {
+      LOG.debug("File {} is excluded from cleaning", filePath);
+      return;
+    }
+
+    collectedFiles.add(filePath);
+    parentDirectories.add(TableFileUtil.getParent(filePath));
+
+    if (fileCounter.incrementAndGet() % CLEANED_FILE_GROUP_SIZE == 0) {
+      doCleanFiles();
+    }
+  }
+
+  /** A file is excluded by its full path, its URI path, or its parent directory. */
+  private boolean isFileExcluded(String filePath) {
+    if (excludeFiles.isEmpty()) {
+      return false;
+    }
+    if (excludeFiles.contains(filePath)) {
+      return true;
+    }
+    String uriPath = URI.create(filePath).getPath();
+    if (excludeFiles.contains(uriPath)) {
+      return true;
+    }
+    // Path.getParent() returns null for root paths; guard against the NPE of the original code
+    Path parentPath = new Path(uriPath).getParent();
+    return parentPath != null && excludeFiles.contains(parentPath.toString());
+  }
+
+  /**
+   * Delete all currently collected files and try to remove their now-empty parent directories.
+   * Synchronized so that concurrent {@link #addFile} flushes cannot interleave a delete with
+   * another flush's clear() and silently drop files.
+   */
+  private synchronized void doCleanFiles() {
+    if (collectedFiles.isEmpty()) {
+      return;
+    }
+
+    if (fileIO.supportBulkOperations()) {
+      try {
+        fileIO.asBulkFileIO().deleteFiles(collectedFiles);
+        cleanedFileCounter.addAndGet(collectedFiles.size());
+      } catch (BulkDeletionFailureException e) {
+        LOG.warn("Failed to delete {} expired files in bulk", e.numberFailedObjects());
+        // Count the files that were actually deleted; only numberFailedObjects() failed
+        cleanedFileCounter.addAndGet(collectedFiles.size() - e.numberFailedObjects());
+      }
+    } else {
+      for (String filePath : collectedFiles) {
+        try {
+          fileIO.deleteFile(filePath);
+          cleanedFileCounter.incrementAndGet();
+        } catch (Exception e) {
+          LOG.warn("Failed to delete expired file: {}", filePath, e);
+        }
+      }
+    }
+    // Try to delete empty parent directories
+    for (String parentDir : parentDirectories) {
+      TableFileUtil.deleteEmptyDirectory(fileIO, parentDir, excludeFiles);
+    }
+    parentDirectories.clear();
+
+    LOG.debug("Cleaned expired a file group, total files: {}", collectedFiles.size());
+
+    collectedFiles.clear();
+  }
+
+  /** @return number of files submitted via {@link #addFile} (excluded files are not counted) */
+  public int fileCount() {
+    return fileCounter.get();
+  }
+
+  /** @return number of files successfully deleted so far */
+  public int cleanedFileCount() {
+    return cleanedFileCounter.get();
+  }
+
+  /** Flush any remaining collected files, then release internal state. */
+  public void clear() {
+    if (!collectedFiles.isEmpty()) {
+      doCleanFiles();
+    }
+
+    collectedFiles.clear();
+    excludeFiles.clear();
+  }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
similarity index 85%
rename from
amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
rename to
amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
index 1469d0264..4eb637edd 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/formats/iceberg/maintainer/TestAutoCreateIcebergTagAction.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-package org.apache.amoro.server.optimizing.maintainer;
+package org.apache.amoro.formats.iceberg.maintainer;
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.config.TagConfiguration;
-import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.table.TableProperties;
+import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
@@ -40,6 +40,8 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+import java.util.Map;
public class TestAutoCreateIcebergTagAction extends TableTestBase {
@@ -57,7 +59,7 @@ public class TestAutoCreateIcebergTagAction extends
TableTestBase {
@NotNull
private AutoCreateIcebergTagAction newAutoCreateIcebergTagAction(Table
table, LocalDateTime now) {
- TagConfiguration tagConfig =
TableConfigurations.parseTagConfiguration(table.properties());
+ TagConfiguration tagConfig = parseTagConfiguration(table.properties());
return new AutoCreateIcebergTagAction(table, tagConfig, now);
}
@@ -347,6 +349,58 @@ public class TestAutoCreateIcebergTagAction extends
TableTestBase {
Assert.assertEquals(expectedTriggerTime, actualTriggerTime);
}
+ /**
+ * Parse a {@code TagConfiguration} from raw table properties. Test helper that replicates the
+ * parsing logic of the AMS-side {@code TableConfigurations} so this module does not depend on
+ * AMS.
+ *
+ * @param tableProperties the Iceberg table properties to read the auto-create-tag settings from
+ * @return the parsed tag configuration, populated with defaults for absent properties
+ * @throws IllegalArgumentException if the configured trigger period is neither DAILY nor HOURLY
+ */
+ private TagConfiguration parseTagConfiguration(Map<String, String>
tableProperties) {
+ TagConfiguration tagConfig = new TagConfiguration();
+ tagConfig.setAutoCreateTag(
+ CompatiblePropertyUtil.propertyAsBoolean(
+ tableProperties,
+ TableProperties.ENABLE_AUTO_CREATE_TAG,
+ TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT));
+ tagConfig.setTriggerPeriod(
+ TagConfiguration.Period.valueOf(
+ CompatiblePropertyUtil.propertyAsString(
+ tableProperties,
+ TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD,
+ TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT)
+ .toUpperCase(Locale.ROOT)));
+
+ // The default tag-name format depends on the trigger period (daily vs hourly)
+ String defaultFormat;
+ switch (tagConfig.getTriggerPeriod()) {
+ case DAILY:
+ defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_DAILY_DEFAULT;
+ break;
+ case HOURLY:
+ defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_HOURLY_DEFAULT;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported trigger period: " + tagConfig.getTriggerPeriod());
+ }
+ tagConfig.setTagFormat(
+ CompatiblePropertyUtil.propertyAsString(
+ tableProperties, TableProperties.AUTO_CREATE_TAG_FORMAT,
defaultFormat));
+ tagConfig.setTriggerOffsetMinutes(
+ CompatiblePropertyUtil.propertyAsInt(
+ tableProperties,
+ TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES,
+ TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES_DEFAULT));
+ tagConfig.setMaxDelayMinutes(
+ CompatiblePropertyUtil.propertyAsInt(
+ tableProperties,
+ TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES,
+ TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES_DEFAULT));
+ tagConfig.setTagMaxAgeMs(
+ CompatiblePropertyUtil.propertyAsLong(
+ tableProperties,
+ TableProperties.AUTO_CREATE_TAG_MAX_AGE_MS,
+ TableProperties.AUTO_CREATE_TAG_MAX_AGE_MS_DEFAULT));
+ return tagConfig;
+ }
+
private long getOffsetMinutesOfToday(long millis) {
LocalDateTime now = fromEpochMillis(millis);
LocalDateTime today = LocalDateTime.of(now.toLocalDate(),
LocalTime.ofSecondOfDay(0));