This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch czy006/paimon-maintainer-model
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 1b7141a7ec4f250f0047772ccb87c0e9c5e5c190
Author: ConradJam <[email protected]>
AuthorDate: Thu Jan 29 14:11:20 2026 +0800

    [Feature] Add remote maintainer framework for Paimon tables

    This commit introduces a framework for executing Paimon table maintenance
    operations (snapshot expiration, orphan file cleanup) remotely on Spark
    optimizers, following the existing Optimizer pattern.

    Changes:
    - Add MaintainerInput/Output interfaces and base implementations
    - Add MaintainerExecutor/Factory interfaces for remote execution
    - Create amoro-optimizer-paimon-spark module with SparkMaintainerExecutor
    - Implement PaimonSnapshotExpire* components for snapshot expiration
    - Add placeholder SparkOptimizer for future Paimon optimizing support

    Co-Authored-By: Claude (glm-4.7) <[email protected]>
---
 .../amoro/maintainer/BaseMaintainerInput.java      |  57 ++++++++
 .../amoro/maintainer/BaseMaintainerOutput.java     |  75 +++++++++++
 .../amoro/maintainer/MaintainerExecutor.java       |  38 ++++++
 .../maintainer/MaintainerExecutorFactory.java      |  47 +++++++
 .../apache/amoro/maintainer/MaintainerInput.java   |  81 +++++++++++
 .../apache/amoro/maintainer/MaintainerOutput.java  |  50 +++++++
 .../amoro-optimizer-paimon-spark/pom.xml           | 148 +++++++++++++++++++++
 .../paimon/spark/SparkMaintainerExecutor.java      |  88 ++++++++++++
 .../optimizer/paimon/spark/SparkOptimizer.java     |  75 +++++++++++
 .../paimon/spark/SparkOptimizerExecutor.java       |  56 ++++++++
 .../spark/maintainer/PaimonMaintainerOutput.java   | 108 +++++++++++++++
 .../maintainer/PaimonSnapshotExpireExecutor.java   |  74 +++++++++++
 .../maintainer/PaimonSnapshotExpireFactory.java    |  48 +++++++
 .../maintainer/PaimonSnapshotExpireInput.java      |  86 ++++++++++++
 .../maintainer/SparkMaintainerTaskFunction.java    | 104 +++++++++++++++
 amoro-optimizer/pom.xml                            |   1 +
 16 files changed, 1136 insertions(+)

diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java
new file mode 100644 index 000000000..c642c8901 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerInput.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; + +import java.util.HashMap; +import java.util.Map; + +/** Base implementation of MaintainerInput following BaseOptimizingInput pattern. 
*/ +public abstract class BaseMaintainerInput implements MaintainerInput { + + private static final long serialVersionUID = 1L; + + private final Map<String, String> options = new HashMap<>(); + + @Override + public void option(String name, String value) { + options.put(name, value); + } + + @Override + public void options(Map<String, String> options) { + this.options.putAll(options); + } + + @Override + public Map<String, String> getOptions() { + return options; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("operationType", getOperationType()) + .add("tableIdentifier", getTableIdentifier()) + .add("tableFormat", getTableFormat()) + .add("options", options) + .toString(); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java new file mode 100644 index 000000000..3d5658bf7 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/BaseMaintainerOutput.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.maintainer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** Base implementation of MaintainerOutput. */ +public class BaseMaintainerOutput implements MaintainerOutput { + + private static final long serialVersionUID = 1L; + + private final Map<String, String> summary; + private final boolean success; + private final String errorMessage; + + /** Create a successful maintainer output. */ + public BaseMaintainerOutput() { + this(true, null); + } + + /** + * Create a maintainer output with specified status. + * + * @param success whether the operation succeeded + * @param errorMessage error message if failed, null otherwise + */ + public BaseMaintainerOutput(boolean success, String errorMessage) { + this.summary = new HashMap<>(); + this.success = success; + this.errorMessage = errorMessage; + } + + @Override + public Map<String, String> summary() { + return Collections.unmodifiableMap(summary); + } + + /** + * Add a summary entry. + * + * @param key summary key + * @param value summary value + */ + public void putSummary(String key, String value) { + summary.put(key, value); + } + + @Override + public boolean isSuccess() { + return success; + } + + @Override + public String getErrorMessage() { + return errorMessage; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java new file mode 100644 index 000000000..02076b000 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; + +/** + * Executor interface for maintainer operations. Follows the same pattern as OptimizingExecutor. + * + * @param <I> the maintainer input type + * @param <O> the maintainer output type + */ +public interface MaintainerExecutor<I extends MaintainerInput, O extends MaintainerOutput> + extends Serializable { + + /** + * Execute the maintainer operation. + * + * @return the maintainer output with execution results + */ + O execute(); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java new file mode 100644 index 000000000..1caf782f1 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerExecutorFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Factory interface for creating MaintainerExecutor instances. Follows the same pattern as + * OptimizingExecutorFactory. + * + * @param <I> the maintainer input type + */ +public interface MaintainerExecutorFactory<I extends MaintainerInput> extends Serializable { + + /** + * Initialize the factory with task properties. Called after constructing the factory through a + * parameterless constructor. + * + * @param properties the task properties + */ + void initialize(Map<String, String> properties); + + /** + * Create an executor from the given input. + * + * @param input the maintainer input + * @return the maintainer executor + */ + MaintainerExecutor<?, ?> createExecutor(I input); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java new file mode 100644 index 000000000..c2c735db5 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerInput.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Input interface for maintainer operations executed remotely. Follows the same pattern as + * TableOptimizing.OptimizingInput. + */ +public interface MaintainerInput extends Serializable { + + /** Maintainer operation type */ + enum OperationType { + SNAPSHOT_EXPIRATION, + ORPHAN_FILE_CLEANING, + DANGLING_DELETE_CLEANING, + DATA_EXPIRATION, + TAG_CREATION + } + + /** + * Get the operation type for this maintainer task. + * + * @return the operation type + */ + OperationType getOperationType(); + + /** + * Get the table identifier. + * + * @return table identifier string + */ + String getTableIdentifier(); + + /** + * Get the table format (ICEBERG, PAIMON, etc.). + * + * @return table format string + */ + String getTableFormat(); + + /** + * Set an option for this maintainer operation. + * + * @param name option name + * @param value option value + */ + void option(String name, String value); + + /** + * Set multiple options for this maintainer operation. + * + * @param options map of option names to values + */ + void options(Map<String, String> options); + + /** + * Get all options for this maintainer operation. 
+ * + * @return map of option names to values + */ + Map<String, String> getOptions(); +} diff --git a/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java new file mode 100644 index 000000000..322aa44b9 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/maintainer/MaintainerOutput.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.maintainer; + +import java.io.Serializable; +import java.util.Map; + +/** + * Output interface for maintainer operations executed remotely. Follows the same pattern as + * TableOptimizing.OptimizingOutput. + */ +public interface MaintainerOutput extends Serializable { + + /** + * Get a summary of the maintainer operation execution. + * + * @return map containing summary information about the operation + */ + Map<String, String> summary(); + + /** + * Check if the maintainer operation completed successfully. + * + * @return true if operation succeeded, false otherwise + */ + boolean isSuccess(); + + /** + * Get the error message if the operation failed. 
+ * + * @return error message, or null if operation succeeded + */ + String getErrorMessage(); +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml new file mode 100644 index 000000000..66ba56606 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.amoro</groupId> + <artifactId>amoro-optimizer</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>amoro-optimizer-paimon-spark-${spark.major.version}_${scala.binary.version}</artifactId> + <name>Amoro Project Paimon Spark Optimizer</name> + <url>https://amoro.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.amoro</groupId> + <artifactId>amoro-optimizer-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Paimon format support --> + <dependency> + <groupId>org.apache.amoro</groupId> + <artifactId>amoro-format-paimon</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- spark dependencies begin --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Paimon dependencies --> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-spark-${spark.major.version}</artifactId> + <version>${paimon.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-amoro</id> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + <artifactSet> + <excludes> + <exclude>org.slf4j:slf4j-api</exclude> + <exclude>org.apache.hadoop:*</exclude> + <exclude>org.apache.hive:*</exclude> + </excludes> + </artifactSet> + <relocations> + <relocation> + <pattern>org.apache.paimon</pattern> + <shadedPattern>org.apache.amoro.shade.org.apache.paimon</shadedPattern> + </relocation> + </relocations> + <finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java new file mode 100644 index 000000000..eca36c97f --- /dev/null +++ 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkMaintainerExecutor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.maintainer.MaintainerInput; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerExecutor; +import org.apache.amoro.optimizer.paimon.spark.maintainer.SparkMaintainerTaskFunction; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; +import org.apache.amoro.utils.ExceptionUtil; +import org.apache.amoro.utils.SerializationUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Executor for maintainer tasks in Spark environment. Similar to SparkOptimizerExecutor but handles + * maintainer operations. 
+ */ +public class SparkMaintainerExecutor extends OptimizerExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerExecutor.class); + private final JavaSparkContext jsc; + private final int threadId; + + public SparkMaintainerExecutor(JavaSparkContext jsc, OptimizerConfig config, int threadId) { + super(config, threadId); + this.jsc = jsc; + this.threadId = threadId; + } + + @Override + protected OptimizingTaskResult executeTask(OptimizingTask task) { + OptimizingTaskResult result; + String threadName = Thread.currentThread().getName(); + long startTime = System.currentTimeMillis(); + try { + ImmutableList<OptimizingTask> of = ImmutableList.of(task); + jsc.setJobDescription(jobDescription(task)); + SparkMaintainerTaskFunction taskFunction = + new SparkMaintainerTaskFunction(getConfig(), threadId); + List<OptimizingTaskResult> results = jsc.parallelize(of, 1).map(taskFunction).collect(); + result = results.get(0); + LOG.info( + "Maintainer executor[{}] executed task[{}] and cost {} ms", + threadName, + task.getTaskId(), + System.currentTimeMillis() - startTime); + return result; + } catch (Throwable r) { + LOG.error( + "Maintainer executor[{}] executed task[{}] failed, and cost {} ms", + threadName, + task.getTaskId(), + (System.currentTimeMillis() - startTime), + r); + result = new OptimizingTaskResult(task.getTaskId(), threadId); + result.setErrorMessage(ExceptionUtil.getErrorMessage(r, ERROR_MESSAGE_MAX_LENGTH)); + return result; + } + } + + private String jobDescription(OptimizingTask task) { + MaintainerInput input = SerializationUtil.simpleDeserialize(task.getTaskInput()); + return String.format( + "Amoro Paimon maintainer task, operation: %s, table: %s, task id: %s", + input.getOperationType(), input.getTableIdentifier(), task.getTaskId()); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java 
b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java new file mode 100644 index 000000000..5528fb641 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.optimizer.common.Optimizer; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerToucher; +import org.apache.amoro.resource.Resource; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main entry point for Paimon Spark optimizer. Supports both optimizing and maintainer tasks. + * + * <p>Note: Optimizer functionality is currently not implemented. This class serves as a placeholder + * for future Paimon optimizing implementation. 
+ */ +public class SparkOptimizer extends Optimizer { + private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class); + private static final String APP_NAME_FORMAT = "amoro-paimon-spark-optimizer-%s"; + + public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) { + super(config, () -> new OptimizerToucher(config), (i) -> new SparkOptimizerExecutor(config, i)); + } + + public static void main(String[] args) throws Exception { + OptimizerConfig config = new OptimizerConfig(args); + SparkSession spark = + SparkSession.builder() + .appName(String.format(APP_NAME_FORMAT, config.getResourceId())) + .getOrCreate(); + JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); + + if (!jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false)) { + LOG.warn( + "To better utilize computing resources, it is recommended to enable " + + "'spark.dynamicAllocation.enabled' " + + "and set 'spark.dynamicAllocation.maxExecutors' equal to 'OPTIMIZER_EXECUTION_PARALLEL'"); + } + + // Calculate optimizer memory allocation + int driverMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.driver.memory", "1g")); + int executorMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.executor.memory", "1g")); + int executorCores = jsc.getConf().getInt("spark.executor.cores", 1); + int executionParallel = config.getExecutionParallel(); + int executorNum = (int) Math.ceil((double) executionParallel / executorCores); + config.setMemorySize(driverMemory + executorNum * executorMemory); + + SparkOptimizer optimizer = new SparkOptimizer(config, jsc); + OptimizerToucher toucher = optimizer.getToucher(); + toucher.withRegisterProperty(Resource.PROPERTY_JOB_ID, spark.sparkContext().applicationId()); + + LOG.info("Starting the Paimon Spark optimizer with configuration:{}", config); + optimizer.startOptimizing(); + } +} diff --git 
a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java new file mode 100644 index 000000000..2d7f6a183 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/SparkOptimizerExecutor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizer.common.OptimizerExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executor for Paimon optimizing tasks in Spark environment. + * + * <p>This is a placeholder implementation for future Paimon optimizing functionality. Currently, + * all optimizing operations will throw an UnsupportedOperationException. + * + * <p>For maintainer operations, use {@link SparkMaintainerExecutor} instead. 
+ */ +public class SparkOptimizerExecutor extends OptimizerExecutor { + private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizerExecutor.class); + + public SparkOptimizerExecutor(OptimizerConfig config, int threadId) { + super(config, threadId); + } + + @Override + protected OptimizingTaskResult executeTask(OptimizingTask task) { + String errorMessage = + "Paimon optimizing is not yet implemented. " + + "For maintainer operations, please use SparkMaintainerExecutor instead."; + LOG.error( + "Optimizer executor[{}] encountered unsupported operation: {}", + getThreadId(), + errorMessage); + OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), getThreadId()); + result.setErrorMessage(errorMessage); + return result; + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java new file mode 100644 index 000000000..b7cae6c94 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonMaintainerOutput.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.BaseMaintainerOutput; + +/** Output for Paimon snapshot expiration operation. Statistics are written with {@code putSummary} as string values under the keys declared below, and the typed getters parse them back from {@code summary()}. */ +public class PaimonMaintainerOutput extends BaseMaintainerOutput { + + private static final long serialVersionUID = 1L; + + /** Summary key under which the expired snapshot count is stored (as a decimal string). */ + public static final String EXPIRED_SNAPSHOT_COUNT = "expired_snapshot_count"; + + /** Summary key under which the expired data file count is stored (as a decimal string). */ + public static final String EXPIRED_DATA_FILE_COUNT = "expired_data_file_count"; + + /** Summary key under which the expired data file size is stored (as a decimal string). */ + public static final String EXPIRED_DATA_FILE_SIZE = "expired_data_file_size"; + + /** Create a successful maintainer output (success flag true, no error message). */ + public PaimonMaintainerOutput() { + super(true, null); + } + + /** + * Create a maintainer output with specified status; both arguments are handed to the superclass constructor as-is. + * + * @param success whether the operation succeeded + * @param errorMessage error message if failed, null otherwise + */ + public PaimonMaintainerOutput(boolean success, String errorMessage) { + super(success, errorMessage); + } + + /** + * Set the number of expired snapshots, stored under {@link #EXPIRED_SNAPSHOT_COUNT} as {@code String.valueOf(count)}. + * + * @param count number of snapshots expired + */ + public void setExpiredSnapshotCount(int count) { + putSummary(EXPIRED_SNAPSHOT_COUNT, String.valueOf(count)); + } + + /** + * Set the number of expired data files, stored under {@link #EXPIRED_DATA_FILE_COUNT} as {@code String.valueOf(count)}. + * + * @param count number of data files expired + */ + public void setExpiredDataFileCount(int count) { + putSummary(EXPIRED_DATA_FILE_COUNT, String.valueOf(count)); + } + + /** + * Set the total size of expired data files, stored under {@link #EXPIRED_DATA_FILE_SIZE} as {@code String.valueOf(size)}. + * + * @param size total size in bytes + */ + public void setExpiredDataFileSize(long size) { + putSummary(EXPIRED_DATA_FILE_SIZE, String.valueOf(size)); + } + + /** + * Get the number of expired snapshots.
+ * + * @return number of snapshots expired, or 0 if not set; a stored value that is not a valid int makes {@code Integer.parseInt} throw {@code NumberFormatException} + */ + public int getExpiredSnapshotCount() { + String value = summary().get(EXPIRED_SNAPSHOT_COUNT); + return value == null ? 0 : Integer.parseInt(value); + } + + /** + * Get the number of expired data files. + * + * @return number of data files expired, or 0 if not set; a stored value that is not a valid int makes {@code Integer.parseInt} throw {@code NumberFormatException} + */ + public int getExpiredDataFileCount() { + String value = summary().get(EXPIRED_DATA_FILE_COUNT); + return value == null ? 0 : Integer.parseInt(value); + } + + /** + * Get the total size of expired data files. + * + * @return total size in bytes, or 0 if not set; a stored value that is not a valid long makes {@code Long.parseLong} throw {@code NumberFormatException} + */ + public long getExpiredDataFileSize() { + String value = summary().get(EXPIRED_DATA_FILE_SIZE); + return value == null ? 0 : Long.parseLong(value); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java new file mode 100644 index 000000000..c85d0435b --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireExecutor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Executor for Paimon snapshot expiration. + * + * <p>This executor is meant to handle the expiration of old snapshots in Paimon tables based on time and count + * thresholds. At present {@link #execute()} is a placeholder: it expires nothing and reports zero counts. + */ +public class PaimonSnapshotExpireExecutor + implements MaintainerExecutor<PaimonSnapshotExpireInput, PaimonMaintainerOutput> { + + private static final Logger LOG = LoggerFactory.getLogger(PaimonSnapshotExpireExecutor.class); + + private final PaimonSnapshotExpireInput input; + + /** Keeps a reference to the input whose identifier and thresholds {@link #execute()} reads; the argument is not validated here. */ public PaimonSnapshotExpireExecutor(PaimonSnapshotExpireInput input) { + this.input = input; + } + + /** Logs the requested expiration and returns a successful output with all three statistics set to 0; any Throwable raised inside is logged and converted into a failed output instead of being rethrown. */ @Override + public PaimonMaintainerOutput execute() { + try { + LOG.info( + "Starting snapshot expiration for table {}, older than {}, retain last {}", + input.getTableIdentifier(), + input.getOlderThanMillis(), + input.getRetainLastCount()); + + // TODO: Implement Paimon snapshot expiration logic + // 1. Load Paimon table from options (table location, database, name) + // 2. Collect snapshots to expire based on time and count + // 3. Call Paimon's snapshot expiration API + // 4. Collect statistics (snapshot count, file count, file size) + // 5.
Return output with statistics + + // Placeholder implementation - returns success with no expired snapshots + LOG.info( + "Snapshot expiration for table {} completed (placeholder implementation)", + input.getTableIdentifier()); + + PaimonMaintainerOutput output = new PaimonMaintainerOutput(); + output.setExpiredSnapshotCount(0); + output.setExpiredDataFileCount(0); + output.setExpiredDataFileSize(0); + return output; + + } catch (Throwable t) { + LOG.error("Failed to expire snapshots for table {}", input.getTableIdentifier(), t); + /* NOTE(review): Throwable.getMessage() may return null, in which case the failed output carries a null error message - confirm callers tolerate that. */ return new PaimonMaintainerOutput(false, t.getMessage()); + } + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java new file mode 100644 index 000000000..fff23804f --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.apache.amoro.maintainer.MaintainerExecutorFactory; + +import java.util.Map; + +/** Factory for creating PaimonSnapshotExpireExecutor instances. It is constructed without arguments and then given its properties through {@link #initialize(Map)}. */ +public class PaimonSnapshotExpireFactory + implements MaintainerExecutorFactory<PaimonSnapshotExpireInput> { + + private static final long serialVersionUID = 1L; + + /** Properties received in {@link #initialize(Map)}; stored by reference and not read anywhere else in this class. */ private Map<String, String> properties; + + /** Default constructor required for DynConstructors. */ + public PaimonSnapshotExpireFactory() { + // Required for DynConstructors + } + + /** Remembers the given properties map (no copy is made). */ @Override + public void initialize(Map<String, String> properties) { + this.properties = properties; + } + + /** Returns a new {@code PaimonSnapshotExpireExecutor} wrapping the given input; the stored properties are not passed to it. */ @Override + public MaintainerExecutor<?, ?> createExecutor(PaimonSnapshotExpireInput input) { + return new PaimonSnapshotExpireExecutor(input); + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java new file mode 100644 index 000000000..7db5fcbae --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/PaimonSnapshotExpireInput.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.maintainer.BaseMaintainerInput; + +/** Input for Paimon snapshot expiration operation. Holds the table identifier, table format, expiration timestamp and minimum retained snapshot count in final fields set once by the constructor. */ +public class PaimonSnapshotExpireInput extends BaseMaintainerInput { + + private static final long serialVersionUID = 1L; + + private final String tableIdentifier; + private final String tableFormat; + private final long olderThanMillis; + private final int retainLastCount; + + // Option keys: table metadata will be serialized and passed through options (these keys are not read anywhere in this class) + public static final String TABLE_LOCATION = "table.location"; + public static final String TABLE_DATABASE = "table.database"; + public static final String TABLE_NAME = "table.name"; + + /** + * Create a Paimon snapshot expiration input. The arguments are stored as given, without validation.
+ * + * @param tableIdentifier the full table identifier + * @param tableFormat the table format (e.g., "PAIMON") + * @param olderThanMillis expire snapshots older than this timestamp (milliseconds); NOTE(review): presumably epoch milliseconds - confirm against the code that builds this input + * @param retainLastCount retain at least this many snapshots + */ + public PaimonSnapshotExpireInput( + String tableIdentifier, String tableFormat, long olderThanMillis, int retainLastCount) { + this.tableIdentifier = tableIdentifier; + this.tableFormat = tableFormat; + this.olderThanMillis = olderThanMillis; + this.retainLastCount = retainLastCount; + } + + /** Always reports {@code OperationType.SNAPSHOT_EXPIRATION}. */ @Override + public OperationType getOperationType() { + return OperationType.SNAPSHOT_EXPIRATION; + } + + /** Returns the table identifier given to the constructor. */ @Override + public String getTableIdentifier() { + return tableIdentifier; + } + + /** Returns the table format given to the constructor. */ @Override + public String getTableFormat() { + return tableFormat; + } + + /** + * Get the expiration threshold in milliseconds, exactly as passed to the constructor. + * + * @return snapshots older than this timestamp should be expired + */ + public long getOlderThanMillis() { + return olderThanMillis; + } + + /** + * Get the minimum number of snapshots to retain, exactly as passed to the constructor. + * + * @return minimum snapshots to keep + */ + public int getRetainLastCount() { + return retainLastCount; + } +} diff --git a/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java new file mode 100644 index 000000000..e2badb4b8 --- /dev/null +++ b/amoro-optimizer/amoro-optimizer-paimon-spark/src/main/java/org/apache/amoro/optimizer/paimon/spark/maintainer/SparkMaintainerTaskFunction.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizer.paimon.spark.maintainer; + +import org.apache.amoro.api.OptimizingTask; +import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.maintainer.MaintainerExecutor; +import org.apache.amoro.maintainer.MaintainerExecutorFactory; +import org.apache.amoro.maintainer.MaintainerInput; +import org.apache.amoro.maintainer.MaintainerOutput; +import org.apache.amoro.optimizer.common.OptimizerConfig; +import org.apache.amoro.optimizing.TaskProperties; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.utils.ExceptionUtil; +import org.apache.amoro.utils.SerializationUtil; +import org.apache.iceberg.common.DynConstructors; +import org.apache.spark.api.java.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Spark function to execute maintainer tasks. Similar to SparkOptimizingTaskFunction but for + * maintainer operations. 
+ */ +public class SparkMaintainerTaskFunction implements Function<OptimizingTask, OptimizingTaskResult> { + private static final Logger LOG = LoggerFactory.getLogger(SparkMaintainerTaskFunction.class); + /** Length limit passed to {@code ExceptionUtil.getErrorMessage} when a failure result is built. */ private static final int ERROR_MESSAGE_MAX_LENGTH = 4000; + + private final OptimizerConfig config; + private final int threadId; + + /** Stores the optimizer configuration and the thread id that is stamped on every result and log line. */ public SparkMaintainerTaskFunction(OptimizerConfig config, int threadId) { + this.config = config; + this.threadId = threadId; + } + + /** Runs one maintainer task: deserializes the task input, instantiates the factory class named by the TASK_EXECUTOR_FACTORY_IMPL task property through its no-arg constructor, initializes it with the task properties, executes the executor it creates, and returns a result carrying the serialized output and its summary. Any Throwable is logged and turned into a result with a truncated error message, so nothing propagates to the caller. NOTE(review): the success state of the MaintainerOutput is never inspected here, so an executor that returns a failure output instead of throwing (as PaimonSnapshotExpireExecutor does in its catch block) produces a result with no error message set - confirm this is intended. */ @Override + public OptimizingTaskResult call(OptimizingTask task) throws Exception { + long startTime = System.currentTimeMillis(); + MaintainerInput input; + try { + Map<String, String> taskProperties = fillTaskProperties(config, task); + input = SerializationUtil.simpleDeserialize(task.getTaskInput()); + + String factoryImpl = taskProperties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL); + DynConstructors.Ctor<MaintainerExecutorFactory> ctor = + DynConstructors.builder(MaintainerExecutorFactory.class).impl(factoryImpl).buildChecked(); + MaintainerExecutorFactory factory = ctor.newInstance(); + + factory.initialize(taskProperties); + MaintainerExecutor executor = factory.createExecutor(input); + MaintainerOutput output = (MaintainerOutput) executor.execute(); + + ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output); + OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId); + result.setTaskOutput(outputByteBuffer); + result.setSummary(output.summary()); + + LOG.info( + "Maintainer executor[{}] executed task[{}]({}) and cost {} ms", + threadId, + task.getTaskId(), + input, + System.currentTimeMillis() - startTime); + return result; + } catch (Throwable t) { + LOG.error( + "Maintainer executor[{}] executed task[{}] failed and cost {} ms", + threadId, + task.getTaskId(), + System.currentTimeMillis() - startTime, + t); + OptimizingTaskResult errorResult = new OptimizingTaskResult(task.getTaskId(), threadId); +
errorResult.setErrorMessage(ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH)); + return errorResult; + } + } + + /** Copies the task's properties into a new map and adds the task's process id under PROCESS_ID; the config parameter is currently unused. */ private static Map<String, String> fillTaskProperties( + OptimizerConfig config, OptimizingTask task) { + Map<String, String> properties = Maps.newHashMap(task.getProperties()); + properties.put(TaskProperties.PROCESS_ID, String.valueOf(task.getTaskId().getProcessId())); + return properties; + } +} diff --git a/amoro-optimizer/pom.xml b/amoro-optimizer/pom.xml index 5749b1361..21e9f7271 100644 --- a/amoro-optimizer/pom.xml +++ b/amoro-optimizer/pom.xml @@ -35,6 +35,7 @@ <module>amoro-optimizer-common</module> <module>amoro-optimizer-flink</module> <module>amoro-optimizer-spark</module> + <module>amoro-optimizer-paimon-spark</module> <module>amoro-optimizer-standalone</module> </modules>
