This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new eba34b158 [AMORO-3382]Refactor the policy strategy in Amoro's plan 
module for a plugin-based transformation (#3397)
eba34b158 is described below

commit eba34b158f879b6a05425015451477040a33cf97
Author: zhangwl9 <1298877...@qq.com>
AuthorDate: Mon Jan 20 19:56:08 2025 +0800

    [AMORO-3382]Refactor the policy strategy in Amoro's plan module for a 
plugin-based transformation (#3397)
    
    Optimize the SchedulingPolicy logic, and implement a plugin-based framework 
for the sorter of the policy
    
    Co-authored-by: 张文领 <zhang...@chinatelecom.cn>
---
 .../amoro/server/optimizing/SchedulingPolicy.java  | 67 +++++++++++-----------
 .../server/optimizing/sorter/BalancedSorter.java   | 50 ++++++++++++++++
 .../optimizing/sorter/QuotaOccupySorter.java       | 49 ++++++++++++++++
 .../server/optimizing/sorter/SorterFactory.java    | 48 ++++++++++++++++
 ...he.amoro.server.optimizing.sorter.SorterFactory | 20 +++++++
 5 files changed, 200 insertions(+), 34 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
index c62a188b6..31209062d 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
@@ -21,27 +21,35 @@ package org.apache.amoro.server.optimizing;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
+import org.apache.amoro.server.optimizing.sorter.SorterFactory;
 import org.apache.amoro.server.table.TableRuntime;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class SchedulingPolicy {
 
+  public static final Logger LOG = 
LoggerFactory.getLogger(SchedulingPolicy.class);
+
   private static final String SCHEDULING_POLICY_PROPERTY_NAME = 
"scheduling-policy";
-  private static final String QUOTA = "quota";
-  private static final String BALANCED = "balanced";
 
   private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap = new 
HashMap<>();
   private volatile String policyName;
   private final Lock tableLock = new ReentrantLock();
+  private static final Map<String, SorterFactory> sorterFactoryCache = new 
ConcurrentHashMap<>();
 
   public SchedulingPolicy(ResourceGroup group) {
     setTableSorterIfNeeded(group);
@@ -53,12 +61,26 @@ public class SchedulingPolicy {
       policyName =
           Optional.ofNullable(optimizerGroup.getProperties())
               .orElseGet(Maps::newHashMap)
-              .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QUOTA);
+              .getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, 
QuotaOccupySorter.IDENTIFIER);
     } finally {
       tableLock.unlock();
     }
   }
 
+  static {
+    ServiceLoader<SorterFactory> sorterFactories = 
ServiceLoader.load(SorterFactory.class);
+    Iterator<SorterFactory> iterator = sorterFactories.iterator();
+    iterator.forEachRemaining(
+        sorterFactory -> {
+          String identifier = sorterFactory.getIdentifier();
+          sorterFactoryCache.put(identifier, sorterFactory);
+          LOG.info(
+              "Loaded scheduling policy {} and its corresponding sorter 
instance {}",
+              identifier,
+              sorterFactory.getClass().getName());
+        });
+  }
+
   public String name() {
     return policyName;
   }
@@ -77,12 +99,15 @@ public class SchedulingPolicy {
   }
 
   private Comparator<TableRuntime> createSorterByPolicy() {
-    if (policyName.equalsIgnoreCase(QUOTA)) {
-      return new QuotaOccupySorter();
-    } else if (policyName.equalsIgnoreCase(BALANCED)) {
-      return new BalancedSorter();
+    if (sorterFactoryCache.get(policyName) != null) {
+      SorterFactory sorterFactory = sorterFactoryCache.get(policyName);
+      LOG.info(
+          "Using sorter instance {} corresponding to the scheduling policy {}",
+          sorterFactory.getClass().getName(),
+          policyName);
+      return sorterFactory.createComparator();
     } else {
-      throw new IllegalArgumentException("Illegal scheduling policy: " + 
policyName);
+      throw new IllegalArgumentException("Unsupported scheduling policy: " + 
policyName);
     }
   }
 
@@ -136,30 +161,4 @@ public class SchedulingPolicy {
   Map<ServerTableIdentifier, TableRuntime> getTableRuntimeMap() {
     return tableRuntimeMap;
   }
-
-  private static class QuotaOccupySorter implements Comparator<TableRuntime> {
-
-    private final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();
-
-    @Override
-    public int compare(TableRuntime one, TableRuntime another) {
-      return Double.compare(
-          tableWeightMap.computeIfAbsent(one, 
TableRuntime::calculateQuotaOccupy),
-          tableWeightMap.computeIfAbsent(another, 
TableRuntime::calculateQuotaOccupy));
-    }
-  }
-
-  private static class BalancedSorter implements Comparator<TableRuntime> {
-    @Override
-    public int compare(TableRuntime one, TableRuntime another) {
-      return Long.compare(
-          Math.max(
-              one.getLastFullOptimizingTime(),
-              Math.max(one.getLastMinorOptimizingTime(), 
one.getLastMajorOptimizingTime())),
-          Math.max(
-              another.getLastFullOptimizingTime(),
-              Math.max(
-                  another.getLastMinorOptimizingTime(), 
another.getLastMajorOptimizingTime())));
-    }
-  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
new file mode 100644
index 000000000..17963fee0
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/BalancedSorter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.table.TableRuntime;
+
+import java.util.Comparator;
+
+public class BalancedSorter implements SorterFactory {
+
+  private static final String IDENTIFIER = "balanced";
+
+  @Override
+  public String getIdentifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public Comparator<TableRuntime> createComparator() {
+    return new Comparator<TableRuntime>() {
+      @Override
+      public int compare(TableRuntime one, TableRuntime another) {
+        return Long.compare(
+            Math.max(
+                one.getLastFullOptimizingTime(),
+                Math.max(one.getLastMinorOptimizingTime(), 
one.getLastMajorOptimizingTime())),
+            Math.max(
+                another.getLastFullOptimizingTime(),
+                Math.max(
+                    another.getLastMinorOptimizingTime(), 
another.getLastMajorOptimizingTime())));
+      }
+    };
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
new file mode 100644
index 000000000..27af5104d
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/QuotaOccupySorter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.table.TableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+
+import java.util.Comparator;
+import java.util.Map;
+
+public class QuotaOccupySorter implements SorterFactory {
+
+  public static final String IDENTIFIER = "quota";
+
+  @Override
+  public String getIdentifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public Comparator<TableRuntime> createComparator() {
+    return new Comparator<TableRuntime>() {
+      final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();
+
+      @Override
+      public int compare(TableRuntime one, TableRuntime another) {
+        return Double.compare(
+            tableWeightMap.computeIfAbsent(one, 
TableRuntime::calculateQuotaOccupy),
+            tableWeightMap.computeIfAbsent(another, 
TableRuntime::calculateQuotaOccupy));
+      }
+    };
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
new file mode 100644
index 000000000..4cc66c506
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/sorter/SorterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing.sorter;
+
+import org.apache.amoro.server.optimizing.SchedulingPolicy;
+
+import java.util.Comparator;
+
+/**
+ * A factory for sorter. Sorter instantiates a comparator, which is 
automatically loaded by the
+ * {@link SchedulingPolicy} as a plugin, as long as the sorter is constructed 
and the {@link
+ * 
/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory}
 file
+ * contains the full qualified class name for the sorter. The comparator sorts 
the tableRuntimes
+ * based on the parameters of each tableRuntime in the input tableRuntimeList, 
and determines the
+ * scheduling priority for optimization of each tableRuntime.
+ */
+public interface SorterFactory {
+
+  /**
+   * Returns a globally unique identifier for the sorter instance, such as 
Balanced、balanced and
+   * BALANCED represent the different sorter instance. The {@link 
SchedulingPolicy} will check the
+   * values of {@link SchedulingPolicy#policyName} and {@link 
SorterFactory#getIdentifier} to decide
+   * which sorter instance to use. If sorter instances sorterA and sorterB 
have the same identifier
+   * id1, the {@link SchedulingPolicy} loads sorter instances in the order of 
their loading, first
+   * loading sorterA, then sorterB. Ultimately, sorterB will replace sorterA 
as the final sorter
+   * instance associated with id1, which is stored in {@link 
SchedulingPolicy#sorterFactoryCache}.
+   */
+  String getIdentifier();
+
+  /** Create a comparator for sorter. */
+  Comparator createComparator();
+}
diff --git 
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
 
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
new file mode 100644
index 000000000..ce802cc88
--- /dev/null
+++ 
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#  *
+#     http://www.apache.org/licenses/LICENSE-2.0
+#  *
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter
+org.apache.amoro.server.optimizing.sorter.BalancedSorter
\ No newline at end of file

Reply via email to