This is an automated email from the ASF dual-hosted git repository. czy006 pushed a commit to branch feature-tablemerge in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 12c21d37d03dce7f6353328393721b1427d6e676 Author: ConradJam <[email protected]> AuthorDate: Thu Feb 5 17:19:59 2026 +0800 [AMORO-3951] Complete table merge implementation Implement missing Action2StringConverter and add multi-lake support: - Add Action2StringConverter TypeHandler for Action <-> String conversion - Add PaimonActions class for Paimon table format operations - Mark ProcessStateMapper as @Deprecated - Add unit tests for Action2StringConverter Co-Authored-By: Claude (glm-4.7) <[email protected]> --- .../converter/Action2StringConverter.java | 174 +++++++++++++++++++++ .../persistence/mapper/ProcessStateMapper.java | 44 +++++- .../converter/Action2StringConverterTest.java | 130 +++++++++++++++ .../main/java/org/apache/amoro/PaimonActions.java | 41 +++++ 4 files changed, 388 insertions(+), 1 deletion(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java new file mode 100644 index 000000000..5ac935825 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.PaimonActions; +import org.apache.amoro.TableFormat; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; +import org.apache.ibatis.type.TypeHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * MyBatis TypeHandler for converting Action to/from String in database. This converter maintains a + * registry of known actions and can dynamically create temporary actions for unknown names to + * support backward compatibility. + */ +@MappedTypes(Action.class) +@MappedJdbcTypes(JdbcType.VARCHAR) +public class Action2StringConverter implements TypeHandler<Action> { + + private static final Logger LOG = LoggerFactory.getLogger(Action2StringConverter.class); + + /** Registry of all registered actions, keyed by action name. */ + private static final Map<String, Action> ACTION_REGISTRY = new ConcurrentHashMap<>(); + + /** Default formats for dynamically created actions. */ + private static final TableFormat[] DEFAULT_FORMATS = + new TableFormat[] { + TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.PAIMON + }; + + /** Static initialization block to register all built-in Iceberg and Paimon actions. */ + static { + // Register Iceberg actions + registerAction(IcebergActions.SYSTEM); + registerAction(IcebergActions.REWRITE); + registerAction(IcebergActions.DELETE_ORPHANS); + registerAction(IcebergActions.SYNC_HIVE); + registerAction(IcebergActions.EXPIRE_DATA); + registerAction(IcebergActions.OPTIMIZING_MINOR); + registerAction(IcebergActions.OPTIMIZING_MAJOR); + registerAction(IcebergActions.OPTIMIZING_FULL); + + // Register Paimon actions + registerAction(PaimonActions.COMPACT); + registerAction(PaimonActions.FULL_COMPACT); + registerAction(PaimonActions.CLEAN_METADATA); + registerAction(PaimonActions.DELETE_SNAPSHOTS); + } + + /** + * Register an action in the registry. + * + * @param action the action to register + */ + public static void registerAction(Action action) { + if (action != null && action.getName() != null) { + ACTION_REGISTRY.put(action.getName(), action); + } + } + + /** + * Register a custom action. This is a convenience method that delegates to {@link + * #registerAction(Action)}. + * + * @param action the custom action to register + */ + public static void registerCustomAction(Action action) { + registerAction(action); + } + + /** + * Get an action by its name from the registry. + * + * @param name the action name to look up + * @return the registered action, or null if not found and name is null/empty + */ + public static Action getActionByName(String name) { + if (name == null || name.isEmpty()) { + return null; + } + return ACTION_REGISTRY.get(name); + } + + /** + * Get all registered actions. + * + * @return array of all registered actions + */ + public static Action[] getRegisteredActions() { + return ACTION_REGISTRY.values().toArray(new Action[0]); + } + + @Override + public void setParameter(PreparedStatement ps, int i, Action parameter, JdbcType jdbcType) + throws SQLException { + if (parameter == null) { + ps.setString(i, ""); + } else { + ps.setString(i, parameter.getName()); + } + } + + @Override + public Action getResult(ResultSet rs, String columnName) throws SQLException { + String actionName = rs.getString(columnName); + return convertToAction(actionName); + } + + @Override + public Action getResult(ResultSet rs, int columnIndex) throws SQLException { + String actionName = rs.getString(columnIndex); + return convertToAction(actionName); + } + + @Override + public Action getResult(CallableStatement cs, int columnIndex) throws SQLException { + String actionName = cs.getString(columnIndex); + return convertToAction(actionName); + } + + /** + * Convert a string action name to an Action object. First attempts to find the action in the + * registry. If not found, creates a temporary action with the given name for backward + * compatibility. + * + * @param actionName the action name to convert + * @return the corresponding Action object, or null if actionName is null/empty + */ + private Action convertToAction(String actionName) { + if (actionName == null || actionName.isEmpty()) { + return null; + } + + Action action = ACTION_REGISTRY.get(actionName); + if (action != null) { + return action; + } + + LOG.warn( + "Unknown action name '{}', creating temporary action for backward compatibility", + actionName); + Action tempAction = new Action(DEFAULT_FORMATS, 0, actionName); + registerAction(tempAction); + return tempAction; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java index f7b755781..e15f7f192 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java @@ -30,8 +30,21 @@ import org.apache.ibatis.annotations.Update; import java.util.Map; +/** + * Mapper for table_process_state table. + * + * @deprecated This mapper is deprecated as of AMORO-3951. Use {@link TableProcessMapper} instead. + * The table_process_state table has been merged into table_process. + */ +@Deprecated public interface ProcessStateMapper { + /** + * Create a new process state. + * + * @deprecated Use {@link TableProcessMapper#insertProcess} instead. + */ + @Deprecated @Insert( "INSERT INTO table_process_state " + "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) " @@ -40,24 +53,48 @@ public interface ProcessStateMapper { @Options(useGeneratedKeys = true, keyProperty = "id") void createProcessState(TableProcessState state); + /** + * Update process state to running. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, start_time = #{startTime} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessRunning(TableProcessState state); + /** + * Update process state to completed. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, end_time = #{endTime} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessCompleted(TableProcessState state); + /** + * Update process state to failed. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessFailed(TableProcessState state); + /** + * Query TableProcessState by process_id. + * + * @deprecated Use {@link TableProcessMapper#getProcessMeta} instead. + */ + @Deprecated @Select( "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " + "FROM table_process_state " @@ -77,7 +114,12 @@ public interface ProcessStateMapper { }) TableProcessState getProcessStateById(@Param("processId") long processId); - /** Query TableProcessState by table_id */ + /** + * Query TableProcessState by table_id. + * + * @deprecated Use {@link TableProcessMapper#listProcessMeta} instead. + */ + @Deprecated @Select( "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " + "FROM table_process_state " diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java new file mode 100644 index 000000000..e8d660427 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.PaimonActions; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link Action2StringConverter}. */ +public class Action2StringConverterTest { + + @Test + public void testGetRegisteredIcebergAction() { + Action action = Action2StringConverter.getActionByName("optimizing-minor"); + assertNotNull(action, "Should find registered optimizing-minor action"); + assertEquals("optimizing-minor", action.getName()); + assertSame(IcebergActions.OPTIMIZING_MINOR, action, "Should return the same instance"); + } + + @Test + public void testGetRegisteredIcebergActions() { + assertEquals(IcebergActions.SYSTEM, Action2StringConverter.getActionByName("system")); + assertEquals(IcebergActions.REWRITE, Action2StringConverter.getActionByName("rewrite")); + assertEquals( + IcebergActions.DELETE_ORPHANS, Action2StringConverter.getActionByName("delete-orphans")); + assertEquals(IcebergActions.SYNC_HIVE, Action2StringConverter.getActionByName("sync-hive")); + assertEquals(IcebergActions.EXPIRE_DATA, Action2StringConverter.getActionByName("expire-data")); + assertEquals( + IcebergActions.OPTIMIZING_MINOR, + Action2StringConverter.getActionByName("optimizing-minor")); + assertEquals( + IcebergActions.OPTIMIZING_MAJOR, + Action2StringConverter.getActionByName("optimizing-major")); + assertEquals( + IcebergActions.OPTIMIZING_FULL, Action2StringConverter.getActionByName("optimizing-full")); + } + + @Test + public void testGetRegisteredPaimonActions() { + assertEquals(PaimonActions.COMPACT, Action2StringConverter.getActionByName("compact")); + assertEquals( + PaimonActions.FULL_COMPACT, Action2StringConverter.getActionByName("full-compact")); + assertEquals( + PaimonActions.CLEAN_METADATA, Action2StringConverter.getActionByName("clean-meta")); + assertEquals( + PaimonActions.DELETE_SNAPSHOTS, Action2StringConverter.getActionByName("del-snapshots")); + } + + @Test + public void testGetUnknownActionCreatesTemporary() { + String unknownActionName = "custom-optimizing-action"; + Action action = Action2StringConverter.getActionByName(unknownActionName); + + assertNotNull(action, "Should create action for unknown name"); + assertEquals(unknownActionName, action.getName()); + } + + @Test + public void testGetUnknownActionReturnsSameInstance() { + String unknownActionName = "another-custom-action"; + Action action1 = Action2StringConverter.getActionByName(unknownActionName); + Action action2 = Action2StringConverter.getActionByName(unknownActionName); + + assertSame(action1, action2, "Should return the same instance for same unknown action name"); + } + + @Test + public void testGetNullAction() { + Action action = Action2StringConverter.getActionByName(null); + assertNull(action, "Should return null for null input"); + } + + @Test + public void testGetEmptyAction() { + Action action = Action2StringConverter.getActionByName(""); + assertNull(action, "Should return null for empty string"); + } + + @Test + public void testRegisterCustomAction() { + Action customAction = + new Action( + new org.apache.amoro.TableFormat[] {org.apache.amoro.TableFormat.PAIMON}, + 50, + "custom-action"); + + Action2StringConverter.registerCustomAction(customAction); + Action retrieved = Action2StringConverter.getActionByName("custom-action"); + + assertSame(customAction, retrieved, "Should retrieve the same custom action"); + } + + @Test + public void testGetRegisteredActions() { + Action[] actions = Action2StringConverter.getRegisteredActions(); + assertTrue(actions.length > 0, "Should have registered actions"); + + boolean hasOptimizingMinor = false; + for (Action action : actions) { + if ("optimizing-minor".equals(action.getName())) { + hasOptimizingMinor = true; + break; + } + } + assertTrue(hasOptimizingMinor, "Should include optimizing-minor action"); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java new file mode 100644 index 000000000..d15d05c66 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro; + +/** Pre-defined actions for Paimon table format operations. */ +public class PaimonActions { + + private static final TableFormat[] PAIMON_FORMATS = new TableFormat[] {TableFormat.PAIMON}; + + /** Minor compaction action for Paimon tables. */ + public static final Action COMPACT = new Action(PAIMON_FORMATS, 100, "compact"); + + /** Full compaction action for Paimon tables. */ + public static final Action FULL_COMPACT = new Action(PAIMON_FORMATS, 200, "full-compact"); + + /** Clean metadata action for removing expired snapshots. */ + public static final Action CLEAN_METADATA = new Action(PAIMON_FORMATS, 10, "clean-meta"); + + /** Delete snapshots action for Paimon tables. */ + public static final Action DELETE_SNAPSHOTS = new Action(PAIMON_FORMATS, 5, "del-snapshots"); + + private PaimonActions() { + // Private constructor to prevent instantiation + } +}
