This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new eba34b158 [AMORO-3382]Refactor the policy strategy in Amoro's plan
module for a plugin-based transformation (#3397)
eba34b158 is described below
commit eba34b158f879b6a05425015451477040a33cf97
Author: zhangwl9 <[email protected]>
AuthorDate: Mon Jan 20 19:56:08 2025 +0800
[AMORO-3382]Refactor the policy strategy in Amoro's plan module for a
plugin-based transformation (#3397)
Optimize the SchedulingPolicy logic, and implement a plugin-based framework
for the sorter of the policy
Co-authored-by: 张文领 <[email protected]>
---
.../amoro/server/optimizing/SchedulingPolicy.java | 67 +++++++++++-----------
.../server/optimizing/sorter/BalancedSorter.java | 50 ++++++++++++++++
.../optimizing/sorter/QuotaOccupySorter.java | 49 ++++++++++++++++
.../server/optimizing/sorter/SorterFactory.java | 48 ++++++++++++++++
...he.amoro.server.optimizing.sorter.SorterFactory | 20 +++++++
5 files changed, 200 insertions(+), 34 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
index c62a188b6..31209062d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
@@ -21,27 +21,35 @@ package org.apache.amoro.server.optimizing;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
+import org.apache.amoro.server.optimizing.sorter.SorterFactory;
import org.apache.amoro.server.table.TableRuntime;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SchedulingPolicy {
+ public static final Logger LOG =
LoggerFactory.getLogger(SchedulingPolicy.class);
+
private static final String SCHEDULING_POLICY_PROPERTY_NAME =
"scheduling-policy";
- private static final String QUOTA = "quota";
- private static final String BALANCED = "balanced";
private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap = new
HashMap<>();
private volatile String policyName;
private final Lock tableLock = new ReentrantLock();
+ private static final Map<String, SorterFactory> sorterFactoryCache = new
ConcurrentHashMap<>();
public SchedulingPolicy(ResourceGroup group) {
setTableSorterIfNeeded(group);
@@ -53,12 +61,26 @@ public class SchedulingPolicy {
policyName =
Optional.ofNullable(optimizerGroup.getProperties())
.orElseGet(Maps::newHashMap)
- .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QUOTA);
+ .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME,
QuotaOccupySorter.IDENTIFIER);
} finally {
tableLock.unlock();
}
}
+ static {
+ ServiceLoader<SorterFactory> sorterFactories =
ServiceLoader.load(SorterFactory.class);
+ Iterator<SorterFactory> iterator = sorterFactories.iterator();
+ iterator.forEachRemaining(
+ sorterFactory -> {
+ String identifier = sorterFactory.getIdentifier();
+ sorterFactoryCache.put(identifier, sorterFactory);
+ LOG.info(
+ "Loaded scheduling policy {} and its corresponding sorter
instance {}",
+ identifier,
+ sorterFactory.getClass().getName());
+ });
+ }
+
public String name() {
return policyName;
}
@@ -77,12 +99,15 @@ public class SchedulingPolicy {
}
private Comparator<TableRuntime> createSorterByPolicy() {
- if (policyName.equalsIgnoreCase(QUOTA)) {
- return new QuotaOccupySorter();
- } else if (policyName.equalsIgnoreCase(BALANCED)) {
- return new BalancedSorter();
+ if (sorterFactoryCache.get(policyName) != null) {
+ SorterFactory sorterFactory = sorterFactoryCache.get(policyName);
+ LOG.info(
+ "Using sorter instance {} corresponding to the scheduling policy {}",
+ sorterFactory.getClass().getName(),
+ policyName);
+ return sorterFactory.createComparator();
} else {
- throw new IllegalArgumentException("Illegal scheduling policy: " +
policyName);
+ throw new IllegalArgumentException("Unsupported scheduling policy: " +
policyName);
}
}
@@ -136,30 +161,4 @@ public class SchedulingPolicy {
Map<ServerTableIdentifier, TableRuntime> getTableRuntimeMap() {
return tableRuntimeMap;
}
-
- private static class QuotaOccupySorter implements Comparator<TableRuntime> {
-
- private final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();
-
- @Override
- public int compare(TableRuntime one, TableRuntime another) {
- return Double.compare(
- tableWeightMap.computeIfAbsent(one,
TableRuntime::calculateQuotaOccupy),
- tableWeightMap.computeIfAbsent(another,
TableRuntime::calculateQuotaOccupy));
- }
- }
-
- private static class BalancedSorter implements Comparator<TableRuntime> {
- @Override
- public int compare(TableRuntime one, TableRuntime another) {
- return Long.compare(
- Math.max(
- one.getLastFullOptimizingTime(),
- Math.max(one.getLastMinorOptimizingTime(),
one.getLastMajorOptimizingTime())),
- Math.max(
- another.getLastFullOptimizingTime(),
- Math.max(
- another.getLastMinorOptimizingTime(),
another.getLastMajorOptimizingTime())));
- }
- }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
new file mode 100644
index 000000000..17963fee0
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.table.TableRuntime;
+
+import java.util.Comparator;
+
+public class BalancedSorter implements SorterFactory {
+
+ private static final String IDENTIFIER = "balanced";
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Comparator<TableRuntime> createComparator() {
+ return new Comparator<TableRuntime>() {
+ @Override
+ public int compare(TableRuntime one, TableRuntime another) {
+ return Long.compare(
+ Math.max(
+ one.getLastFullOptimizingTime(),
+ Math.max(one.getLastMinorOptimizingTime(),
one.getLastMajorOptimizingTime())),
+ Math.max(
+ another.getLastFullOptimizingTime(),
+ Math.max(
+ another.getLastMinorOptimizingTime(),
another.getLastMajorOptimizingTime())));
+ }
+ };
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
new file mode 100644
index 000000000..27af5104d
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.table.TableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+
+import java.util.Comparator;
+import java.util.Map;
+
+public class QuotaOccupySorter implements SorterFactory {
+
+ public static final String IDENTIFIER = "quota";
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Comparator<TableRuntime> createComparator() {
+ return new Comparator<TableRuntime>() {
+ final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();
+
+ @Override
+ public int compare(TableRuntime one, TableRuntime another) {
+ return Double.compare(
+ tableWeightMap.computeIfAbsent(one,
TableRuntime::calculateQuotaOccupy),
+ tableWeightMap.computeIfAbsent(another,
TableRuntime::calculateQuotaOccupy));
+ }
+ };
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
new file mode 100644
index 000000000..4cc66c506
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.optimizing.SchedulingPolicy;
+
+import java.util.Comparator;
+
+/**
+ * A factory for sorters. Each sorter creates a comparator and is automatically loaded by the
+ * {@link SchedulingPolicy} as a plugin, as long as the sorter can be constructed and the {@code
+ * /resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory} file
+ * contains the fully qualified class name of the sorter. The comparator sorts the tableRuntimes
+ * based on the parameters of each tableRuntime in the input tableRuntimeList, and determines the
+ * scheduling priority for optimization of each tableRuntime.
+ */
+public interface SorterFactory {
+
+ /**
+ * Returns a globally unique identifier for the sorter instance. Identifiers are case-sensitive:
+ * for example, Balanced, balanced and BALANCED identify different sorter instances. The {@link
+ * SchedulingPolicy} compares the value of {@link SchedulingPolicy#policyName} with {@link
+ * SorterFactory#getIdentifier} to decide which sorter instance to use. If two sorter instances,
+ * sorterA and sorterB, share the same identifier id1, the {@link SchedulingPolicy} registers them
+ * in loading order: first sorterA, then sorterB. Ultimately, sorterB replaces sorterA as the
+ * sorter instance associated with id1 in {@link SchedulingPolicy#sorterFactoryCache}.
+ */
+ String getIdentifier();
+
+ /** Create a comparator for sorter. */
+ Comparator createComparator();
+}
diff --git
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
new file mode 100644
index 000000000..ce802cc88
--- /dev/null
+++
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# *
+# http://www.apache.org/licenses/LICENSE-2.0
+# *
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter
+org.apache.amoro.server.optimizing.sorter.BalancedSorter
\ No newline at end of file