This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 2402cd3db [AMORO-3012] Exclude unnecessary artifacts for spark 
optimizer (#3013)
2402cd3db is described below

commit 2402cd3db22b554a2e35aa48110cb1b12b8516a5
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Jul 10 15:31:58 2024 +0800

    [AMORO-3012] Exclude unnecessary artifacts for spark optimizer (#3013)
    
    * Exclude not needed classes from spark optimizer
    
    * Rollback unnecessary changes
    
    * Add some guide for the class path error
---
 .../amoro/optimizer/common/OptimizerExecutor.java  |  3 +--
 .../amoro-optimizer-spark/pom.xml                  | 11 +++++++++++
 .../optimizer/spark/SparkOptimizerExecutor.java    |  8 ++++++--
 .../apache/amoro/io/AuthenticatedHadoopFileIO.java |  1 -
 .../apache/amoro/io/reader/StructLikeFunnel.java   |  8 +-------
 .../org/apache/amoro/table/TableMetaStore.java     |  2 ++
 .../org/apache/amoro/utils/SerializationUtil.java  | 22 ++++------------------
 docs/admin-guides/managing-optimizers.md           |  8 +++++++-
 8 files changed, 32 insertions(+), 31 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
 
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 60fef2481..3e797f800 100644
--- 
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++ 
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -160,10 +160,9 @@ public class OptimizerExecutor extends 
AbstractOptimizerOperator {
       return result;
     } catch (Throwable t) {
       logger.error(
-          "Optimizer executor[{}] executed task[{}]({}) failed and cost {}",
+          "Optimizer executor[{}] executed task[{}] failed and cost {}",
           threadId,
           task.getTaskId(),
-          input,
           System.currentTimeMillis() - startTime,
           t);
       OptimizingTaskResult errorResult = new 
OptimizingTaskResult(task.getTaskId(), threadId);
diff --git a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml 
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
index 1174214a4..5116acf84 100644
--- a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
+++ b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
@@ -122,11 +122,22 @@
                         </goals>
                         <configuration>
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.slf4j:slf4j-api</exclude>
+                                    <exclude>org.apache.hadoop:*</exclude>
+                                    <exclude>org.apache.hive:*</exclude>
+                                </excludes>
+                            </artifactSet>
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.parquet</pattern>
                                     
<shadedPattern>org.apache.amoro.shade.org.apache.parquet</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.orc</pattern>
+                                    
<shadedPattern>org.apache.amoro.shade.org.apache.orc</shadedPattern>
+                                </relocation>
                             </relocations>
                             
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
                         </configuration>
diff --git 
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
 
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
index ce4499df1..b1cae14ad 100644
--- 
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
+++ 
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
@@ -52,8 +52,8 @@ public class SparkOptimizerExecutor extends OptimizerExecutor 
{
   protected OptimizingTaskResult executeTask(OptimizingTask task) {
     OptimizingTaskResult result;
     String threadName = Thread.currentThread().getName();
+    long startTime = System.currentTimeMillis();
     try {
-      long startTime = System.currentTimeMillis();
       ImmutableList<OptimizingTask> of = ImmutableList.of(task);
       jsc.setJobDescription(jobDescription(task));
       SparkOptimizingTaskFunction taskFunction =
@@ -68,7 +68,11 @@ public class SparkOptimizerExecutor extends 
OptimizerExecutor {
       return result;
     } catch (Throwable r) {
       LOG.error(
-          "Optimizer executor[{}] executed task[{}] failed, and cost {}", 
threadName, task, r);
+          "Optimizer executor[{}] executed task[{}] failed, and cost {}",
+          threadName,
+          task.getTaskId(),
+          (System.currentTimeMillis() - startTime),
+          r);
       result = new OptimizingTaskResult(task.getTaskId(), threadId);
       result.setErrorMessage(ExceptionUtil.getErrorMessage(r, 4000));
       return result;
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java 
b/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
index d20599d27..face2daaf 100644
--- 
a/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
+++ 
b/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
@@ -47,7 +47,6 @@ public class AuthenticatedHadoopFileIO extends HadoopFileIO
     implements AuthenticatedFileIO, SupportsPrefixOperations, 
SupportsFileSystemOperations {
 
   private final TableMetaStore tableMetaStore;
-  private boolean fileRecycleEnabled;
 
   AuthenticatedHadoopFileIO(TableMetaStore tableMetaStore) {
     super(tableMetaStore.getConfiguration());
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java 
b/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
index cdc51910d..e83106b28 100644
--- a/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
+++ b/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
@@ -24,8 +24,6 @@ import org.apache.amoro.utils.SerializationUtil;
 import org.apache.iceberg.StructLike;
 import org.jetbrains.annotations.NotNull;
 
-import java.io.IOException;
-
 public enum StructLikeFunnel implements Funnel<StructLike> {
   INSTANCE;
 
@@ -34,10 +32,6 @@ public enum StructLikeFunnel implements Funnel<StructLike> {
   @Override
   public void funnel(@NotNull StructLike structLike, PrimitiveSink 
primitiveSink) {
     StructLike copy = SerializationUtil.StructLikeCopy.copy(structLike);
-    try {
-      primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
   }
 }
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java 
b/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
index b3ab1793e..ef022d328 100644
--- a/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
+++ b/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /** Stores hadoop config files for {@link MixedTable} */
 public class TableMetaStore implements Serializable {
+  private static final long serialVersionUID = 1L;
+
   private static final Logger LOG = 
LoggerFactory.getLogger(TableMetaStore.class);
 
   // Share runtime context with same configuration as context is expensive to 
construct
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java 
b/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
index 67752f19a..a1a662dc2 100644
--- a/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++ b/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
@@ -27,7 +27,6 @@ import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
@@ -57,11 +56,6 @@ public class SerializationUtil {
     }
   }
 
-  public static <T> T simpleDeserialize(ByteBuffer buffer) {
-    byte[] bytes = ByteBuffers.toByteArray(buffer);
-    return simpleDeserialize(bytes);
-  }
-
   public static <T> T simpleDeserialize(byte[] bytes) {
     if (bytes == null) {
       return null;
@@ -75,7 +69,7 @@ public class SerializationUtil {
     }
   }
 
-  public static byte[] kryoSerialize(final Object obj) throws IOException {
+  public static byte[] kryoSerialize(final Object obj) {
     return KRYO_SERIALIZER.get().serialize(obj);
   }
 
@@ -185,11 +179,7 @@ public class SerializationUtil {
     public byte[] serialize(StructLikeWrapper structLikeWrapper) {
       checkNotNull(structLikeWrapper);
       StructLike copy = 
SerializationUtil.StructLikeCopy.copy(structLikeWrapper.get());
-      try {
-        return SerializationUtil.kryoSerialize(copy);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      return SerializationUtil.kryoSerialize(copy);
     }
 
     @Override
@@ -208,12 +198,8 @@ public class SerializationUtil {
 
     @Override
     public byte[] serialize(T t) {
-      try {
-        checkNotNull(t);
-        return SerializationUtil.kryoSerialize(t);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      checkNotNull(t);
+      return SerializationUtil.kryoSerialize(t);
     }
 
     @Override
diff --git a/docs/admin-guides/managing-optimizers.md 
b/docs/admin-guides/managing-optimizers.md
index 94d08ebbf..2da26a903 100644
--- a/docs/admin-guides/managing-optimizers.md
+++ b/docs/admin-guides/managing-optimizers.md
@@ -183,7 +183,13 @@ To better utilize the resources of Spark Optimizer, the 
DRA(Dynamic Resource All
 If you don't want this feature, you can use these settings:
 * Set `spark-conf.spark.dynamicAllocation.enabled` to `false` as you need 
allocate proper driver/executor resources Using [Spark Configuration 
Options](https://spark.apache.org/docs/latest/configuration.html).
 * Set `spark-conf.spark.dynamicAllocation.maxExecutors` to `10` as optimizer 
parallelism can only affect parallelism polling optimizing tasks from AMS.
-  {{< /hint >}}
+{{< /hint >}}
+
+{{< hint info >}}
+The Spark optimizer may sometimes fail due to class conflicts; you can try to 
fix this by following the steps below:
+* Set `spark-conf.spark.driver.userClassPathFirst` to `true`.
+* Set `spark-conf.spark.executor.userClassPathFirst` to `true`.
+{{< /hint >}}
 
 An example for yarn client mode:
 

Reply via email to