This is an automated email from the ASF dual-hosted git repository. jinsongzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push: new eba34b158 [AMORO-3382]Refactor the policy strategy in Amoro's plan module for a plugin-based transformation (#3397) eba34b158 is described below commit eba34b158f879b6a05425015451477040a33cf97 Author: zhangwl9 <1298877...@qq.com> AuthorDate: Mon Jan 20 19:56:08 2025 +0800 [AMORO-3382]Refactor the policy strategy in Amoro's plan module for a plugin-based transformation (#3397) Optimize the SchedulingPolicy logic, and implement a plugin-based framework for the sorter of the policy Co-authored-by: 张文领 <zhang...@chinatelecom.cn> --- .../amoro/server/optimizing/SchedulingPolicy.java | 67 +++++++++++----------- .../server/optimizing/sorter/BalancedSorter.java | 50 ++++++++++++++++ .../optimizing/sorter/QuotaOccupySorter.java | 49 ++++++++++++++++ .../server/optimizing/sorter/SorterFactory.java | 48 ++++++++++++++++ ...he.amoro.server.optimizing.sorter.SorterFactory | 20 +++++++ 5 files changed, 200 insertions(+), 34 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java index c62a188b6..31209062d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java @@ -21,27 +21,35 @@ package org.apache.amoro.server.optimizing; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter; +import org.apache.amoro.server.optimizing.sorter.SorterFactory; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class SchedulingPolicy { + public static final Logger LOG = LoggerFactory.getLogger(SchedulingPolicy.class); + private static final String SCHEDULING_POLICY_PROPERTY_NAME = "scheduling-policy"; - private static final String QUOTA = "quota"; - private static final String BALANCED = "balanced"; private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap = new HashMap<>(); private volatile String policyName; private final Lock tableLock = new ReentrantLock(); + private static final Map<String, SorterFactory> sorterFactoryCache = new ConcurrentHashMap<>(); public SchedulingPolicy(ResourceGroup group) { setTableSorterIfNeeded(group); @@ -53,12 +61,26 @@ public class SchedulingPolicy { policyName = Optional.ofNullable(optimizerGroup.getProperties()) .orElseGet(Maps::newHashMap) - .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QUOTA); + .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QuotaOccupySorter.IDENTIFIER); } finally { tableLock.unlock(); } } + static { + ServiceLoader<SorterFactory> sorterFactories = ServiceLoader.load(SorterFactory.class); + Iterator<SorterFactory> iterator = sorterFactories.iterator(); + iterator.forEachRemaining( + sorterFactory -> { + String identifier = sorterFactory.getIdentifier(); + sorterFactoryCache.put(identifier, sorterFactory); + LOG.info( + "Loaded scheduling policy {} and its corresponding sorter instance {}", + identifier, + sorterFactory.getClass().getName()); + }); + } + public String name() { return policyName; } @@ -77,12 +99,15 @@ public class SchedulingPolicy { } private Comparator<TableRuntime> createSorterByPolicy() { - if (policyName.equalsIgnoreCase(QUOTA)) { - return new QuotaOccupySorter(); - } else if (policyName.equalsIgnoreCase(BALANCED)) { - return new BalancedSorter(); + if (sorterFactoryCache.get(policyName) != null) { + SorterFactory sorterFactory = sorterFactoryCache.get(policyName); + LOG.info( + "Using sorter instance {} corresponding to the scheduling policy {}", + sorterFactory.getClass().getName(), + policyName); + return sorterFactory.createComparator(); } else { - throw new IllegalArgumentException("Illegal scheduling policy: " + policyName); + throw new IllegalArgumentException("Unsupported scheduling policy: " + policyName); } } @@ -136,30 +161,4 @@ public class SchedulingPolicy { Map<ServerTableIdentifier, TableRuntime> getTableRuntimeMap() { return tableRuntimeMap; } - - private static class QuotaOccupySorter implements Comparator<TableRuntime> { - - private final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap(); - - @Override - public int compare(TableRuntime one, TableRuntime another) { - return Double.compare( - tableWeightMap.computeIfAbsent(one, TableRuntime::calculateQuotaOccupy), - tableWeightMap.computeIfAbsent(another, TableRuntime::calculateQuotaOccupy)); - } - } - - private static class BalancedSorter implements Comparator<TableRuntime> { - @Override - public int compare(TableRuntime one, TableRuntime another) { - return Long.compare( - Math.max( - one.getLastFullOptimizingTime(), - Math.max(one.getLastMinorOptimizingTime(), one.getLastMajorOptimizingTime())), - Math.max( - another.getLastFullOptimizingTime(), - Math.max( - another.getLastMinorOptimizingTime(), another.getLastMajorOptimizingTime()))); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java new file mode 100644 index 000000000..17963fee0 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.sorter; + +import org.apache.amoro.server.table.TableRuntime; + +import java.util.Comparator; + +public class BalancedSorter implements SorterFactory { + + private static final String IDENTIFIER = "balanced"; + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public Comparator<TableRuntime> createComparator() { + return new Comparator<TableRuntime>() { + @Override + public int compare(TableRuntime one, TableRuntime another) { + return Long.compare( + Math.max( + one.getLastFullOptimizingTime(), + Math.max(one.getLastMinorOptimizingTime(), one.getLastMajorOptimizingTime())), + Math.max( + another.getLastFullOptimizingTime(), + Math.max( + another.getLastMinorOptimizingTime(), another.getLastMajorOptimizingTime()))); + } + }; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java new file mode 100644 index 000000000..27af5104d --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.sorter; + +import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; + +import java.util.Comparator; +import java.util.Map; + +public class QuotaOccupySorter implements SorterFactory { + + public static final String IDENTIFIER = "quota"; + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public Comparator<TableRuntime> createComparator() { + return new Comparator<TableRuntime>() { + final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap(); + + @Override + public int compare(TableRuntime one, TableRuntime another) { + return Double.compare( + tableWeightMap.computeIfAbsent(one, TableRuntime::calculateQuotaOccupy), + tableWeightMap.computeIfAbsent(another, TableRuntime::calculateQuotaOccupy)); + } + }; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java new file mode 100644 index 000000000..4cc66c506 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.sorter; + +import org.apache.amoro.server.optimizing.SchedulingPolicy; + +import java.util.Comparator; + +/** + * A factory for sorter. Sorter instantiates a comparator, which is automatically loaded by the + * {@link SchedulingPolicy} as a plugin, as long as the sorter is constructed and the {@link + * /resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory} file + * contains the full qualified class name for the sorter. The comparator sorts the tableRuntimes + * based on the parameters of each tableRuntime in the input tableRuntimeList, and determines the + * scheduling priority for optimization of each tableRuntime. + */ +public interface SorterFactory { + + /** + * Returns a globally unique identifier for the sorter instance, such as Balanced、balanced and + * BALANCED represent the different sorter instance. The {@link SchedulingPolicy} will check the + * values of {@link SchedulingPolicy#policyName} and {@link SorterFactory#getIdentifier} to decide + * which sorter instance to use. If sorter instances sorterA and sorterB have the same identifier + * id1, the {@link SchedulingPolicy} loads sorter instances in the order of their loading, first + * loading sorterA, then sorterB. Ultimately, sorterB will replace sorterA as the final sorter + * instance associated with id1, which is stored in {@link SchedulingPolicy#sorterFactoryCache}. + */ + String getIdentifier(); + + /** Create a comparator for sorter. */ + Comparator createComparator(); +} diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory new file mode 100644 index 000000000..ce802cc88 --- /dev/null +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter +org.apache.amoro.server.optimizing.sorter.BalancedSorter \ No newline at end of file