This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new db33a52b0 [AMORO-3316]: Decouple planner from table runtime (#3318)
db33a52b0 is described below
commit db33a52b0f9fe66f259bbea84c2e03cee39f63cf
Author: baiyangtx <[email protected]>
AuthorDate: Mon Nov 4 11:38:53 2024 +0800
[AMORO-3316]: Decouple planner from table runtime (#3318)
* Remove table-runtime from planner and evaluator
* Fix compile error
* Fix UT compile error
* Fix UT compile error
* Apply spotless formatting
* Fix UT error
---
.../amoro/server/optimizing/OptimizingQueue.java | 8 +-
.../optimizing/plan/AbstractPartitionPlan.java | 27 +++---
.../optimizing/plan/CommonPartitionEvaluator.java | 33 ++++---
.../optimizing/plan/IcebergPartitionPlan.java | 19 +++-
.../optimizing/plan/MixedHivePartitionPlan.java | 46 +++++++--
.../optimizing/plan/MixedIcebergPartitionPlan.java | 38 ++++++--
.../optimizing/plan/OptimizingEvaluator.java | 61 +++++++++---
.../server/optimizing/plan/OptimizingPlanner.java | 105 ++++++++++++++++++---
.../executor/TableRuntimeRefreshExecutor.java | 2 +-
.../optimizing/flow/CompleteOptimizingFlow.java | 2 +-
.../plan/TestHiveKeyedPartitionPlan.java | 7 +-
.../plan/TestHiveUnkeyedPartitionPlan.java | 7 +-
.../optimizing/plan/TestIcebergPartitionPlan.java | 10 +-
.../optimizing/plan/TestKeyedPartitionPlan.java | 8 +-
.../optimizing/plan/TestOptimizingEvaluator.java | 12 ++-
.../optimizing/plan/TestOptimizingPlanner.java | 2 +-
.../optimizing/plan/TestUnkeyedPartitionPlan.java | 8 +-
17 files changed, 306 insertions(+), 89 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 710ec729d..c024908a7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -264,13 +264,13 @@ public class OptimizingQueue extends PersistentBase {
try {
AmoroTable<?> table =
tableManager.loadTable(tableRuntime.getTableIdentifier());
OptimizingPlanner planner =
- new OptimizingPlanner(
+ OptimizingPlanner.createOptimizingPlanner(
tableRuntime.refresh(table),
(MixedTable) table.originalTable(),
getAvailableCore(),
maxInputSizePerThread());
if (planner.isNecessary()) {
- return new TableOptimizingProcess(planner);
+ return new TableOptimizingProcess(planner, tableRuntime);
} else {
tableRuntime.completeEmptyProcess();
return null;
@@ -371,9 +371,9 @@ public class OptimizingQueue extends PersistentBase {
}
}
- public TableOptimizingProcess(OptimizingPlanner planner) {
+ public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime
tableRuntime) {
processId = planner.getProcessId();
- tableRuntime = planner.getTableRuntime();
+ this.tableRuntime = tableRuntime;
optimizingType = planner.getOptimizingType();
planTime = planner.getPlanTime();
targetSnapshotId = planner.getTargetSnapshotId();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
index 262270281..27cebc6d0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java
@@ -18,12 +18,12 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.RewriteStageTask;
-import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -48,7 +48,9 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
protected final Pair<Integer, StructLike> partition;
protected final OptimizingConfig config;
- protected final TableRuntime tableRuntime;
+ protected final ServerTableIdentifier identifier;
+ protected final long lastMinorOptimizingTime;
+ protected final long lastFullOptimizingTime;
private CommonPartitionEvaluator evaluator;
private TaskSplitter taskSplitter;
protected MixedTable tableObject;
@@ -74,15 +76,20 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
protected final Set<String> reservedDeleteFiles = Sets.newHashSet();
public AbstractPartitionPlan(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
MixedTable table,
+ OptimizingConfig config,
Pair<Integer, StructLike> partition,
- long planTime) {
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ this.identifier = identifier;
this.partition = partition;
this.tableObject = table;
- this.config = tableRuntime.getOptimizingConfig();
- this.tableRuntime = tableRuntime;
+ this.config = config;
this.planTime = planTime;
+ this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+ this.lastFullOptimizingTime = lastFullOptimizingTime;
}
@Override
@@ -98,7 +105,8 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
}
protected CommonPartitionEvaluator buildEvaluator() {
- return new CommonPartitionEvaluator(tableRuntime, partition, planTime);
+ return new CommonPartitionEvaluator(
+ identifier, config, partition, planTime, lastMinorOptimizingTime,
lastFullOptimizingTime);
}
@Override
@@ -317,10 +325,7 @@ public abstract class AbstractPartitionPlan implements
PartitionEvaluator {
MixedTableUtil.getMixedTablePartitionSpecById(tableObject,
partition.first());
String partitionPath = spec.partitionToPath(partition.second());
return new RewriteStageTask(
- tableRuntime.getTableIdentifier().getId(),
- partitionPath,
- input,
- properties.getProperties());
+ identifier.getId(), partitionPath, input,
properties.getProperties());
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
index 8038e9761..1c48718a7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -18,9 +18,9 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.server.optimizing.OptimizingType;
-import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -40,10 +40,12 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
private static final Logger LOG =
LoggerFactory.getLogger(CommonPartitionEvaluator.class);
private final Set<String> deleteFileSet = Sets.newHashSet();
- protected final TableRuntime tableRuntime;
private final Pair<Integer, StructLike> partition;
+ protected final ServerTableIdentifier identifier;
protected final OptimizingConfig config;
+ protected final long lastFullOptimizingTime;
+ protected final long lastMinorOptimizingTime;
protected final long fragmentSize;
protected final long minTargetSize;
protected final long planTime;
@@ -83,10 +85,15 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
private String name;
public CommonPartitionEvaluator(
- TableRuntime tableRuntime, Pair<Integer, StructLike> partition, long
planTime) {
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
+ Pair<Integer, StructLike> partition,
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ this.identifier = identifier;
+ this.config = config;
this.partition = partition;
- this.tableRuntime = tableRuntime;
- this.config = tableRuntime.getOptimizingConfig();
this.fragmentSize = config.getTargetSize() / config.getFragmentRatio();
this.minTargetSize = (long) (config.getTargetSize() *
config.getMinTargetSizeRatio());
if (minTargetSize > config.getTargetSize() - fragmentSize) {
@@ -95,10 +102,11 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
+ "the another merge file.");
}
this.planTime = planTime;
+ this.lastMinorOptimizingTime = lastMinorOptimizingTime;
+ this.lastFullOptimizingTime = lastFullOptimizingTime;
this.reachFullInterval =
config.getFullTriggerInterval() >= 0
- && planTime - tableRuntime.getLastFullOptimizingTime()
- > config.getFullTriggerInterval();
+ && planTime - lastFullOptimizingTime >
config.getFullTriggerInterval();
}
@Override
@@ -343,7 +351,7 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
protected boolean reachMinorInterval() {
return config.getMinorLeastInterval() >= 0
- && planTime - tableRuntime.getLastMinorOptimizingTime() >
config.getMinorLeastInterval();
+ && planTime - lastMinorOptimizingTime > config.getMinorLeastInterval();
}
protected boolean reachFullInterval() {
@@ -363,9 +371,7 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
protected String name() {
if (name == null) {
- name =
- String.format(
- "partition %s of %s", partition,
tableRuntime.getTableIdentifier().toString());
+ name = String.format("partition %s of %s", partition,
identifier.toString());
}
return name;
}
@@ -509,9 +515,8 @@ public class CommonPartitionEvaluator implements
PartitionEvaluator {
.add("fragmentSize", fragmentSize)
.add("undersizedSegmentSize", minTargetSize)
.add("planTime", planTime)
- .add("lastMinorOptimizeTime",
tableRuntime.getLastMinorOptimizingTime())
- .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
- .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
+ .add("lastMinorOptimizeTime", lastMinorOptimizingTime)
+ .add("lastFullOptimizeTime", lastFullOptimizingTime)
.add("fragmentFileCount", fragmentFileCount)
.add("fragmentFileSize", fragmentFileSize)
.add("fragmentFileRecords", fragmentFileRecords)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
index 590efa190..239584f85 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java
@@ -18,9 +18,10 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
import org.apache.amoro.optimizing.OptimizingInputProperties;
-import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Pair;
@@ -31,11 +32,21 @@ import java.util.stream.Collectors;
public class IcebergPartitionPlan extends AbstractPartitionPlan {
protected IcebergPartitionPlan(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
MixedTable table,
Pair<Integer, StructLike> partition,
- long planTime) {
- super(tableRuntime, table, partition, planTime);
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier,
+ table,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
}
@Override
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
index dba9672e1..a3b9af7bc 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java
@@ -18,12 +18,13 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.properties.HiveTableProperties;
-import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.ContentFile;
@@ -40,12 +41,22 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
private String customHiveSubdirectory;
public MixedHivePartitionPlan(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
MixedTable table,
+ OptimizingConfig config,
Pair<Integer, StructLike> partition,
String hiveLocation,
- long planTime) {
- super(tableRuntime, table, partition, planTime);
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier,
+ table,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
this.hiveLocation = hiveLocation;
}
@@ -89,7 +100,15 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
@Override
protected CommonPartitionEvaluator buildEvaluator() {
return new MixedHivePartitionEvaluator(
- tableRuntime, partition, partitionProperties, hiveLocation, planTime,
isKeyedTable());
+ identifier,
+ config,
+ partition,
+ partitionProperties,
+ hiveLocation,
+ planTime,
+ isKeyedTable(),
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
}
@Override
@@ -121,13 +140,24 @@ public class MixedHivePartitionPlan extends
MixedIcebergPartitionPlan {
private boolean filesNotInHiveLocation = false;
public MixedHivePartitionEvaluator(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
String hiveLocation,
long planTime,
- boolean keyedTable) {
- super(tableRuntime, partition, partitionProperties, planTime,
keyedTable);
+ boolean keyedTable,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier,
+ config,
+ partition,
+ partitionProperties,
+ planTime,
+ keyedTable,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
this.hiveLocation = hiveLocation;
String optimizedTime =
partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
index 9892b3d92..282214fff 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -18,12 +18,13 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.DataTreeNode;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.hive.optimizing.MixFormatRewriteExecutorFactory;
import org.apache.amoro.optimizing.OptimizingInputProperties;
-import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -49,11 +50,21 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
protected final Map<String, String> partitionProperties;
public MixedIcebergPartitionPlan(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
MixedTable table,
+ OptimizingConfig config,
Pair<Integer, StructLike> partition,
- long planTime) {
- super(tableRuntime, table, partition, planTime);
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier,
+ table,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table,
partition.second());
}
@@ -101,7 +112,14 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
@Override
protected CommonPartitionEvaluator buildEvaluator() {
return new MixedIcebergPartitionEvaluator(
- tableRuntime, partition, partitionProperties, planTime,
isKeyedTable());
+ identifier,
+ config,
+ partition,
+ partitionProperties,
+ planTime,
+ isKeyedTable(),
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
}
protected static class MixedIcebergPartitionEvaluator extends
CommonPartitionEvaluator {
@@ -110,12 +128,16 @@ public class MixedIcebergPartitionPlan extends
AbstractPartitionPlan {
private final boolean reachBaseRefreshInterval;
public MixedIcebergPartitionEvaluator(
- TableRuntime tableRuntime,
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
long planTime,
- boolean keyedTable) {
- super(tableRuntime, partition, planTime);
+ boolean keyedTable,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier, config, partition, planTime, lastMinorOptimizingTime,
lastFullOptimizingTime);
this.keyedTable = keyedTable;
String optimizedTime =
partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME);
long lastBaseOptimizedTime = optimizedTime == null ? 0 :
Long.parseLong(optimizedTime);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
index f404592fc..5914d57bb 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java
@@ -18,7 +18,9 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper;
@@ -59,25 +61,44 @@ public class OptimizingEvaluator {
private static final Logger LOG =
LoggerFactory.getLogger(OptimizingEvaluator.class);
+ protected final ServerTableIdentifier identifier;
+ protected final OptimizingConfig config;
protected final MixedTable mixedTable;
- protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
+ protected final long lastFullOptimizingTime;
+ protected final long lastMinorOptimizingTime;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;
-
protected Map<String, PartitionEvaluator> needOptimizingPlanMap =
Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap =
Maps.newHashMap();
- public OptimizingEvaluator(
+ public static OptimizingEvaluator createOptimizingEvaluator(
TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
- this.tableRuntime = tableRuntime;
- this.mixedTable = table;
- this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
- this.maxPendingPartitions = maxPendingPartitions;
+ return new OptimizingEvaluator(
+ tableRuntime.getTableIdentifier(),
+ tableRuntime.getOptimizingConfig(),
+ table,
+ IcebergTableUtil.getSnapshot(table, tableRuntime),
+ maxPendingPartitions,
+ tableRuntime.getLastMinorOptimizingTime(),
+ tableRuntime.getLastFullOptimizingTime());
}
- public TableRuntime getTableRuntime() {
- return tableRuntime;
+ public OptimizingEvaluator(
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
+ MixedTable table,
+ TableSnapshot currentSnapshot,
+ int maxPendingPartitions,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ this.identifier = identifier;
+ this.config = config;
+ this.mixedTable = table;
+ this.currentSnapshot = currentSnapshot;
+ this.maxPendingPartitions = maxPendingPartitions;
+ this.lastFullOptimizingTime = lastFullOptimizingTime;
+ this.lastMinorOptimizingTime = lastMinorOptimizingTime;
}
protected void initEvaluator() {
@@ -150,25 +171,37 @@ public class OptimizingEvaluator {
protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike>
partition) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
- return new CommonPartitionEvaluator(tableRuntime, partition,
System.currentTimeMillis());
+ return new CommonPartitionEvaluator(
+ identifier,
+ config,
+ partition,
+ System.currentTimeMillis(),
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
} else {
Map<String, String> partitionProperties = partitionProperties(partition);
if (TableTypeUtil.isHive(mixedTable)) {
String hiveLocation = (((SupportHive) mixedTable).hiveLocation());
return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(
- tableRuntime,
+ identifier,
+ config,
partition,
partitionProperties,
hiveLocation,
System.currentTimeMillis(),
- mixedTable.isKeyedTable());
+ mixedTable.isKeyedTable(),
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
} else {
return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
- tableRuntime,
+ identifier,
+ config,
partition,
partitionProperties,
System.currentTimeMillis(),
- mixedTable.isKeyedTable());
+ mixedTable.isKeyedTable(),
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
}
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
index cf6fc9e71..13bb8f1d7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java
@@ -18,7 +18,9 @@
package org.apache.amoro.server.optimizing.plan;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.server.AmoroServiceConstants;
@@ -26,6 +28,8 @@ import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.table.KeyedTableSnapshot;
import org.apache.amoro.server.table.TableRuntime;
+import org.apache.amoro.server.table.TableSnapshot;
+import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ExpressionUtil;
@@ -49,24 +53,21 @@ public class OptimizingPlanner extends OptimizingEvaluator {
private static final Logger LOG =
LoggerFactory.getLogger(OptimizingPlanner.class);
private final Expression partitionFilter;
-
protected long processId;
private final double availableCore;
private final long planTime;
private OptimizingType optimizingType;
private final PartitionPlannerFactory partitionPlannerFactory;
private List<RewriteStageTask> tasks;
-
private List<AbstractPartitionPlan> actualPartitionPlans;
private final long maxInputSizePerThread;
- public OptimizingPlanner(
+ public static OptimizingPlanner createOptimizingPlanner(
TableRuntime tableRuntime,
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
- super(tableRuntime, table, Integer.MAX_VALUE);
- this.partitionFilter =
+ Expression partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
:
tableRuntime.getPendingInput().getPartitions().entrySet().stream()
@@ -76,10 +77,52 @@ public class OptimizingPlanner extends OptimizingEvaluator {
table, entry.getKey(), entry.getValue()))
.reduce(Expressions::or)
.orElse(Expressions.alwaysTrue());
+ long planTime = System.currentTimeMillis();
+
+ return new OptimizingPlanner(
+ tableRuntime.getTableIdentifier(),
+ tableRuntime.getOptimizingConfig(),
+ table,
+ IcebergTableUtil.getSnapshot(table, tableRuntime),
+ partitionFilter,
+ Math.max(tableRuntime.getNewestProcessId() + 1, planTime),
+ availableCore,
+ maxInputSizePerThread,
+ tableRuntime.getLastMinorOptimizingTime(),
+ tableRuntime.getLastFullOptimizingTime());
+ }
+
+ public OptimizingPlanner(
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
+ MixedTable table,
+ TableSnapshot snapshot,
+ Expression partitionFilter,
+ long processId,
+ double availableCore,
+ long maxInputSizePerThread,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ super(
+ identifier,
+ config,
+ table,
+ snapshot,
+ Integer.MAX_VALUE,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
+ this.partitionFilter = partitionFilter;
this.availableCore = availableCore;
this.planTime = System.currentTimeMillis();
- this.processId = Math.max(tableRuntime.getNewestProcessId() + 1, planTime);
- this.partitionPlannerFactory = new PartitionPlannerFactory(mixedTable,
tableRuntime, planTime);
+ this.processId = processId;
+ this.partitionPlannerFactory =
+ new PartitionPlannerFactory(
+ identifier,
+ config,
+ mixedTable,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
this.maxInputSizePerThread = maxInputSizePerThread;
}
@@ -150,7 +193,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
initEvaluator();
}
if (!super.isNecessary()) {
- LOG.debug("Table {} skip planning", tableRuntime.getTableIdentifier());
+ LOG.debug("Table {} skip planning", identifier);
return cacheAndReturnTasks(Collections.emptyList());
}
@@ -188,7 +231,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
long endTime = System.nanoTime();
LOG.info(
"{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms
maxInputSize {} actualInputSize {}",
- tableRuntime.getTableIdentifier(),
+ identifier,
getOptimizingType(),
tasks.size(),
endTime - startTime,
@@ -216,16 +259,27 @@ public class OptimizingPlanner extends
OptimizingEvaluator {
}
private static class PartitionPlannerFactory {
+ protected final OptimizingConfig config;
+ protected final ServerTableIdentifier identifier;
+ protected final long lastMinorOptimizingTime;
+ protected final long lastFullOptimizingTime;
private final MixedTable mixedTable;
- private final TableRuntime tableRuntime;
private final String hiveLocation;
private final long planTime;
public PartitionPlannerFactory(
- MixedTable mixedTable, TableRuntime tableRuntime, long planTime) {
+ ServerTableIdentifier identifier,
+ OptimizingConfig config,
+ MixedTable mixedTable,
+ long planTime,
+ long lastMinorOptimizingTime,
+ long lastFullOptimizingTime) {
+ this.identifier = identifier;
+ this.config = config;
this.mixedTable = mixedTable;
- this.tableRuntime = tableRuntime;
this.planTime = planTime;
+ this.lastFullOptimizingTime = lastFullOptimizingTime;
+ this.lastMinorOptimizingTime = lastMinorOptimizingTime;
if (TableTypeUtil.isHive(mixedTable)) {
this.hiveLocation = (((SupportHive) mixedTable).hiveLocation());
} else {
@@ -235,13 +289,34 @@ public class OptimizingPlanner extends
OptimizingEvaluator {
public PartitionEvaluator buildPartitionPlanner(Pair<Integer, StructLike>
partition) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
- return new IcebergPartitionPlan(tableRuntime, mixedTable, partition,
planTime);
+ return new IcebergPartitionPlan(
+ identifier,
+ config,
+ mixedTable,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
} else {
if (TableTypeUtil.isHive(mixedTable)) {
return new MixedHivePartitionPlan(
- tableRuntime, mixedTable, partition, hiveLocation, planTime);
+ identifier,
+ mixedTable,
+ config,
+ partition,
+ hiveLocation,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
} else {
- return new MixedIcebergPartitionPlan(tableRuntime, mixedTable,
partition, planTime);
+ return new MixedIcebergPartitionPlan(
+ identifier,
+ mixedTable,
+ config,
+ partition,
+ planTime,
+ lastMinorOptimizingTime,
+ lastFullOptimizingTime);
}
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index b290323ab..36261a09b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -53,7 +53,7 @@ public class TableRuntimeRefreshExecutor extends
BaseTableExecutor {
private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable
table) {
if (tableRuntime.isOptimizingEnabled() &&
!tableRuntime.getOptimizingStatus().isProcessing()) {
OptimizingEvaluator evaluator =
- new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
+ OptimizingEvaluator.createOptimizingEvaluator(tableRuntime, table,
maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput =
evaluator.getOptimizingPendingInput();
logger.debug(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
index 41765bf5a..0a8ed6474 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java
@@ -192,7 +192,7 @@ public class CompleteOptimizingFlow {
Mockito.when(tableRuntime.getOptimizingConfig()).thenAnswer(f ->
optimizingConfig());
Mockito.when(tableRuntime.getTableIdentifier())
.thenReturn(ServerTableIdentifier.of(1L, "a", "b", "c",
table.format()));
- return new OptimizingPlanner(
+ return OptimizingPlanner.createOptimizingPlanner(
tableRuntime,
table,
availableCore,
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
index 82ee12a7c..b1cbfbd67 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java
@@ -81,11 +81,14 @@ public class TestHiveKeyedPartitionPlan extends
TestKeyedPartitionPlan {
SupportHive hiveTable = (SupportHive) getMixedTable();
String hiveLocation = hiveTable.hiveLocation();
return new MixedHivePartitionPlan(
- getTableRuntime(),
+ getTableRuntime().getTableIdentifier(),
getMixedTable(),
+ getTableRuntime().getOptimizingConfig(),
getPartition(),
hiveLocation,
- System.currentTimeMillis());
+ System.currentTimeMillis(),
+ getTableRuntime().getLastMinorOptimizingTime(),
+ getTableRuntime().getLastFullOptimizingTime());
}
@Test
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
index 6109486e7..23c2039f2 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java
@@ -80,11 +80,14 @@ public class TestHiveUnkeyedPartitionPlan extends
TestUnkeyedPartitionPlan {
SupportHive hiveTable = (SupportHive) getMixedTable();
String hiveLocation = hiveTable.hiveLocation();
return new MixedHivePartitionPlan(
- getTableRuntime(),
+ getTableRuntime().getTableIdentifier(),
getMixedTable(),
+ getTableRuntime().getOptimizingConfig(),
getPartition(),
hiveLocation,
- System.currentTimeMillis());
+ System.currentTimeMillis(),
+ getTableRuntime().getLastMinorOptimizingTime(),
+ getTableRuntime().getLastFullOptimizingTime());
}
@Test
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
index 857935e2b..98ae4a4fc 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java
@@ -27,6 +27,7 @@ import
org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper;
import org.apache.amoro.server.optimizing.scan.TableFileScanHelper;
+import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.junit.Test;
@@ -72,8 +73,15 @@ public class TestIcebergPartitionPlan extends
TestUnkeyedPartitionPlan {
@Override
protected AbstractPartitionPlan getPartitionPlan() {
+ TableRuntime tableRuntime = getTableRuntime();
return new IcebergPartitionPlan(
- getTableRuntime(), getMixedTable(), getPartition(),
System.currentTimeMillis());
+ tableRuntime.getTableIdentifier(),
+ tableRuntime.getOptimizingConfig(),
+ getMixedTable(),
+ getPartition(),
+ System.currentTimeMillis(),
+ tableRuntime.getLastMinorOptimizingTime(),
+ tableRuntime.getLastFullOptimizingTime());
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
index 6a169d7db..3b7509faf 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -210,7 +210,13 @@ public class TestKeyedPartitionPlan extends
MixedTablePlanTestBase {
@Override
protected AbstractPartitionPlan getPartitionPlan() {
return new MixedIcebergPartitionPlan(
- getTableRuntime(), getMixedTable(), getPartition(),
System.currentTimeMillis());
+ getTableRuntime().getTableIdentifier(),
+ getMixedTable(),
+ getTableRuntime().getOptimizingConfig(),
+ getPartition(),
+ System.currentTimeMillis(),
+ getTableRuntime().getLastMinorOptimizingTime(),
+ getTableRuntime().getLastFullOptimizingTime());
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
index 395aa653d..920de8f64 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java
@@ -27,6 +27,8 @@ import
org.apache.amoro.server.optimizing.OptimizingTestHelpers;
import org.apache.amoro.server.optimizing.scan.KeyedTableFileScanHelper;
import org.apache.amoro.server.optimizing.scan.TableFileScanHelper;
import org.apache.amoro.server.optimizing.scan.UnkeyedTableFileScanHelper;
+import org.apache.amoro.server.table.TableSnapshot;
+import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -110,7 +112,15 @@ public class TestOptimizingEvaluator extends
MixedTablePlanTestBase {
}
protected OptimizingEvaluator buildOptimizingEvaluator() {
- return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
+ TableSnapshot snapshot = IcebergTableUtil.getSnapshot(getMixedTable(),
tableRuntime);
+ return new OptimizingEvaluator(
+ tableRuntime.getTableIdentifier(),
+ tableRuntime.getOptimizingConfig(),
+ getMixedTable(),
+ snapshot,
+ 100,
+ tableRuntime.getLastMinorOptimizingTime(),
+ tableRuntime.getLastFullOptimizingTime());
}
protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
index b49061608..ae5545f6e 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java
@@ -96,7 +96,7 @@ public class TestOptimizingPlanner extends
TestOptimizingEvaluator {
@Override
protected OptimizingPlanner buildOptimizingEvaluator() {
- return new OptimizingPlanner(
+ return OptimizingPlanner.createOptimizingPlanner(
getTableRuntime(),
getMixedTable(),
1,
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
index 6224bd24e..c240cce26 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java
@@ -75,7 +75,13 @@ public class TestUnkeyedPartitionPlan extends
MixedTablePlanTestBase {
@Override
protected AbstractPartitionPlan getPartitionPlan() {
return new MixedIcebergPartitionPlan(
- getTableRuntime(), getMixedTable(), getPartition(),
System.currentTimeMillis());
+ getTableRuntime().getTableIdentifier(),
+ getMixedTable(),
+ getTableRuntime().getOptimizingConfig(),
+ getPartition(),
+ System.currentTimeMillis(),
+ getTableRuntime().getLastMinorOptimizingTime(),
+ getTableRuntime().getLastFullOptimizingTime());
}
@Override