This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch feature-tablemerge
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 12c21d37d03dce7f6353328393721b1427d6e676
Author: ConradJam <[email protected]>
AuthorDate: Thu Feb 5 17:19:59 2026 +0800

    [AMORO-3951] Complete table merge implementation
    
    Implement missing Action2StringConverter and add multi-lake support:
    
    - Add Action2StringConverter TypeHandler for Action <-> String conversion
    - Add PaimonActions class for Paimon table format operations
    - Mark ProcessStateMapper as @Deprecated
    - Add unit tests for Action2StringConverter
    
    Co-Authored-By: Claude (glm-4.7) <[email protected]>
---
 .../converter/Action2StringConverter.java          | 174 +++++++++++++++++++++
 .../persistence/mapper/ProcessStateMapper.java     |  44 +++++-
 .../converter/Action2StringConverterTest.java      | 130 +++++++++++++++
 .../main/java/org/apache/amoro/PaimonActions.java  |  41 +++++
 4 files changed, 388 insertions(+), 1 deletion(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java
new file mode 100644
index 000000000..5ac935825
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.converter;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.PaimonActions;
+import org.apache.amoro.TableFormat;
+import org.apache.ibatis.type.JdbcType;
+import org.apache.ibatis.type.MappedJdbcTypes;
+import org.apache.ibatis.type.MappedTypes;
+import org.apache.ibatis.type.TypeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.CallableStatement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * MyBatis TypeHandler for converting Action to/from String in database. This 
converter maintains a
+ * registry of known actions and can dynamically create temporary actions for 
unknown names to
+ * support backward compatibility.
+ */
+@MappedTypes(Action.class)
+@MappedJdbcTypes(JdbcType.VARCHAR)
+public class Action2StringConverter implements TypeHandler<Action> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(Action2StringConverter.class);
+
+  /** Registry of all registered actions, keyed by action name. */
+  private static final Map<String, Action> ACTION_REGISTRY = new 
ConcurrentHashMap<>();
+
+  /** Default formats for dynamically created actions. */
+  private static final TableFormat[] DEFAULT_FORMATS =
+      new TableFormat[] {
+        TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE, TableFormat.PAIMON
+      };
+
+  // Seed the registry with every built-in Iceberg and Paimon action when the class is loaded.
+  static {
+    Action[] builtInActions = {
+      // Iceberg actions
+      IcebergActions.SYSTEM,
+      IcebergActions.REWRITE,
+      IcebergActions.DELETE_ORPHANS,
+      IcebergActions.SYNC_HIVE,
+      IcebergActions.EXPIRE_DATA,
+      IcebergActions.OPTIMIZING_MINOR,
+      IcebergActions.OPTIMIZING_MAJOR,
+      IcebergActions.OPTIMIZING_FULL,
+      // Paimon actions
+      PaimonActions.COMPACT,
+      PaimonActions.FULL_COMPACT,
+      PaimonActions.CLEAN_METADATA,
+      PaimonActions.DELETE_SNAPSHOTS
+    };
+    for (Action builtIn : builtInActions) {
+      registerAction(builtIn);
+    }
+  }
+
+  /**
+   * Adds {@code action} to the registry under its name, replacing any entry already stored for
+   * that name. A {@code null} action, or an action whose name is {@code null}, is ignored.
+   *
+   * @param action the action to register
+   */
+  public static void registerAction(Action action) {
+    if (action == null) {
+      return;
+    }
+    String actionName = action.getName();
+    if (actionName == null) {
+      return;
+    }
+    ACTION_REGISTRY.put(actionName, action);
+  }
+
+  /**
+   * Register a custom action. This is a convenience alias that simply delegates to
+   * {@link #registerAction(Action)}, so the same null handling and replace-on-same-name
+   * behavior apply.
+   *
+   * @param action the custom action to register
+   */
+  public static void registerCustomAction(Action action) {
+    registerAction(action);
+  }
+
+  /**
+   * Look up an action by its name in the registry. This is a pure lookup: it never creates or
+   * registers an action for a name that is not already present.
+   *
+   * @param name the action name to look up
+   * @return the registered action, or null if name is null/empty or no action is registered
+   *     under that name
+   */
+  public static Action getActionByName(String name) {
+    if (name == null || name.isEmpty()) {
+      return null;
+    }
+    return ACTION_REGISTRY.get(name);
+  }
+
+  /**
+   * Get all registered actions.
+   *
+   * @return a newly allocated array of the actions currently held in the registry; the registry
+   *     is a hash map, so no particular ordering of the elements should be relied on
+   */
+  public static Action[] getRegisteredActions() {
+    return ACTION_REGISTRY.values().toArray(new Action[0]);
+  }
+
+  @Override
+  public void setParameter(PreparedStatement ps, int i, Action parameter, JdbcType jdbcType)
+      throws SQLException {
+    // A null action is written as an empty string rather than SQL NULL; the read path maps an
+    // empty string back to null.
+    String storedName = parameter == null ? "" : parameter.getName();
+    ps.setString(i, storedName);
+  }
+
+  @Override
+  public Action getResult(ResultSet rs, String columnName) throws SQLException {
+    // Read the stored name by column label and map it back to an Action.
+    return convertToAction(rs.getString(columnName));
+  }
+
+  @Override
+  public Action getResult(ResultSet rs, int columnIndex) throws SQLException {
+    // Read the stored name by column position and map it back to an Action.
+    return convertToAction(rs.getString(columnIndex));
+  }
+
+  @Override
+  public Action getResult(CallableStatement cs, int columnIndex) throws SQLException {
+    // Read the stored name from the callable statement's output and map it back to an Action.
+    return convertToAction(cs.getString(columnIndex));
+  }
+
+  /**
+   * Convert a string action name to an Action object. A registered action is returned as-is. For
+   * an unknown name, a temporary action with the default formats and weight 0 is created and
+   * cached in the registry for backward compatibility, so later reads of the same name yield the
+   * same instance and the warning is logged only once per name.
+   *
+   * <p>Lookup and creation happen in a single atomic {@code computeIfAbsent} call, so concurrent
+   * reads of the same unknown name cannot produce distinct instances, and an action registered
+   * concurrently under that name is never overwritten by a temporary one.
+   *
+   * @param actionName the action name to convert
+   * @return the corresponding Action object, or null if actionName is null/empty
+   */
+  private Action convertToAction(String actionName) {
+    if (actionName == null || actionName.isEmpty()) {
+      return null;
+    }
+
+    return ACTION_REGISTRY.computeIfAbsent(
+        actionName,
+        unknownName -> {
+          LOG.warn(
+              "Unknown action name '{}', creating temporary action for backward compatibility",
+              unknownName);
+          return new Action(DEFAULT_FORMATS, 0, unknownName);
+        });
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
index f7b755781..e15f7f192 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java
@@ -30,8 +30,21 @@ import org.apache.ibatis.annotations.Update;
 
 import java.util.Map;
 
+/**
+ * Mapper for table_process_state table.
+ *
+ * @deprecated This mapper is deprecated as of AMORO-3951. Use {@link 
TableProcessMapper} instead.
+ *     The table_process_state table has been merged into table_process.
+ */
+@Deprecated
 public interface ProcessStateMapper {
 
+  /**
+   * Create a new process state.
+   *
+   * @deprecated Use {@link TableProcessMapper#insertProcess} instead.
+   */
+  @Deprecated
   @Insert(
       "INSERT INTO table_process_state "
           + "(process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary) "
@@ -40,24 +53,48 @@ public interface ProcessStateMapper {
   @Options(useGeneratedKeys = true, keyProperty = "id")
   void createProcessState(TableProcessState state);
 
+  /**
+   * Update process state to running.
+   *
+   * @deprecated Use {@link TableProcessMapper#updateProcess} instead.
+   */
+  @Deprecated
   @Update(
       "UPDATE table_process_state "
           + "SET status = #{status}, start_time = #{startTime} "
           + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
   void updateProcessRunning(TableProcessState state);
 
+  /**
+   * Update process state to completed.
+   *
+   * @deprecated Use {@link TableProcessMapper#updateProcess} instead.
+   */
+  @Deprecated
   @Update(
       "UPDATE table_process_state "
           + "SET status = #{status}, end_time = #{endTime} "
           + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
   void updateProcessCompleted(TableProcessState state);
 
+  /**
+   * Update process state to failed.
+   *
+   * @deprecated Use {@link TableProcessMapper#updateProcess} instead.
+   */
+  @Deprecated
   @Update(
       "UPDATE table_process_state "
           + "SET status = #{status}, end_time = #{endTime}, fail_reason = 
#{failedReason} "
           + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
   void updateProcessFailed(TableProcessState state);
 
+  /**
+   * Query TableProcessState by process_id.
+   *
+   * @deprecated Use {@link TableProcessMapper#getProcessMeta} instead.
+   */
+  @Deprecated
   @Select(
       "SELECT process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary "
           + "FROM table_process_state "
@@ -77,7 +114,12 @@ public interface ProcessStateMapper {
       })
   TableProcessState getProcessStateById(@Param("processId") long processId);
 
-  /** Query TableProcessState by table_id */
+  /**
+   * Query TableProcessState by table_id.
+   *
+   * @deprecated Use {@link TableProcessMapper#listProcessMeta} instead.
+   */
+  @Deprecated
   @Select(
       "SELECT process_id, action, table_id, retry_num, status, start_time, 
end_time, fail_reason, summary "
           + "FROM table_process_state "
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java
new file mode 100644
index 000000000..e8d660427
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.PaimonActions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link Action2StringConverter}. */
+public class Action2StringConverterTest {
+
+  @Test
+  public void testGetRegisteredIcebergAction() {
+    // A built-in name must resolve to the very constant it was registered from.
+    String expectedName = "optimizing-minor";
+    Action found = Action2StringConverter.getActionByName(expectedName);
+
+    assertNotNull(found, "Should find registered optimizing-minor action");
+    assertEquals(expectedName, found.getName());
+    assertSame(IcebergActions.OPTIMIZING_MINOR, found, "Should return the same instance");
+  }
+
+  @Test
+  public void testGetRegisteredIcebergActions() {
+    // Each row pairs a built-in Iceberg action with the name it is looked up by.
+    Object[][] expectations = {
+      {IcebergActions.SYSTEM, "system"},
+      {IcebergActions.REWRITE, "rewrite"},
+      {IcebergActions.DELETE_ORPHANS, "delete-orphans"},
+      {IcebergActions.SYNC_HIVE, "sync-hive"},
+      {IcebergActions.EXPIRE_DATA, "expire-data"},
+      {IcebergActions.OPTIMIZING_MINOR, "optimizing-minor"},
+      {IcebergActions.OPTIMIZING_MAJOR, "optimizing-major"},
+      {IcebergActions.OPTIMIZING_FULL, "optimizing-full"}
+    };
+    for (Object[] row : expectations) {
+      assertEquals(row[0], Action2StringConverter.getActionByName((String) row[1]));
+    }
+  }
+
+  @Test
+  public void testGetRegisteredPaimonActions() {
+    // Each row pairs a built-in Paimon action with the name it is looked up by.
+    Object[][] expectations = {
+      {PaimonActions.COMPACT, "compact"},
+      {PaimonActions.FULL_COMPACT, "full-compact"},
+      {PaimonActions.CLEAN_METADATA, "clean-meta"},
+      {PaimonActions.DELETE_SNAPSHOTS, "del-snapshots"}
+    };
+    for (Object[] row : expectations) {
+      assertEquals(row[0], Action2StringConverter.getActionByName((String) row[1]));
+    }
+  }
+
+  @Test
+  public void testGetUnknownActionCreatesTemporary() throws java.sql.SQLException {
+    String unknownActionName = "custom-optimizing-action";
+    // getActionByName is a pure registry lookup and returns null for unregistered names.
+    // Temporary actions are only created when the TypeHandler reads an unknown name from the
+    // database, so drive that path with a ResultSet stub. getString is the only method the
+    // handler calls, so the stub answers every call with the unknown name.
+    java.sql.ResultSet resultSet =
+        (java.sql.ResultSet)
+            java.lang.reflect.Proxy.newProxyInstance(
+                Action2StringConverterTest.class.getClassLoader(),
+                new Class<?>[] {java.sql.ResultSet.class},
+                (proxy, method, args) -> unknownActionName);
+
+    Action action = new Action2StringConverter().getResult(resultSet, "action");
+
+    assertNotNull(action, "Should create action for unknown name");
+    assertEquals(unknownActionName, action.getName());
+  }
+
+  @Test
+  public void testGetUnknownActionReturnsSameInstance() throws java.sql.SQLException {
+    String unknownActionName = "another-custom-action";
+    // Going through getActionByName would compare null with null and pass vacuously, because
+    // that method never creates actions. Read the unknown name twice through the TypeHandler
+    // instead, using a ResultSet stub that answers every call with the unknown name.
+    java.sql.ResultSet resultSet =
+        (java.sql.ResultSet)
+            java.lang.reflect.Proxy.newProxyInstance(
+                Action2StringConverterTest.class.getClassLoader(),
+                new Class<?>[] {java.sql.ResultSet.class},
+                (proxy, method, args) -> unknownActionName);
+    Action2StringConverter converter = new Action2StringConverter();
+
+    Action action1 = converter.getResult(resultSet, "action");
+    Action action2 = converter.getResult(resultSet, "action");
+
+    assertNotNull(action1, "Should create action for unknown name");
+    assertSame(action1, action2, "Should return the same instance for same unknown action name");
+  }
+
+  @Test
+  public void testGetNullAction() {
+    // A null name short-circuits to null without touching the registry.
+    assertNull(
+        Action2StringConverter.getActionByName(null), "Should return null for null input");
+  }
+
+  @Test
+  public void testGetEmptyAction() {
+    // An empty name is treated the same way as a null one.
+    assertNull(
+        Action2StringConverter.getActionByName(""), "Should return null for empty string");
+  }
+
+  @Test
+  public void testRegisterCustomAction() {
+    // Register a Paimon-only action and expect the lookup to hand back that exact instance.
+    org.apache.amoro.TableFormat[] paimonOnly = {org.apache.amoro.TableFormat.PAIMON};
+    Action customAction = new Action(paimonOnly, 50, "custom-action");
+    Action2StringConverter.registerCustomAction(customAction);
+
+    assertSame(
+        customAction,
+        Action2StringConverter.getActionByName("custom-action"),
+        "Should retrieve the same custom action");
+  }
+
+  @Test
+  public void testGetRegisteredActions() {
+    Action[] registered = Action2StringConverter.getRegisteredActions();
+    assertTrue(registered.length > 0, "Should have registered actions");
+
+    // The snapshot must contain at least the built-in optimizing-minor action.
+    boolean containsOptimizingMinor =
+        java.util.Arrays.stream(registered)
+            .anyMatch(candidate -> "optimizing-minor".equals(candidate.getName()));
+    assertTrue(containsOptimizingMinor, "Should include optimizing-minor action");
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java 
b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java
new file mode 100644
index 000000000..d15d05c66
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro;
+
+/** Built-in {@link Action} constants for operations on Paimon format tables. */
+public class PaimonActions {
+
+  /** Every action declared here applies to the Paimon table format only. */
+  private static final TableFormat[] PAIMON_FORMATS = {TableFormat.PAIMON};
+
+  /** Minor compaction of a Paimon table; named {@code compact}. */
+  public static final Action COMPACT = new Action(PAIMON_FORMATS, 100, "compact");
+
+  /** Full compaction of a Paimon table; named {@code full-compact}. */
+  public static final Action FULL_COMPACT = new Action(PAIMON_FORMATS, 200, "full-compact");
+
+  /** Metadata clean-up for a Paimon table; named {@code clean-meta}. */
+  public static final Action CLEAN_METADATA = new Action(PAIMON_FORMATS, 10, "clean-meta");
+
+  /** Snapshot deletion for a Paimon table; named {@code del-snapshots}. */
+  public static final Action DELETE_SNAPSHOTS = new Action(PAIMON_FORMATS, 5, "del-snapshots");
+
+  private PaimonActions() {
+    // Constants holder; not meant to be instantiated.
+  }
+}

Reply via email to