This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new bce2d6417 [AMORO-3440] Implement the refresh and exploration of
OptimizingGroup… (#3457)
bce2d6417 is described below
commit bce2d6417b61d51a3b39f80b8348acc8b3718850
Author: mansonliwh <[email protected]>
AuthorDate: Fri Mar 21 11:24:25 2025 +0800
[AMORO-3440] Implement the refresh and exploration of OptimizingGroup…
(#3457)
[AMORO-3440] Implement the refresh and exploration of OptimizingGroupConfig
---
.../apache/amoro/server/AmoroManagementConf.java | 6 ++
.../apache/amoro/server/AmoroServiceContainer.java | 5 +-
.../amoro/server/DefaultOptimizingService.java | 104 +++++++++++----------
.../dashboard/controller/OptimizerController.java | 2 +-
.../controller/OptimizerGroupController.java | 12 +--
.../amoro/server/optimizing/OptimizingQueue.java | 4 +
.../server/resource/DefaultOptimizerManager.java | 48 +++++++++-
.../amoro/server/resource/OptimizerManager.java | 2 +
.../apache/amoro/server/AMSManagerTestBase.java | 2 +-
.../apache/amoro/server/AMSServiceTestBase.java | 6 +-
.../src/test/resources/config-with-units.yaml | 1 +
.../org/apache/amoro/resource/ResourceGroup.java | 20 ++++
dist/src/main/amoro-bin/conf/config.yaml | 1 +
13 files changed, 148 insertions(+), 65 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 63504e748..6103c30d8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -380,6 +380,12 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofSeconds(3))
.withDescription("Optimizer polling task timeout.");
+ public static final ConfigOption<Duration> OPTIMIZING_REFRESH_GROUP_INTERVAL
=
+ ConfigOptions.key("self-optimizing.refresh-group-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription("Optimizer group refresh interval.");
+
/** config key prefix of terminal */
public static final String TERMINAL_PREFIX = "terminal.";
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 899b434fd..283d211dd 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -157,13 +157,12 @@ public class AmoroServiceContainer {
catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
- optimizerManager = new DefaultOptimizerManager(serviceConfig);
+ optimizerManager = new DefaultOptimizerManager(serviceConfig,
catalogManager);
tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService =
- new DefaultOptimizingService(
- serviceConfig, catalogManager, tableManager, optimizerManager,
tableService);
+ new DefaultOptimizingService(serviceConfig, catalogManager,
optimizerManager, tableService);
LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 9cfb1c21b..77cbddf74 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -20,8 +20,6 @@ package org.apache.amoro.server;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.OptimizerProperties;
-import org.apache.amoro.ServerTableIdentifier;
-import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.OptimizerRegisterInfo;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.api.OptimizingTask;
@@ -34,7 +32,6 @@ import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.exception.TaskNotFoundException;
-import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
@@ -50,14 +47,13 @@ import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
-import org.apache.amoro.server.table.MaintainedTableManager;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
-import org.apache.amoro.table.TableProperties;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -74,6 +70,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -96,34 +93,35 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final long taskAckTimeout;
private final int maxPlanningParallelism;
private final long pollingTimeout;
+ private final long refreshGroupInterval;
private final Map<String, OptimizingQueue> optimizingQueueByGroup = new
ConcurrentHashMap<>();
private final Map<String, OptimizingQueue> optimizingQueueByToken = new
ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new
ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
+ private final OptimizingConfigWatcher optimizingConfigWatcher = new
OptimizingConfigWatcher();
private final CatalogManager catalogManager;
private final OptimizerManager optimizerManager;
private final TableService tableService;
- private final MaintainedTableManager tableManager;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
- MaintainedTableManager tableManager,
OptimizerManager optimizerManager,
TableService tableService) {
this.optimizerTouchTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
this.taskAckTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
+ this.refreshGroupInterval =
+
serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis();
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
this.tableService = tableService;
this.catalogManager = catalogManager;
- this.tableManager = tableManager;
this.optimizerManager = optimizerManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
@@ -318,7 +316,6 @@ public class DefaultOptimizingService extends
StatedPersistentBase
.orElseThrow(() -> new PluginRetryAuthException("Optimizer has not
been authenticated"));
}
- // TODO need to use optimizer manager
public void deleteOptimizer(String group, String resourceId) {
List<OptimizerInstance> deleteOptimizers =
getAs(OptimizerMapper.class, mapper ->
mapper.selectByResourceId(resourceId));
@@ -332,7 +329,6 @@ public class DefaultOptimizingService extends
StatedPersistentBase
public void createResourceGroup(ResourceGroup resourceGroup) {
doAsTransaction(
() -> {
- optimizerManager.createResourceGroup(resourceGroup);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
@@ -346,23 +342,13 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
public void deleteResourceGroup(String groupName) {
- if (canDeleteResourceGroup(groupName)) {
- optimizerManager.deleteResourceGroup(groupName);
- OptimizingQueue optimizingQueue =
optimizingQueueByGroup.remove(groupName);
- optimizingQueue.dispose();
- } else {
- throw new RuntimeException(
- String.format(
- "The resource group %s cannot be deleted because it is currently
in " + "use.",
- groupName));
- }
+ OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName);
+ optimizingQueue.dispose();
}
public void updateResourceGroup(ResourceGroup resourceGroup) {
- Preconditions.checkNotNull(resourceGroup, "The resource group cannot be
null.");
Optional.ofNullable(optimizingQueueByGroup.get(resourceGroup.getName()))
.ifPresent(queue -> queue.updateOptimizerGroup(resourceGroup));
- optimizerManager.updateResourceGroup(resourceGroup);
}
public void dispose() {
@@ -372,33 +358,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
optimizingQueueByToken.clear();
authOptimizers.clear();
planExecutor.shutdown();
- }
-
- public boolean canDeleteResourceGroup(String name) {
- for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) {
- if (catalogMeta.getCatalogProperties() != null
- && catalogMeta
- .getCatalogProperties()
- .getOrDefault(
- CatalogMetaProperties.TABLE_PROPERTIES_PREFIX
- + TableProperties.SELF_OPTIMIZING_GROUP,
- TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT)
- .equals(name)) {
- return false;
- }
- }
- for (OptimizerInstance optimizer : optimizerManager.listOptimizers()) {
- if (optimizer.getGroupName().equals(name)) {
- return false;
- }
- }
- for (ServerTableIdentifier identifier : tableManager.listManagedTables()) {
- if (optimizingQueueByGroup.containsKey(name)
- && optimizingQueueByGroup.get(name).containsTable(identifier)) {
- return false;
- }
- }
- return true;
+ optimizingConfigWatcher.dispose();
}
@Override
@@ -446,6 +406,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
+ optimizingConfigWatcher.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
}
@@ -571,4 +532,49 @@ public class DefaultOptimizingService extends
StatedPersistentBase
&& task.getStartTime() + taskAckTimeout <
System.currentTimeMillis();
}
}
+
+ private class OptimizingConfigWatcher implements Runnable {
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("resource-group-watcher-%d").build());
+
+ void start() {
+ run();
+ scheduler.scheduleAtFixedRate(
+ this, refreshGroupInterval, refreshGroupInterval,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run() {
+ syncGroups();
+ }
+
+ private void syncGroups() {
+ try {
+ List<ResourceGroup> resourceGroups =
optimizerManager.listResourceGroups();
+ Set<String> groupNames =
+
resourceGroups.stream().map(ResourceGroup::getName).collect(Collectors.toSet());
+ Sets.difference(optimizingQueueByGroup.keySet(), groupNames)
+ .forEach(DefaultOptimizingService.this::deleteResourceGroup);
+ resourceGroups.forEach(
+ resourceGroup -> {
+ if (optimizingQueueByGroup.containsKey(resourceGroup.getName())
+ && !optimizingQueueByGroup
+ .get(resourceGroup.getName())
+ .getOptimizerGroup()
+ .equals(resourceGroup)) {
+ updateResourceGroup(resourceGroup);
+ } else {
+ createResourceGroup(resourceGroup);
+ }
+ });
+ } catch (Throwable t) {
+ LOG.error("Sync optimizer groups failed, will retry later.", t);
+ }
+ }
+
+ void dispose() {
+ scheduler.shutdown();
+ }
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java
index 55259c5a0..d5cdc832f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java
@@ -69,7 +69,7 @@ public class OptimizerController {
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
optimizerManager.deleteResource(resourceId);
- optimizingService.deleteOptimizer(resource.getGroupName(), resourceId);
+ optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java
index d39ba3409..c579387c9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java
@@ -59,7 +59,6 @@ public class OptimizerGroupController {
private static final String ALL_GROUP = "all";
private final TableManager tableManager;
- private final DefaultOptimizingService optimizingService;
private final OptimizerManager optimizerManager;
private static final Pattern GROUP_NAME_PATTERN =
Pattern.compile("^[A-Za-z0-9_-]{1,50}$");
@@ -68,7 +67,6 @@ public class OptimizerGroupController {
DefaultOptimizingService optimizingService,
OptimizerManager optimizerManager) {
this.tableManager = tableManager;
- this.optimizingService = optimizingService;
this.optimizerManager = optimizerManager;
}
@@ -213,7 +211,7 @@ public class OptimizerGroupController {
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
optimizerManager.deleteResource(resourceId);
- optimizingService.deleteOptimizer(resource.getGroupName(), resourceId);
+ optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
}
@@ -269,7 +267,7 @@ public class OptimizerGroupController {
validateGroupName(name);
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
- optimizingService.createResourceGroup(builder.build());
+ optimizerManager.createResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully
created."));
}
@@ -284,21 +282,21 @@ public class OptimizerGroupController {
Map<String, String> properties = (Map) map.get("properties");
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
- optimizingService.updateResourceGroup(builder.build());
+ optimizerManager.updateResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully
updated."));
}
/** delete optimizeGroup url = /optimize/resourceGroups/{resourceGroupName}
*/
public void deleteResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
- optimizingService.deleteResourceGroup(name);
+ optimizerManager.deleteResourceGroup(name);
ctx.json(OkResponse.of("The optimizer group has been successfully
deleted."));
}
/** check if optimizerGroup can be deleted url =
/optimize/resourceGroups/delete/check */
public void deleteCheckResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
- ctx.json(OkResponse.of(optimizingService.canDeleteResourceGroup(name)));
+ ctx.json(OkResponse.of(optimizerManager.canDeleteResourceGroup(name)));
}
/** check if optimizerGroup can be deleted url = /optimize/containers/get */
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 2c8640d05..3a840570f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -334,6 +334,10 @@ public class OptimizingQueue extends PersistentBase {
retryTaskQueue.offer(taskRuntime);
}
+ public ResourceGroup getOptimizerGroup() {
+ return optimizerGroup;
+ }
+
public void updateOptimizerGroup(ResourceGroup optimizerGroup) {
Preconditions.checkArgument(
this.optimizerGroup.getName().equals(optimizerGroup.getName()),
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java
index 4a3d89cbc..c1f435214 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java
@@ -18,12 +18,19 @@
package org.apache.amoro.server.resource;
+import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.config.Configurations;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.table.TableProperties;
import java.util.List;
import java.util.stream.Collectors;
@@ -40,8 +47,11 @@ import java.util.stream.Collectors;
public class DefaultOptimizerManager extends PersistentBase implements
OptimizerManager {
protected final Configurations serverConfiguration;
+ private final CatalogManager catalogManager;
- public DefaultOptimizerManager(Configurations serverConfiguration) {
+ public DefaultOptimizerManager(
+ Configurations serverConfiguration, CatalogManager catalogManager) {
+ this.catalogManager = catalogManager;
this.serverConfiguration = serverConfiguration;
}
@@ -81,11 +91,19 @@ public class DefaultOptimizerManager extends PersistentBase
implements Optimizer
@Override
public void deleteResourceGroup(String groupName) {
- doAs(ResourceMapper.class, mapper ->
mapper.deleteResourceGroup(groupName));
+ if (canDeleteResourceGroup(groupName)) {
+ doAs(ResourceMapper.class, mapper ->
mapper.deleteResourceGroup(groupName));
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "The resource group %s cannot be deleted because it is currently
in use.",
+ groupName));
+ }
}
@Override
public void updateResourceGroup(ResourceGroup resourceGroup) {
+ Preconditions.checkNotNull(resourceGroup, "The resource group cannot be
null.");
doAs(ResourceMapper.class, mapper ->
mapper.updateResourceGroup(resourceGroup));
}
@@ -125,4 +143,30 @@ public class DefaultOptimizerManager extends
PersistentBase implements Optimizer
public Resource getResource(String resourceId) {
return getAs(ResourceMapper.class, mapper ->
mapper.selectResource(resourceId));
}
+
+ @Override
+ public boolean canDeleteResourceGroup(String name) {
+ for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) {
+ if (catalogMeta.getCatalogProperties() != null
+ && catalogMeta
+ .getCatalogProperties()
+ .getOrDefault(
+ CatalogMetaProperties.TABLE_PROPERTIES_PREFIX
+ + TableProperties.SELF_OPTIMIZING_GROUP,
+ TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT)
+ .equals(name)) {
+ return false;
+ }
+ }
+ for (OptimizerInstance optimizer : listOptimizers()) {
+ if (optimizer.getGroupName().equals(name)) {
+ return false;
+ }
+ }
+ List<TableRuntimeMeta> tableRuntimeMetas =
+ getAs(
+ TableMetaMapper.class,
+ mapper -> mapper.selectTableRuntimesForOptimizerGroup(name, null,
null, null));
+ return tableRuntimeMetas.isEmpty();
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java
index c7a1daf3c..a194ac256 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java
@@ -28,4 +28,6 @@ public interface OptimizerManager extends ResourceManager {
List<OptimizerInstance> listOptimizers(String groupName);
void deleteOptimizer(String groupName, String resourceId);
+
+ boolean canDeleteResourceGroup(String name);
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java
index d12ba1607..0cb42451c 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java
@@ -47,7 +47,7 @@ public abstract class AMSManagerTestBase {
Configurations configurations = new Configurations();
CATALOG_MANAGER = new DefaultCatalogManager(configurations);
TABLE_MANAGER = new DefaultTableManager(configurations, CATALOG_MANAGER);
- OPTIMIZER_MANAGER = new DefaultOptimizerManager(configurations);
+ OPTIMIZER_MANAGER = new DefaultOptimizerManager(configurations,
CATALOG_MANAGER);
} catch (Throwable throwable) {
Assert.fail(throwable.getMessage());
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index ae3ad604e..6cf8c6bda 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -41,11 +41,13 @@ public abstract class AMSServiceTestBase extends
AMSManagerTestBase {
TABLE_SERVICE = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
- configurations, CATALOG_MANAGER, TABLE_MANAGER,
OPTIMIZER_MANAGER, TABLE_SERVICE);
+ configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER,
TABLE_SERVICE);
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
TABLE_SERVICE.initialize();
try {
- OPTIMIZING_SERVICE.createResourceGroup(defaultResourceGroup());
+ ResourceGroup group = defaultResourceGroup();
+ OPTIMIZER_MANAGER.createResourceGroup(group);
+ OPTIMIZING_SERVICE.createResourceGroup(group);
} catch (Throwable ignored) {
}
} catch (Throwable throwable) {
diff --git a/amoro-ams/src/test/resources/config-with-units.yaml
b/amoro-ams/src/test/resources/config-with-units.yaml
index c01751c78..40f24cf4c 100644
--- a/amoro-ams/src/test/resources/config-with-units.yaml
+++ b/amoro-ams/src/test/resources/config-with-units.yaml
@@ -50,6 +50,7 @@ ams:
commit-thread-count: 10
runtime-data-keep-days: 30
runtime-data-expire-interval-hours: 1
+ refresh-group-interval: 30s
optimizer:
heart-beat-timeout: 1min
diff --git
a/amoro-common/src/main/java/org/apache/amoro/resource/ResourceGroup.java
b/amoro-common/src/main/java/org/apache/amoro/resource/ResourceGroup.java
index 24a1583b7..a7f6a62de 100644
--- a/amoro-common/src/main/java/org/apache/amoro/resource/ResourceGroup.java
+++ b/amoro-common/src/main/java/org/apache/amoro/resource/ResourceGroup.java
@@ -23,6 +23,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class ResourceGroup {
private String name;
@@ -96,4 +97,23 @@ public class ResourceGroup {
return resourceGroup;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ResourceGroup that = (ResourceGroup) o;
+ return Objects.equals(name, that.name)
+ && Objects.equals(container, that.container)
+ && Objects.equals(properties, that.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, container, properties);
+ }
}
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index d9461617b..b51aa985f 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -50,6 +50,7 @@ ams:
commit-thread-count: 10
runtime-data-keep-days: 30
runtime-data-expire-interval-hours: 1
+ refresh-group-interval: 30s
optimizer:
heart-beat-timeout: 1min # 60000