This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 2402cd3db [AMORO-3012] Exclude unnecessary artifacts for spark
optimizer (#3013)
2402cd3db is described below
commit 2402cd3db22b554a2e35aa48110cb1b12b8516a5
Author: ZhouJinsong <[email protected]>
AuthorDate: Wed Jul 10 15:31:58 2024 +0800
[AMORO-3012] Exclude unnecessary artifacts for spark optimizer (#3013)
* Exclude not needed classes from spark optimizer
* Rollback unnecessary changes
* Add some guide for the class path error
---
.../amoro/optimizer/common/OptimizerExecutor.java | 3 +--
.../amoro-optimizer-spark/pom.xml | 11 +++++++++++
.../optimizer/spark/SparkOptimizerExecutor.java | 8 ++++++--
.../apache/amoro/io/AuthenticatedHadoopFileIO.java | 1 -
.../apache/amoro/io/reader/StructLikeFunnel.java | 8 +-------
.../org/apache/amoro/table/TableMetaStore.java | 2 ++
.../org/apache/amoro/utils/SerializationUtil.java | 22 ++++------------------
docs/admin-guides/managing-optimizers.md | 8 +++++++-
8 files changed, 32 insertions(+), 31 deletions(-)
diff --git
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 60fef2481..3e797f800 100644
---
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -160,10 +160,9 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
return result;
} catch (Throwable t) {
logger.error(
- "Optimizer executor[{}] executed task[{}]({}) failed and cost {}",
+ "Optimizer executor[{}] executed task[{}] failed and cost {}",
threadId,
task.getTaskId(),
- input,
System.currentTimeMillis() - startTime,
t);
OptimizingTaskResult errorResult = new
OptimizingTaskResult(task.getTaskId(), threadId);
diff --git a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
index 1174214a4..5116acf84 100644
--- a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
+++ b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/pom.xml
@@ -122,11 +122,22 @@
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>org.slf4j:slf4j-api</exclude>
+ <exclude>org.apache.hadoop:*</exclude>
+ <exclude>org.apache.hive:*</exclude>
+ </excludes>
+ </artifactSet>
<relocations>
<relocation>
<pattern>org.apache.parquet</pattern>
<shadedPattern>org.apache.amoro.shade.org.apache.parquet</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.orc</pattern>
+
<shadedPattern>org.apache.amoro.shade.org.apache.orc</shadedPattern>
+ </relocation>
</relocations>
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
</configuration>
diff --git
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
index ce4499df1..b1cae14ad 100644
---
a/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
+++
b/amoro-ams/amoro-ams-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java
@@ -52,8 +52,8 @@ public class SparkOptimizerExecutor extends OptimizerExecutor
{
protected OptimizingTaskResult executeTask(OptimizingTask task) {
OptimizingTaskResult result;
String threadName = Thread.currentThread().getName();
+ long startTime = System.currentTimeMillis();
try {
- long startTime = System.currentTimeMillis();
ImmutableList<OptimizingTask> of = ImmutableList.of(task);
jsc.setJobDescription(jobDescription(task));
SparkOptimizingTaskFunction taskFunction =
@@ -68,7 +68,11 @@ public class SparkOptimizerExecutor extends
OptimizerExecutor {
return result;
} catch (Throwable r) {
LOG.error(
- "Optimizer executor[{}] executed task[{}] failed, and cost {}",
threadName, task, r);
+ "Optimizer executor[{}] executed task[{}] failed, and cost {}",
+ threadName,
+ task.getTaskId(),
+ (System.currentTimeMillis() - startTime),
+ r);
result = new OptimizingTaskResult(task.getTaskId(), threadId);
result.setErrorMessage(ExceptionUtil.getErrorMessage(r, 4000));
return result;
diff --git
a/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
b/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
index d20599d27..face2daaf 100644
---
a/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
+++
b/amoro-core/src/main/java/org/apache/amoro/io/AuthenticatedHadoopFileIO.java
@@ -47,7 +47,6 @@ public class AuthenticatedHadoopFileIO extends HadoopFileIO
implements AuthenticatedFileIO, SupportsPrefixOperations,
SupportsFileSystemOperations {
private final TableMetaStore tableMetaStore;
- private boolean fileRecycleEnabled;
AuthenticatedHadoopFileIO(TableMetaStore tableMetaStore) {
super(tableMetaStore.getConfiguration());
diff --git
a/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
b/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
index cdc51910d..e83106b28 100644
--- a/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
+++ b/amoro-core/src/main/java/org/apache/amoro/io/reader/StructLikeFunnel.java
@@ -24,8 +24,6 @@ import org.apache.amoro.utils.SerializationUtil;
import org.apache.iceberg.StructLike;
import org.jetbrains.annotations.NotNull;
-import java.io.IOException;
-
public enum StructLikeFunnel implements Funnel<StructLike> {
INSTANCE;
@@ -34,10 +32,6 @@ public enum StructLikeFunnel implements Funnel<StructLike> {
@Override
public void funnel(@NotNull StructLike structLike, PrimitiveSink
primitiveSink) {
StructLike copy = SerializationUtil.StructLikeCopy.copy(structLike);
- try {
- primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ primitiveSink.putBytes(SerializationUtil.kryoSerialize(copy));
}
}
diff --git
a/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
b/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
index b3ab1793e..ef022d328 100644
--- a/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
+++ b/amoro-core/src/main/java/org/apache/amoro/table/TableMetaStore.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ConcurrentHashMap;
/** Stores hadoop config files for {@link MixedTable} */
public class TableMetaStore implements Serializable {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG =
LoggerFactory.getLogger(TableMetaStore.class);
// Share runtime context with same configuration as context is expensive to
construct
diff --git
a/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
b/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
index 67752f19a..a1a662dc2 100644
--- a/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
+++ b/amoro-core/src/main/java/org/apache/amoro/utils/SerializationUtil.java
@@ -27,7 +27,6 @@ import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.StructLikeWrapper;
import org.objenesis.strategy.StdInstantiatorStrategy;
@@ -57,11 +56,6 @@ public class SerializationUtil {
}
}
- public static <T> T simpleDeserialize(ByteBuffer buffer) {
- byte[] bytes = ByteBuffers.toByteArray(buffer);
- return simpleDeserialize(bytes);
- }
-
public static <T> T simpleDeserialize(byte[] bytes) {
if (bytes == null) {
return null;
@@ -75,7 +69,7 @@ public class SerializationUtil {
}
}
- public static byte[] kryoSerialize(final Object obj) throws IOException {
+ public static byte[] kryoSerialize(final Object obj) {
return KRYO_SERIALIZER.get().serialize(obj);
}
@@ -185,11 +179,7 @@ public class SerializationUtil {
public byte[] serialize(StructLikeWrapper structLikeWrapper) {
checkNotNull(structLikeWrapper);
StructLike copy =
SerializationUtil.StructLikeCopy.copy(structLikeWrapper.get());
- try {
- return SerializationUtil.kryoSerialize(copy);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return SerializationUtil.kryoSerialize(copy);
}
@Override
@@ -208,12 +198,8 @@ public class SerializationUtil {
@Override
public byte[] serialize(T t) {
- try {
- checkNotNull(t);
- return SerializationUtil.kryoSerialize(t);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ checkNotNull(t);
+ return SerializationUtil.kryoSerialize(t);
}
@Override
diff --git a/docs/admin-guides/managing-optimizers.md
b/docs/admin-guides/managing-optimizers.md
index 94d08ebbf..2da26a903 100644
--- a/docs/admin-guides/managing-optimizers.md
+++ b/docs/admin-guides/managing-optimizers.md
@@ -183,7 +183,13 @@ To better utilize the resources of Spark Optimizer, the
DRA(Dynamic Resource All
If you don't want this feature, you can use these settings:
* Set `spark-conf.spark.dynamicAllocation.enabled` to `false` as you need
allocate proper driver/executor resources Using [Spark Configuration
Options](https://spark.apache.org/docs/latest/configuration.html).
* Set `spark-conf.spark.dynamicAllocation.maxExecutors` to `10` as optimizer
parallelism can only affect parallelism polling optimizing tasks from AMS.
- {{< /hint >}}
+{{< /hint >}}
+
+{{< hint info >}}
+The Spark optimizer may sometimes fail due to class conflicts; you can try to fix this with the following steps:
+* Set `spark-conf.spark.driver.userClassPathFirst` to `true`.
+* Set `spark-conf.spark.executor.userClassPathFirst` to `true`.
+{{< /hint >}}
An example for yarn client mode: