This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch czy006/paimon-maintainer-model
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 1b7141a7ec4f250f0047772ccb87c0e9c5e5c190
Author: ConradJam <[email protected]>
AuthorDate: Thu Jan 29 14:11:20 2026 +0800

    [Feature] Add remote maintainer framework for Paimon tables
    
    This commit introduces a framework for executing Paimon table maintenance
    operations (snapshot expiration, orphan file cleanup) remotely on Spark
    optimizers, following the existing Optimizer pattern.
    
    Changes:
    - Add MaintainerInput/Output interfaces and base implementations
    - Add MaintainerExecutor/Factory interfaces for remote execution
    - Create amoro-optimizer-paimon-spark module with SparkMaintainerExecutor
    - Implement PaimonSnapshotExpire* components for snapshot expiration
    - Add placeholder SparkOptimizer for future Paimon optimizing support
    
    Co-Authored-By: Claude (glm-4.7) <[email protected]>
---
 .../amoro/maintainer/BaseMaintainerInput.java      |  57 ++++++++
 .../amoro/maintainer/BaseMaintainerOutput.java     |  75 +++++++++++
 .../amoro/maintainer/MaintainerExecutor.java       |  38 ++++++
 .../maintainer/MaintainerExecutorFactory.java      |  47 +++++++
 .../apache/amoro/maintainer/MaintainerInput.java   |  81 +++++++++++
 .../apache/amoro/maintainer/MaintainerOutput.java  |  50 +++++++
 .../amoro-optimizer-paimon-spark/pom.xml           | 148 +++++++++++++++++++++
 .../paimon/spark/SparkMaintainerExecutor.java      |  88 ++++++++++++
 .../optimizer/paimon/spark/SparkOptimizer.java     |  75 +++++++++++
 .../paimon/spark/SparkOptimizerExecutor.java       |  56 ++++++++
 .../spark/maintainer/PaimonMaintainerOutput.java   | 108 +++++++++++++++
 .../maintainer/PaimonSnapshotExpireExecutor.java   |  74 +++++++++++
 .../maintainer/PaimonSnapshotExpireFactory.java    |  48 +++++++
 .../maintainer/PaimonSnapshotExpireInput.java      |  86 ++++++++++++
 .../maintainer/SparkMaintainerTaskFunction.java    | 104 +++++++++++++++
 amoro-optimizer/pom.xml                            |   1 +
 16 files changed, 1136 insertions(+)

diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java
new file mode 100644
index 000000000..c642c8901
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Base implementation of MaintainerInput following BaseOptimizingInput pattern. */
+public abstract class BaseMaintainerInput implements MaintainerInput {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Map<String, String> options = new HashMap<>();
+
+  @Override
+  public void option(String name, String value) {
+    options.put(name, value);
+  }
+
+  @Override
+  public void options(Map<String, String> options) {
+    this.options.putAll(options);
+  }
+
+  @Override
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("operationType", getOperationType())
+        .add("tableIdentifier", getTableIdentifier())
+        .add("tableFormat", getTableFormat())
+        .add("options", options)
+        .toString();
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java
new file mode 100644
index 000000000..3d5658bf7
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Base implementation of MaintainerOutput. */
+public class BaseMaintainerOutput implements MaintainerOutput {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Map<String, String> summary;
+  private final boolean success;
+  private final String errorMessage;
+
+  /** Create a successful maintainer output. */
+  public BaseMaintainerOutput() {
+    this(true, null);
+  }
+
+  /**
+   * Create a maintainer output with specified status.
+   *
+   * @param success whether the operation succeeded
+   * @param errorMessage error message if failed, null otherwise
+   */
+  public BaseMaintainerOutput(boolean success, String errorMessage) {
+    this.summary = new HashMap<>();
+    this.success = success;
+    this.errorMessage = errorMessage;
+  }
+
+  @Override
+  public Map<String, String> summary() {
+    return Collections.unmodifiableMap(summary);
+  }
+
+  /**
+   * Add a summary entry.
+   *
+   * @param key summary key
+   * @param value summary value
+   */
+  public void putSummary(String key, String value) {
+    summary.put(key, value);
+  }
+
+  @Override
+  public boolean isSuccess() {
+    return success;
+  }
+
+  @Override
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java
new file mode 100644
index 000000000..02076b000
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import java.io.Serializable;
+
+/**
+ * Executor interface for maintainer operations. Follows the same pattern as OptimizingExecutor.
+ *
+ * @param <I> the maintainer input type
+ * @param <O> the maintainer output type
+ */
+public interface MaintainerExecutor<I extends MaintainerInput, O extends MaintainerOutput>
+    extends Serializable {
+
+  /**
+   * Execute the maintainer operation.
+   *
+   * @return the maintainer output with execution results
+   */
+  O execute();
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java
new file mode 100644
index 000000000..1caf782f1
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Factory interface for creating MaintainerExecutor instances. Follows the same pattern as
+ * OptimizingExecutorFactory.
+ *
+ * @param <I> the maintainer input type
+ */
+public interface MaintainerExecutorFactory<I extends MaintainerInput> extends Serializable {
+
+  /**
+   * Initialize the factory with task properties. Called after constructing the factory through a
+   * parameterless constructor.
+   *
+   * @param properties the task properties
+   */
+  void initialize(Map<String, String> properties);
+
+  /**
+   * Create an executor from the given input.
+   *
+   * @param input the maintainer input
+   * @return the maintainer executor
+   */
+  MaintainerExecutor<?, ?> createExecutor(I input);
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java
new file mode 100644
index 000000000..c2c735db5
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Input interface for maintainer operations executed remotely. Follows the same pattern as
+ * TableOptimizing.OptimizingInput.
+ */
+public interface MaintainerInput extends Serializable {
+
+  /** Maintainer operation type */
+  enum OperationType {
+    SNAPSHOT_EXPIRATION,
+    ORPHAN_FILE_CLEANING,
+    DANGLING_DELETE_CLEANING,
+    DATA_EXPIRATION,
+    TAG_CREATION
+  }
+
+  /**
+   * Get the operation type for this maintainer task.
+   *
+   * @return the operation type
+   */
+  OperationType getOperationType();
+
+  /**
+   * Get the table identifier.
+   *
+   * @return table identifier string
+   */
+  String getTableIdentifier();
+
+  /**
+   * Get the table format (ICEBERG, PAIMON, etc.).
+   *
+   * @return table format string
+   */
+  String getTableFormat();
+
+  /**
+   * Set an option for this maintainer operation.
+   *
+   * @param name option name
+   * @param value option value
+   */
+  void option(String name, String value);
+
+  /**
+   * Set multiple options for this maintainer operation.
+   *
+   * @param options map of option names to values
+   */
+  void options(Map<String, String> options);
+
+  /**
+   * Get all options for this maintainer operation.
+   *
+   * @return map of option names to values
+   */
+  Map<String, String> getOptions();
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java
new file mode 100644
index 000000000..322aa44b9
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.maintainer;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Output interface for maintainer operations executed remotely. Follows the same pattern as
+ * TableOptimizing.OptimizingOutput.
+ */
+public interface MaintainerOutput extends Serializable {
+
+  /**
+   * Get a summary of the maintainer operation execution.
+   *
+   * @return map containing summary information about the operation
+   */
+  Map<String, String> summary();
+
+  /**
+   * Check if the maintainer operation completed successfully.
+   *
+   * @return true if operation succeeded, false otherwise
+   */
+  boolean isSuccess();
+
+  /**
+   * Get the error message if the operation failed.
+   *
+   * @return error message, or null if operation succeeded
+   */
+  String getErrorMessage();
+}
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml
new file mode 100644
index 000000000..66ba56606
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.amoro</groupId>
+        <artifactId>amoro-optimizer</artifactId>
+        <version>0.9-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>amoro-optimizer-paimon-spark-${spark.major.version}_${scala.binary.version}</artifactId>
+    <name>Amoro Project Paimon Spark Optimizer</name>
+    <url>https://amoro.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.amoro</groupId>
+            <artifactId>amoro-optimizer-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Paimon format support -->
+        <dependency>
+            <groupId>org.apache.amoro</groupId>
+            <artifactId>amoro-format-paimon</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- spark dependencies begin -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-memory-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-memory-netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>arrow-vector</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Paimon dependencies -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-${spark.major.version}</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-amoro</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.slf4j:slf4j-api</exclude>
+                                    <exclude>org.apache.hadoop:*</exclude>
+                                    <exclude>org.apache.hive:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.paimon</pattern>
+                                    <shadedPattern>org.apache.amoro.shade.org.apache.paimon</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java
new file mode 100644
index 000000000..eca36c97f
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark;
+
+import org.apache.amoro.api.OptimizingTask;
+import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.maintainer.MaintainerInput;
+import org.apache.amoro.optimizer.common.OptimizerConfig;
+import org.apache.amoro.optimizer.common.OptimizerExecutor;
+import org.apache.amoro.optimizer.paimon.spark.maintainer.SparkMaintainerTaskFunction;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.utils.ExceptionUtil;
+import org.apache.amoro.utils.SerializationUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Executor for maintainer tasks in Spark environment. Similar to SparkOptimizerExecutor but handles
+ * maintainer operations.
+ */
+public class SparkMaintainerExecutor extends OptimizerExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerExecutor.class);
+  private final JavaSparkContext jsc;
+  private final int threadId;
+
+  public SparkMaintainerExecutor(JavaSparkContext jsc, OptimizerConfig config, int threadId) {
+    super(config, threadId);
+    this.jsc = jsc;
+    this.threadId = threadId;
+  }
+
+  @Override
+  protected OptimizingTaskResult executeTask(OptimizingTask task) {
+    OptimizingTaskResult result;
+    String threadName = Thread.currentThread().getName();
+    long startTime = System.currentTimeMillis();
+    try {
+      ImmutableList<OptimizingTask> of = ImmutableList.of(task);
+      jsc.setJobDescription(jobDescription(task));
+      SparkMaintainerTaskFunction taskFunction =
+          new SparkMaintainerTaskFunction(getConfig(), threadId);
+      List<OptimizingTaskResult> results = jsc.parallelize(of, 1).map(taskFunction).collect();
+      result = results.get(0);
+      LOG.info(
+          "Maintainer executor[{}] executed task[{}] and cost {} ms",
+          threadName,
+          task.getTaskId(),
+          System.currentTimeMillis() - startTime);
+      return result;
+    } catch (Throwable r) {
+      LOG.error(
+          "Maintainer executor[{}] executed task[{}] failed, and cost {} ms",
+          threadName,
+          task.getTaskId(),
+          (System.currentTimeMillis() - startTime),
+          r);
+      result = new OptimizingTaskResult(task.getTaskId(), threadId);
+      result.setErrorMessage(ExceptionUtil.getErrorMessage(r, ERROR_MESSAGE_MAX_LENGTH));
+      return result;
+    }
+  }
+
+  private String jobDescription(OptimizingTask task) {
+    MaintainerInput input = SerializationUtil.simpleDeserialize(task.getTaskInput());
+    return String.format(
+        "Amoro Paimon maintainer task, operation: %s, table: %s, task id: %s",
+        input.getOperationType(), input.getTableIdentifier(), task.getTaskId());
+  }
+}
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java
new file mode 100644
index 000000000..5528fb641
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark;
+
+import org.apache.amoro.optimizer.common.Optimizer;
+import org.apache.amoro.optimizer.common.OptimizerConfig;
+import org.apache.amoro.optimizer.common.OptimizerToucher;
+import org.apache.amoro.resource.Resource;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point for Paimon Spark optimizer. Supports both optimizing and maintainer tasks.
+ *
+ * <p>Note: Optimizer functionality is currently not implemented. This class serves as a placeholder
+ * for future Paimon optimizing implementation.
+ */
+public class SparkOptimizer extends Optimizer {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class);
+  private static final String APP_NAME_FORMAT = "amoro-paimon-spark-optimizer-%s";
+
+  public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) {
+    super(config, () -> new OptimizerToucher(config), (i) -> new SparkOptimizerExecutor(config, i));
+  }
+
+  public static void main(String[] args) throws Exception {
+    OptimizerConfig config = new OptimizerConfig(args);
+    SparkSession spark =
+        SparkSession.builder()
+            .appName(String.format(APP_NAME_FORMAT, config.getResourceId()))
+            .getOrCreate();
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+
+    if (!jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false)) {
+      LOG.warn(
+          "To better utilize computing resources, it is recommended to enable "
+              + "'spark.dynamicAllocation.enabled' "
+              + "and set 'spark.dynamicAllocation.maxExecutors' equal to 'OPTIMIZER_EXECUTION_PARALLEL'");
+    }
+
+    // Calculate optimizer memory allocation
+    int driverMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.driver.memory", "1g"));
+    int executorMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.executor.memory", "1g"));
+    int executorCores = jsc.getConf().getInt("spark.executor.cores", 1);
+    int executionParallel = config.getExecutionParallel();
+    int executorNum = (int) Math.ceil((double) executionParallel / executorCores);
+    config.setMemorySize(driverMemory + executorNum * executorMemory);
+
+    SparkOptimizer optimizer = new SparkOptimizer(config, jsc);
+    OptimizerToucher toucher = optimizer.getToucher();
+    toucher.withRegisterProperty(Resource.PROPERTY_JOB_ID, spark.sparkContext().applicationId());
+
+    LOG.info("Starting the Paimon Spark optimizer with configuration:{}", config);
+    optimizer.startOptimizing();
+  }
+}
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java
new file mode 100644
index 000000000..2d7f6a183
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark;
+
+import org.apache.amoro.api.OptimizingTask;
+import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.optimizer.common.OptimizerConfig;
+import org.apache.amoro.optimizer.common.OptimizerExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executor for Paimon optimizing tasks in Spark environment.
+ *
+ * <p>This is a placeholder implementation for future Paimon optimizing functionality. Currently,
+ * all optimizing operations will throw an UnsupportedOperationException.
+ *
+ * <p>For maintainer operations, use {@link SparkMaintainerExecutor} instead.
+ */
+public class SparkOptimizerExecutor extends OptimizerExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizerExecutor.class);
+
+  public SparkOptimizerExecutor(OptimizerConfig config, int threadId) {
+    super(config, threadId);
+  }
+
+  @Override
+  protected OptimizingTaskResult executeTask(OptimizingTask task) {
+    String errorMessage =
+        "Paimon optimizing is not yet implemented. "
+            + "For maintainer operations, please use SparkMaintainerExecutor instead.";
+    LOG.error(
+        "Optimizer executor[{}] encountered unsupported operation: {}",
+        getThreadId(),
+        errorMessage);
+    OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), getThreadId());
+    result.setErrorMessage(errorMessage);
+    return result;
+  }
+}
diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java
new file mode 100644
index 000000000..b7cae6c94
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark.maintainer;
+
+import org.apache.amoro.maintainer.BaseMaintainerOutput;
+
+/**
+ * Output for Paimon snapshot expiration operation.
+ *
+ * <p>Every statistic is kept as a string entry of the summary: setters write it with {@code
+ * putSummary} and getters parse the entry read back from {@code summary()}.
+ */
+public class PaimonMaintainerOutput extends BaseMaintainerOutput {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Summary key under which the number of expired snapshots is stored. */
+  public static final String EXPIRED_SNAPSHOT_COUNT = "expired_snapshot_count";
+
+  /** Summary key under which the number of expired data files is stored. */
+  public static final String EXPIRED_DATA_FILE_COUNT = "expired_data_file_count";
+
+  /** Summary key under which the total size of expired data files is stored. */
+  public static final String EXPIRED_DATA_FILE_SIZE = "expired_data_file_size";
+
+  /** Creates an output marked as successful, without an error message. */
+  public PaimonMaintainerOutput() {
+    super(true, null);
+  }
+
+  /**
+   * Creates an output with an explicit status.
+   *
+   * @param success whether the operation succeeded
+   * @param errorMessage error message if failed, null otherwise
+   */
+  public PaimonMaintainerOutput(boolean success, String errorMessage) {
+    super(success, errorMessage);
+  }
+
+  /**
+   * Records how many snapshots were expired.
+   *
+   * @param count number of snapshots expired
+   */
+  public void setExpiredSnapshotCount(int count) {
+    putSummary(EXPIRED_SNAPSHOT_COUNT, Integer.toString(count));
+  }
+
+  /**
+   * Records how many data files were expired.
+   *
+   * @param count number of data files expired
+   */
+  public void setExpiredDataFileCount(int count) {
+    putSummary(EXPIRED_DATA_FILE_COUNT, Integer.toString(count));
+  }
+
+  /**
+   * Records the total size of the expired data files.
+   *
+   * @param size total size in bytes
+   */
+  public void setExpiredDataFileSize(long size) {
+    putSummary(EXPIRED_DATA_FILE_SIZE, Long.toString(size));
+  }
+
+  /**
+   * Reads back the number of expired snapshots.
+   *
+   * @return number of snapshots expired, or 0 if not set
+   */
+  public int getExpiredSnapshotCount() {
+    String raw = summary().get(EXPIRED_SNAPSHOT_COUNT);
+    if (raw == null) {
+      return 0;
+    }
+    return Integer.parseInt(raw);
+  }
+
+  /**
+   * Reads back the number of expired data files.
+   *
+   * @return number of data files expired, or 0 if not set
+   */
+  public int getExpiredDataFileCount() {
+    String raw = summary().get(EXPIRED_DATA_FILE_COUNT);
+    if (raw == null) {
+      return 0;
+    }
+    return Integer.parseInt(raw);
+  }
+
+  /**
+   * Reads back the total size of the expired data files.
+   *
+   * @return total size in bytes, or 0 if not set
+   */
+  public long getExpiredDataFileSize() {
+    String raw = summary().get(EXPIRED_DATA_FILE_SIZE);
+    if (raw == null) {
+      return 0L;
+    }
+    return Long.parseLong(raw);
+  }
+}
diff --git 
a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java
 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java
new file mode 100644
index 000000000..c85d0435b
--- /dev/null
+++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark.maintainer;
+
+import org.apache.amoro.maintainer.MaintainerExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executor for Paimon snapshot expiration.
+ *
+ * <p>This executor handles the expiration of old snapshots in Paimon tables based on time and count
+ * thresholds. The expiration logic itself is still a placeholder (see the TODO in {@link
+ * #execute()}): no snapshot is touched and every reported statistic is zero.
+ */
+public class PaimonSnapshotExpireExecutor
+    implements MaintainerExecutor<PaimonSnapshotExpireInput, PaimonMaintainerOutput> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PaimonSnapshotExpireExecutor.class);
+
+  private final PaimonSnapshotExpireInput input;
+
+  /**
+   * Creates an executor bound to a single expiration request.
+   *
+   * @param input table identifier and expiration thresholds for this run
+   */
+  public PaimonSnapshotExpireExecutor(PaimonSnapshotExpireInput input) {
+    this.input = input;
+  }
+
+  /**
+   * Runs the (placeholder) snapshot expiration.
+   *
+   * @return a successful output with all three counters set to 0, or a failed output that
+   *     describes the error if anything was thrown while running
+   */
+  @Override
+  public PaimonMaintainerOutput execute() {
+    try {
+      LOG.info(
+          "Starting snapshot expiration for table {}, older than {}, retain last {}",
+          input.getTableIdentifier(),
+          input.getOlderThanMillis(),
+          input.getRetainLastCount());
+
+      // TODO: Implement Paimon snapshot expiration logic
+      // 1. Load Paimon table from options (table location, database, name)
+      // 2. Collect snapshots to expire based on time and count
+      // 3. Call Paimon's snapshot expiration API
+      // 4. Collect statistics (snapshot count, file count, file size)
+      // 5. Return output with statistics
+
+      // Placeholder implementation - returns success with no expired snapshots
+      LOG.info(
+          "Snapshot expiration for table {} completed (placeholder implementation)",
+          input.getTableIdentifier());
+
+      PaimonMaintainerOutput output = new PaimonMaintainerOutput();
+      output.setExpiredSnapshotCount(0);
+      output.setExpiredDataFileCount(0);
+      output.setExpiredDataFileSize(0);
+      return output;
+
+    } catch (Throwable t) {
+      LOG.error("Failed to expire snapshots for table {}", input.getTableIdentifier(), t);
+      // Throwable#getMessage() is null for many exceptions (a bare NullPointerException, for
+      // example). Fall back to toString() so a failed output always says what went wrong.
+      String message = t.getMessage() != null ? t.getMessage() : t.toString();
+      return new PaimonMaintainerOutput(false, message);
+    }
+  }
+}
diff --git 
a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java
 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java
new file mode 100644
index 000000000..fff23804f
--- /dev/null
+++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark.maintainer;
+
+import org.apache.amoro.maintainer.MaintainerExecutor;
+import org.apache.amoro.maintainer.MaintainerExecutorFactory;
+
+import java.util.Map;
+
+/**
+ * Factory for creating PaimonSnapshotExpireExecutor instances.
+ *
+ * <p>The properties handed to {@link #initialize(Map)} are kept on the instance; nothing in this
+ * class reads them yet.
+ */
+public class PaimonSnapshotExpireFactory
+    implements MaintainerExecutorFactory<PaimonSnapshotExpireInput> {
+
+  private static final long serialVersionUID = 1L;
+
+  private Map<String, String> properties;
+
+  /** No-argument constructor, required so DynConstructors can instantiate the factory. */
+  public PaimonSnapshotExpireFactory() {}
+
+  /**
+   * Remembers the task properties for later use.
+   *
+   * @param properties task properties; the reference is stored as given, without copying
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
+  /**
+   * Builds the executor for one snapshot expiration request.
+   *
+   * @param input the expiration request
+   * @return a new {@link PaimonSnapshotExpireExecutor} wrapping {@code input}
+   */
+  @Override
+  public MaintainerExecutor<?, ?> createExecutor(PaimonSnapshotExpireInput input) {
+    return new PaimonSnapshotExpireExecutor(input);
+  }
+}
diff --git 
a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java
 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java
new file mode 100644
index 000000000..7db5fcbae
--- /dev/null
+++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark.maintainer;
+
+import org.apache.amoro.maintainer.BaseMaintainerInput;
+
+/**
+ * Input for Paimon snapshot expiration operation.
+ *
+ * <p>Holds the table identity together with the two expiration thresholds; all four values are
+ * fixed at construction time.
+ */
+public class PaimonSnapshotExpireInput extends BaseMaintainerInput {
+
+  private static final long serialVersionUID = 1L;
+
+  // Keys of the options through which the serialized table metadata is passed.
+  public static final String TABLE_LOCATION = "table.location";
+  public static final String TABLE_DATABASE = "table.database";
+  public static final String TABLE_NAME = "table.name";
+
+  private final String tableIdentifier;
+  private final String tableFormat;
+  private final long olderThanMillis;
+  private final int retainLastCount;
+
+  /**
+   * Builds the input of one snapshot expiration run.
+   *
+   * @param tableIdentifier the full table identifier
+   * @param tableFormat the table format (e.g., "PAIMON")
+   * @param olderThanMillis expire snapshots older than this timestamp (milliseconds)
+   * @param retainLastCount retain at least this many snapshots
+   */
+  public PaimonSnapshotExpireInput(
+      String tableIdentifier, String tableFormat, long olderThanMillis, int retainLastCount) {
+    this.tableIdentifier = tableIdentifier;
+    this.tableFormat = tableFormat;
+    this.olderThanMillis = olderThanMillis;
+    this.retainLastCount = retainLastCount;
+  }
+
+  /** @return always {@code OperationType.SNAPSHOT_EXPIRATION} */
+  @Override
+  public OperationType getOperationType() {
+    return OperationType.SNAPSHOT_EXPIRATION;
+  }
+
+  /** @return the table identifier given to the constructor */
+  @Override
+  public String getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  /** @return the table format given to the constructor */
+  @Override
+  public String getTableFormat() {
+    return tableFormat;
+  }
+
+  /**
+   * Returns the expiration threshold.
+   *
+   * @return timestamp in milliseconds; snapshots older than it should be expired
+   */
+  public long getOlderThanMillis() {
+    return olderThanMillis;
+  }
+
+  /**
+   * Returns the retention floor.
+   *
+   * @return minimum number of snapshots to keep
+   */
+  public int getRetainLastCount() {
+    return retainLastCount;
+  }
+}
diff --git 
a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java
 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java
new file mode 100644
index 000000000..e2badb4b8
--- /dev/null
+++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.paimon.spark.maintainer;
+
+import org.apache.amoro.api.OptimizingTask;
+import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.maintainer.MaintainerExecutor;
+import org.apache.amoro.maintainer.MaintainerExecutorFactory;
+import org.apache.amoro.maintainer.MaintainerInput;
+import org.apache.amoro.maintainer.MaintainerOutput;
+import org.apache.amoro.optimizer.common.OptimizerConfig;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.utils.ExceptionUtil;
+import org.apache.amoro.utils.SerializationUtil;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.spark.api.java.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Spark function to execute maintainer tasks. Similar to SparkOptimizingTaskFunction but for
+ * maintainer operations.
+ *
+ * <p>Each call deserializes the task input, instantiates the executor factory named by the {@code
+ * TaskProperties.TASK_EXECUTOR_FACTORY_IMPL} task property, runs the executor it creates and wraps
+ * the serialized output in an {@link OptimizingTaskResult}. Failures are reported through the
+ * result's error message rather than by throwing.
+ */
+public class SparkMaintainerTaskFunction implements Function<OptimizingTask, OptimizingTaskResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerTaskFunction.class);
+  private static final int ERROR_MESSAGE_MAX_LENGTH = 4000;
+
+  private final OptimizerConfig config;
+  private final int threadId;
+
+  /**
+   * @param config optimizer configuration kept for the lifetime of the function
+   * @param threadId id reported in log lines and in every produced result
+   */
+  public SparkMaintainerTaskFunction(OptimizerConfig config, int threadId) {
+    this.config = config;
+    this.threadId = threadId;
+  }
+
+  /**
+   * Executes one maintainer task.
+   *
+   * @param task the task carrying the serialized input and the task properties
+   * @return a result with the serialized output and its summary on success, or a result with only
+   *     the error message set (truncated to {@code ERROR_MESSAGE_MAX_LENGTH}) when anything fails
+   */
+  // The factory and executor are resolved reflectively from a class name, so their type
+  // arguments are unknown here and raw types are used on purpose.
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public OptimizingTaskResult call(OptimizingTask task) throws Exception {
+    long startTime = System.currentTimeMillis();
+    try {
+      Map<String, String> taskProperties = fillTaskProperties(config, task);
+      MaintainerInput input = SerializationUtil.simpleDeserialize(task.getTaskInput());
+
+      String factoryImpl = taskProperties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL);
+      if (factoryImpl == null || factoryImpl.isEmpty()) {
+        // Fail with a readable message instead of an opaque error from the reflective lookup.
+        // Thrown inside the try block, so it is still turned into an error result below.
+        throw new IllegalArgumentException(
+            "Task property "
+                + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL
+                + " is not set for task "
+                + task.getTaskId());
+      }
+      DynConstructors.Ctor<MaintainerExecutorFactory> ctor =
+          DynConstructors.builder(MaintainerExecutorFactory.class).impl(factoryImpl).buildChecked();
+      MaintainerExecutorFactory factory = ctor.newInstance();
+
+      factory.initialize(taskProperties);
+      MaintainerExecutor executor = factory.createExecutor(input);
+      MaintainerOutput output = (MaintainerOutput) executor.execute();
+
+      ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output);
+      OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId);
+      result.setTaskOutput(outputByteBuffer);
+      result.setSummary(output.summary());
+
+      LOG.info(
+          "Maintainer executor[{}] executed task[{}]({}) and cost {} ms",
+          threadId,
+          task.getTaskId(),
+          input,
+          System.currentTimeMillis() - startTime);
+      return result;
+    } catch (Throwable t) {
+      LOG.error(
+          "Maintainer executor[{}] executed task[{}] failed and cost {} ms",
+          threadId,
+          task.getTaskId(),
+          System.currentTimeMillis() - startTime,
+          t);
+      OptimizingTaskResult errorResult = new OptimizingTaskResult(task.getTaskId(), threadId);
+      errorResult.setErrorMessage(ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH));
+      return errorResult;
+    }
+  }
+
+  /**
+   * Copies the task's properties into a new map and adds the process id of the task.
+   *
+   * @param config optimizer configuration; currently not read by this method
+   * @param task the task whose properties and process id are used
+   * @return a new mutable map, independent of the task's own property map
+   */
+  private static Map<String, String> fillTaskProperties(
+      OptimizerConfig config, OptimizingTask task) {
+    Map<String, String> properties = Maps.newHashMap(task.getProperties());
+    properties.put(TaskProperties.PROCESS_ID, String.valueOf(task.getTaskId().getProcessId()));
+    return properties;
+  }
+}
diff --git a/amoro-optimizer/pom.xml b/amoro-optimizer/pom.xml
index 5749b1361..21e9f7271 100644
--- a/amoro-optimizer/pom.xml
+++ b/amoro-optimizer/pom.xml
@@ -35,6 +35,7 @@
         <module>amoro-optimizer-common</module>
         <module>amoro-optimizer-flink</module>
         <module>amoro-optimizer-spark</module>
+        <module>amoro-optimizer-paimon-spark</module>
         <module>amoro-optimizer-standalone</module>
     </modules>
 

Reply via email to