This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new a454f4754 [AMORO-3875] Support to build on Spark Scala-2.13 (#3878)
a454f4754 is described below

commit a454f47541928e8a4fd524e18c33aa0c15ec5c21
Author: Fei Wang <[email protected]>
AuthorDate: Thu Nov 6 18:08:24 2025 -0800

    [AMORO-3875] Support to build on Spark Scala-2.13 (#3878)
    
    * scala binary version
    
    save
    
    bin
    
    profile
    
    GA
    
    revert paimon ams
    
    api compatibility
    
    fix flink scala
    
    style
    
    save
    
    save
    
    rewrite by scala
    
    save
    
    conflicts
    
    dependency
    
    save
    
    save
    
    scala paimon
    
    ignore paimon
    
    idea
    
    * nit
    
    * nit
    
    * save
    
    * revert ci change
---
 amoro-ams/pom.xml                                  |   2 +-
 .../pom.xml                                        |   6 +-
 .../amoro-mixed-flink-common/pom.xml               |   6 +-
 .../amoro-mixed-spark-3-common/pom.xml             |   4 +-
 .../amoro/spark/test/utils/ScalaTestUtil.java      |   2 +-
 amoro-format-mixed/amoro-mixed-spark/pom.xml       |   2 +-
 .../v3.3/amoro-mixed-spark-3.3/pom.xml             |  43 ++++-
 .../amoro/spark/SparkInternalRowCastWrapper.java   | 210 ---------------------
 .../amoro/spark/SparkInternalRowCastWrapper.scala  | 141 ++++++++++++++
 .../RewriteMixedFormatMergeIntoTable.scala         |  32 ++--
 .../execution/ExtendedMixedFormatStrategy.scala    |   3 +-
 .../parser/MixedFormatSqlExtendAstBuilder.scala    |   2 +-
 .../v3.3/amoro-mixed-spark-runtime-3.3/pom.xml     |  16 +-
 .../v3.5/amoro-mixed-spark-3.5/pom.xml             |  43 ++++-
 .../amoro/spark/SparkInternalRowCastWrapper.java   | 210 ---------------------
 .../amoro/spark/SparkInternalRowCastWrapper.scala  | 141 ++++++++++++++
 .../RewriteMixedFormatMergeIntoTable.scala         |  32 ++--
 .../execution/ExtendedMixedFormatStrategy.scala    |   3 +-
 .../parser/MixedFormatSqlExtendAstBuilder.scala    |   3 +-
 .../v3.5/amoro-mixed-spark-runtime-3.5/pom.xml     |  16 +-
 amoro-optimizer/amoro-optimizer-flink/pom.xml      |   5 +-
 amoro-optimizer/amoro-optimizer-spark/pom.xml      |   2 +-
 dist/pom.xml                                       |   2 +-
 dist/src/main/assemblies/bin.xml                   |   2 +-
 pom.xml                                            |  33 ++++
 25 files changed, 465 insertions(+), 496 deletions(-)

diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml
index 255102392..419e46362 100644
--- a/amoro-ams/pom.xml
+++ b/amoro-ams/pom.xml
@@ -332,7 +332,7 @@
         <!-- runtime dependencies -->
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            
<artifactId>amoro-format-mixed-spark-${spark.major.version}</artifactId>
+            
<artifactId>amoro-format-mixed-spark-${spark.major.version}_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>runtime</scope>
         </dependency>
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/pom.xml
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/pom.xml
index b6394c202..eb6ed9e03 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/pom.xml
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/pom.xml
@@ -111,7 +111,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+            
<artifactId>flink-hadoop-compatibility_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -170,7 +170,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -304,7 +304,7 @@
         <!--   for values test connector     -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
index 91f27a413..51f31421d 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
@@ -131,7 +131,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+            
<artifactId>flink-hadoop-compatibility_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -197,7 +197,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -365,7 +365,7 @@
         <!--   for values test connector     -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/pom.xml 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/pom.xml
index f8dca8b9e..ad5668c54 100644
--- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/pom.xml
@@ -21,12 +21,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.amoro</groupId>
-        <artifactId>amoro-mixed-spark</artifactId>
+        <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
         <version>0.9-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-format-mixed-spark-3-common</artifactId>
+    
<artifactId>amoro-format-mixed-spark-3-common_${scala.binary.version}</artifactId>
     <packaging>jar</packaging>
     <name>Amoro Project Mixed Format Spark 3 Common</name>
     <url>https://amoro.apache.org</url>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/utils/ScalaTestUtil.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/utils/ScalaTestUtil.java
index f9e44d594..725af2d84 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/utils/ScalaTestUtil.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/utils/ScalaTestUtil.java
@@ -26,6 +26,6 @@ import java.util.List;
 public class ScalaTestUtil {
 
   public static <T> Seq<T> seq(List<T> values) {
-    return JavaConverters.asScalaBufferConverter(values).asScala().seq();
+    return JavaConverters.asScalaBuffer(values).toSeq();
   }
 }
diff --git a/amoro-format-mixed/amoro-mixed-spark/pom.xml 
b/amoro-format-mixed/amoro-mixed-spark/pom.xml
index 3a63d6688..6c7b5de38 100644
--- a/amoro-format-mixed/amoro-mixed-spark/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/pom.xml
@@ -25,7 +25,7 @@
         <version>0.9-SNAPSHOT</version>
     </parent>
 
-    <artifactId>amoro-mixed-spark</artifactId>
+    <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
     <packaging>pom</packaging>
     <name>Amoro Project Mixed Format Spark Parent</name>
     <url>https://amoro.apache.org</url>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
index 74a924f1e..c721addc6 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
@@ -21,12 +21,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.amoro</groupId>
-        <artifactId>amoro-mixed-spark</artifactId>
+        <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
         <version>0.9-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-format-mixed-spark-3.3</artifactId>
+    
<artifactId>amoro-format-mixed-spark-3.3_${scala.binary.version}</artifactId>
     <packaging>jar</packaging>
     <name>Amoro Project Mixed Format Spark 3.3</name>
     <url>https://amoro.apache.org</url>
@@ -229,7 +229,7 @@
 
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3-common</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3-common_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -261,10 +261,24 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-spark-3.3</artifactId>
+            <artifactId>paimon-spark-${spark.major.version}</artifactId>
             <version>${paimon.version}</version>
             <scope>test</scope>
         </dependency>
@@ -361,7 +375,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3-common</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3-common_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -470,4 +484,23 @@
         <sourceDirectory>src/main/java</sourceDirectory>
     </build>
 
+    <profiles>
+        <profile>
+            <id>scala-2.13</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <!-- TODO: remove it after paimon-spark on 
scala-2.13 released -->
+                                <exclude>**/TestUnifiedCatalog.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
deleted file mode 100644
index f5a6aa478..000000000
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.spark;
-
-import org.apache.amoro.data.ChangeAction;
-import org.apache.amoro.spark.sql.utils.ProjectingInternalRow;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-import scala.collection.Seq;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** cast internal row to upsert internal row */
-public class SparkInternalRowCastWrapper extends GenericInternalRow {
-  private final InternalRow row;
-  private final StructType schema;
-  private ChangeAction changeAction = ChangeAction.INSERT;
-  private List<DataType> dataTypeList;
-
-  public SparkInternalRowCastWrapper(
-      InternalRow row, ChangeAction changeAction, StructType schema) {
-    this.row = row;
-    this.changeAction = changeAction;
-    if (row instanceof ProjectingInternalRow) {
-      this.schema = ((ProjectingInternalRow) row).schema();
-    } else {
-      this.schema = schema;
-    }
-  }
-
-  public StructType getSchema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object genericGet(int ordinal) {
-    return row.get(ordinal, schema.apply(ordinal).dataType());
-  }
-
-  @Override
-  public Seq<Object> toSeq(Seq<DataType> fieldTypes) {
-    return super.toSeq(fieldTypes);
-  }
-
-  @Override
-  public int numFields() {
-    return schema.size() / 2;
-  }
-
-  @Override
-  public void setNullAt(int i) {
-    super.setNullAt(i);
-  }
-
-  @Override
-  public void update(int i, Object value) {
-    super.update(i, value);
-  }
-
-  @Override
-  public boolean isNullAt(int ordinal) {
-    dataTypeList =
-        
Arrays.stream(schema.fields()).map(StructField::dataType).collect(Collectors.toList());
-    return row.get(ordinal, dataTypeList.get(ordinal)) == null;
-  }
-
-  @Override
-  public Object get(int pos, DataType dt) {
-    return row.get(pos, dt);
-  }
-
-  @Override
-  public boolean getBoolean(int ordinal) {
-    return super.getBoolean(ordinal);
-  }
-
-  @Override
-  public byte getByte(int ordinal) {
-    return super.getByte(ordinal);
-  }
-
-  @Override
-  public short getShort(int ordinal) {
-    return super.getShort(ordinal);
-  }
-
-  @Override
-  public int getInt(int ordinal) {
-    return super.getInt(ordinal);
-  }
-
-  @Override
-  public long getLong(int ordinal) {
-    return super.getLong(ordinal);
-  }
-
-  @Override
-  public float getFloat(int ordinal) {
-    return super.getFloat(ordinal);
-  }
-
-  @Override
-  public double getDouble(int ordinal) {
-    return super.getDouble(ordinal);
-  }
-
-  @Override
-  public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return super.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    return super.getUTF8String(ordinal);
-  }
-
-  @Override
-  public byte[] getBinary(int ordinal) {
-    return super.getBinary(ordinal);
-  }
-
-  @Override
-  public ArrayData getArray(int ordinal) {
-    return super.getArray(ordinal);
-  }
-
-  @Override
-  public CalendarInterval getInterval(int ordinal) {
-    return super.getInterval(ordinal);
-  }
-
-  @Override
-  public MapData getMap(int ordinal) {
-    return super.getMap(ordinal);
-  }
-
-  @Override
-  public InternalRow getStruct(int ordinal, int numFields) {
-    return super.getStruct(ordinal, numFields);
-  }
-
-  public InternalRow getRow() {
-    return this.row;
-  }
-
-  public ChangeAction getChangeAction() {
-    return changeAction;
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-
-  @Override
-  public GenericInternalRow copy() {
-    return super.copy();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return super.equals(o);
-  }
-
-  @Override
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  @Override
-  public Object[] values() {
-    return super.values();
-  }
-
-  public InternalRow setFileOffset(Long fileOffset) {
-    List<DataType> dataTypeList =
-        
Arrays.stream(schema.fields()).map(StructField::dataType).collect(Collectors.toList());
-    List<Object> objectSeq = new ArrayList<>(dataTypeList.size() + 1);
-    row.toSeq(schema).toStream().foreach(objectSeq::add);
-    objectSeq.add(fileOffset);
-    return new GenericInternalRow(objectSeq.toArray());
-  }
-}
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
new file mode 100644
index 000000000..c0af4efba
--- /dev/null
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+import org.apache.amoro.data.ChangeAction
+import org.apache.amoro.spark.sql.utils.ProjectingInternalRow
+
+/** cast internal row to upsert internal row */
+class SparkInternalRowCastWrapper(
+    private val row: InternalRow,
+    private val changeAction: ChangeAction,
+    private var schema: StructType) extends GenericInternalRow {
+  if (row.isInstanceOf[ProjectingInternalRow]) {
+    schema = row.asInstanceOf[ProjectingInternalRow].schema
+  }
+  private lazy val dataTypeList = schema.fields.map(_.dataType)
+
+  def getSchema: StructType = schema
+
+  override protected def genericGet(ordinal: Int): Any = {
+    row.get(ordinal, dataTypeList(ordinal))
+  }
+
+  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = {
+    super.toSeq(fieldTypes)
+  }
+
+  override def numFields: Int = {
+    schema.size / 2
+  }
+
+  override def setNullAt(i: Int): Unit = {
+    super.setNullAt(i)
+  }
+
+  override def update(i: Int, value: Any): Unit = {
+    super.update(i, value)
+  }
+
+  override def isNullAt(ordinal: Int): Boolean = {
+    row.get(ordinal, dataTypeList(ordinal)) == null
+  }
+
+  override def get(pos: Int, dt: DataType): AnyRef = {
+    row.get(pos, dt)
+  }
+
+  override def getBoolean(ordinal: Int): Boolean = {
+    super.getBoolean(ordinal)
+  }
+
+  override def getByte(ordinal: Int): Byte = {
+    super.getByte(ordinal)
+  }
+
+  override def getShort(ordinal: Int): Short = {
+    super.getShort(ordinal)
+  }
+
+  override def getInt(ordinal: Int): Int = {
+    super.getInt(ordinal)
+  }
+
+  override def getLong(ordinal: Int): Long = {
+    super.getLong(ordinal)
+  }
+
+  override def getFloat(ordinal: Int): Float = {
+    super.getFloat(ordinal)
+  }
+
+  override def getDouble(ordinal: Int): Double = {
+    super.getDouble(ordinal)
+  }
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
{
+    super.getDecimal(ordinal, precision, scale)
+  }
+
+  override def getUTF8String(ordinal: Int): UTF8String = {
+    super.getUTF8String(ordinal)
+  }
+
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    super.getBinary(ordinal)
+  }
+
+  override def getArray(ordinal: Int): ArrayData = {
+    super.getArray(ordinal)
+  }
+
+  override def getInterval(ordinal: Int): CalendarInterval = {
+    super.getInterval(ordinal)
+  }
+
+  override def getMap(ordinal: Int): MapData = {
+    super.getMap(ordinal)
+  }
+
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
+    super.getStruct(ordinal, numFields)
+  }
+
+  def getRow: InternalRow = this.row
+
+  def getChangeAction: ChangeAction = changeAction
+
+  override def toString: String = super.toString
+
+  override def copy: GenericInternalRow = super.copy
+
+  override def equals(o: Any): Boolean = super.equals(o)
+
+  override def hashCode: Int = super.hashCode
+
+  def setFileOffset(fileOffset: Long): InternalRow = {
+    new GenericInternalRow((row.toSeq(schema) ++ Seq(fileOffset)).toArray)
+  }
+}
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
index 6268a0c4d..960482a94 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
@@ -83,7 +83,7 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
             cond.references.filter(p => primarys.contains(p.name)).toSeq
           }
           val attrs = dedupAttrs(relation.output)
-          (keyAttrs, relation.copy(table = operationTable, output = attrs))
+          (keyAttrs, relation.copy(table = operationTable, output = 
attrs.toSeq))
         } else {
           val (keyAttrs, valuesRelation) = {
             if (mixedSparkTable.requireAdditionIdentifierColumns()) {
@@ -91,7 +91,7 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
               scanBuilder.withIdentifierColumns()
               val scan = scanBuilder.build()
               val outputAttr = toOutputAttrs(scan.readSchema(), 
relation.output)
-              val valuesRelation = DataSourceV2ScanRelation(relation, scan, 
outputAttr)
+              val valuesRelation = DataSourceV2ScanRelation(relation, scan, 
outputAttr.toSeq)
               val references = cond.references.toSeq
               (references, valuesRelation)
             } else {
@@ -129,11 +129,15 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       isKeyedTable: Boolean): WriteQueryProjections = {
     val (frontRowProjection, backRowProjection) = if (isKeyedTable) {
       val frontRowProjection =
-        Some(ProjectingInternalRow.newProjectInternalRow(plan, targetRowAttrs, 
isFront = true, 0))
+        Some(ProjectingInternalRow.newProjectInternalRow(
+          plan,
+          targetRowAttrs.toSeq,
+          isFront = true,
+          0))
       val backRowProjection =
         ProjectingInternalRow.newProjectInternalRow(
           plan,
-          targetRowAttrs,
+          targetRowAttrs.toSeq,
           isFront = false,
           rowIdAttrs.size)
       (frontRowProjection, backRowProjection)
@@ -141,13 +145,13 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       val frontRowProjection =
         Some(ProjectingInternalRow.newProjectInternalRow(
           plan,
-          targetRowAttrs ++ rowIdAttrs,
+          (targetRowAttrs ++ rowIdAttrs).toSeq,
           isFront = true,
           0))
       val backRowProjection =
         ProjectingInternalRow.newProjectInternalRow(
           source,
-          targetRowAttrs,
+          targetRowAttrs.toSeq,
           isFront = false,
           1 + rowIdAttrs.size)
       (frontRowProjection, backRowProjection)
@@ -192,11 +196,11 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
 
     val matchedConditions = matchedActions.map(actionCondition)
     val matchedOutputs =
-      matchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output))
+      matchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output).toSeq)
 
     val notMatchedConditions = notMatchedActions.map(actionCondition)
     val notMatchedOutputs =
-      notMatchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output))
+      notMatchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output).toSeq)
 
     val operationTypeAttr = AttributeReference(OPERATION_COLUMN, IntegerType, 
nullable = false)()
     val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
@@ -215,15 +219,15 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       isSourceRowPresent = IsNotNull(rowFromSourceAttr),
       isTargetRowPresent =
         if (notMatchedActions.isEmpty) TrueLiteral else 
IsNotNull(rowFromTargetAttr),
-      matchedConditions = matchedConditions,
-      matchedOutputs = matchedOutputs,
-      notMatchedConditions = notMatchedConditions,
-      notMatchedOutputs = notMatchedOutputs,
-      rowIdAttrs = keyAttrs,
+      matchedConditions = matchedConditions.toSeq,
+      matchedOutputs = matchedOutputs.toSeq,
+      notMatchedConditions = notMatchedConditions.toSeq,
+      notMatchedOutputs = notMatchedOutputs.toSeq,
+      rowIdAttrs = keyAttrs.toSeq,
       matchedRowCheck = isMatchedRowCheckNeeded(matchedActions),
       unMatchedRowCheck = unMatchedRowNeedCheck,
       emitNotMatchedTargetRows = false,
-      output = mergeRowsOutput,
+      output = mergeRowsOutput.toSeq,
       joinPlan)
 
     // build a plan to write the row delta to the table
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
index 1d3d1d33f..e0deb2eea 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import 
org.apache.amoro.spark.sql.MixedFormatExtensionUtils.{isMixedFormatTable, 
MixedFormatTableHelper}
 import org.apache.amoro.spark.sql.catalyst.plans._
+import org.apache.amoro.spark.sql.execution.{MergeRowsExec => 
AmoroMergeRowsExec}
 
 case class ExtendedMixedFormatStrategy(spark: SparkSession) extends Strategy 
with PredicateHelper {
 
@@ -71,7 +72,7 @@ case class ExtendedMixedFormatStrategy(spark: SparkSession) 
extends Strategy wit
           emitNotMatchedTargetRows,
           output,
           child) =>
-      MergeRowsExec(
+      AmoroMergeRowsExec(
         isSourceRowPresent,
         isTargetRowPresent,
         matchedConditions,
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
index f22c36301..121431b3a 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
@@ -1240,7 +1240,7 @@ class MixedFormatSqlExtendAstBuilder()
   }
 
   override def visitIdentifierSeq(ctx: IdentifierSeqContext): Seq[String] = 
withOrigin(ctx) {
-    ctx.ident.asScala.map(_.getText)
+    ctx.ident.asScala.map(_.getText).toSeq
   }
 
   /**
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml
index eeab2049c..5adcab3de 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml
@@ -21,12 +21,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.amoro</groupId>
-        <artifactId>amoro-mixed-spark</artifactId>
+        <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
         <version>0.9-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-format-mixed-spark-runtime-3.3</artifactId>
+    
<artifactId>amoro-format-mixed-spark-runtime-3.3_${scala.binary.version}</artifactId>
     <packaging>jar</packaging>
     <name>Amoro Project Mixed Format Spark 3.3 Runtime</name>
     <url>https://amoro.apache.org</url>
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3.3</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3.3_${scala.binary.version}</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
     </dependencies>
@@ -56,8 +56,8 @@
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <includes>
-                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3-common</include>
-                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3.3</include>
+                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3-common_${scala.binary.version}</include>
+                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3.3_${scala.binary.version}</include>
                                     
<include>org.apache.amoro:amoro-common</include>
                                     
<include>org.apache.amoro:amoro-format-iceberg</include>
                                     
<include>org.apache.amoro:amoro-mixed-hive</include>
@@ -72,7 +72,7 @@
                                     
<include>org.apache.zookeeper:zookeeper-jute</include>
                                     
<include>org.apache.iceberg:iceberg-core</include>
                                     
<include>org.apache.iceberg:iceberg-api</include>
-                                    
<include>org.apache.iceberg:iceberg-spark-3.3_2.12</include>
+                                    
<include>org.apache.iceberg:iceberg-spark-3.3_${scala.binary.version}</include>
                                     
<include>org.apache.iceberg:iceberg-common</include>
                                     
<include>org.apache.iceberg:iceberg-data</include>
                                     
<include>org.apache.iceberg:iceberg-orc</include>
@@ -89,7 +89,7 @@
                                     
<include>org.apache.iceberg:iceberg-arrow</include>
                                     
<include>org.apache.iceberg:iceberg-hive-metastore</include>
                                     
<include>org.apache.iceberg:iceberg-spark</include>
-                                    
<include>org.apache.iceberg:iceberg-spark-extensions-3.3_2.12</include>
+                                    
<include>org.apache.iceberg:iceberg-spark-extensions-3.3_${scala.binary.version}</include>
                                     
<include>org.apache.iceberg:iceberg-bundled-guava</include>
                                     <include>org.apache.orc:*</include>
                                     
<include>org.apache.thrift:libthrift</include>
@@ -116,7 +116,7 @@
                                     </excludes>
                                 </filter>
                                 <filter combine.children="append">
-                                    
<artifact>org.apache.iceberg:iceberg-spark-3.3_2.12</artifact>
+                                    
<artifact>org.apache.iceberg:iceberg-spark-3.3_${scala.binary.version}</artifact>
                                     <excludes>
                                         
<exclude>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</exclude>
                                     </excludes>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/pom.xml 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/pom.xml
index 04a23f18d..005ead28d 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/pom.xml
@@ -21,12 +21,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.amoro</groupId>
-        <artifactId>amoro-mixed-spark</artifactId>
+        <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
         <version>0.9-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-format-mixed-spark-3.5</artifactId>
+    
<artifactId>amoro-format-mixed-spark-3.5_${scala.binary.version}</artifactId>
     <packaging>jar</packaging>
     <name>Amoro Project Mixed Format Spark 3.5</name>
     <url>https://amoro.apache.org</url>
@@ -233,7 +233,7 @@
 
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3-common</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3-common_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -265,10 +265,24 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-spark-3.5</artifactId>
+            <artifactId>paimon-spark-${spark.major.version}</artifactId>
             <version>${paimon.version}</version>
             <scope>test</scope>
         </dependency>
@@ -365,7 +379,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3-common</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3-common_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -474,4 +488,23 @@
         <sourceDirectory>src/main/java</sourceDirectory>
     </build>
 
+    <profiles>
+        <profile>
+            <id>scala-2.13</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <!-- TODO: remove it after paimon-spark on 
scala-2.13 released -->
+                                <exclude>**/TestUnifiedCatalog.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
deleted file mode 100644
index f5a6aa478..000000000
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/SparkInternalRowCastWrapper.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.spark;
-
-import org.apache.amoro.data.ChangeAction;
-import org.apache.amoro.spark.sql.utils.ProjectingInternalRow;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-import scala.collection.Seq;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** cast internal row to upsert internal row */
-public class SparkInternalRowCastWrapper extends GenericInternalRow {
-  private final InternalRow row;
-  private final StructType schema;
-  private ChangeAction changeAction = ChangeAction.INSERT;
-  private List<DataType> dataTypeList;
-
-  public SparkInternalRowCastWrapper(
-      InternalRow row, ChangeAction changeAction, StructType schema) {
-    this.row = row;
-    this.changeAction = changeAction;
-    if (row instanceof ProjectingInternalRow) {
-      this.schema = ((ProjectingInternalRow) row).schema();
-    } else {
-      this.schema = schema;
-    }
-  }
-
-  public StructType getSchema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object genericGet(int ordinal) {
-    return row.get(ordinal, schema.apply(ordinal).dataType());
-  }
-
-  @Override
-  public Seq<Object> toSeq(Seq<DataType> fieldTypes) {
-    return super.toSeq(fieldTypes);
-  }
-
-  @Override
-  public int numFields() {
-    return schema.size() / 2;
-  }
-
-  @Override
-  public void setNullAt(int i) {
-    super.setNullAt(i);
-  }
-
-  @Override
-  public void update(int i, Object value) {
-    super.update(i, value);
-  }
-
-  @Override
-  public boolean isNullAt(int ordinal) {
-    dataTypeList =
-        
Arrays.stream(schema.fields()).map(StructField::dataType).collect(Collectors.toList());
-    return row.get(ordinal, dataTypeList.get(ordinal)) == null;
-  }
-
-  @Override
-  public Object get(int pos, DataType dt) {
-    return row.get(pos, dt);
-  }
-
-  @Override
-  public boolean getBoolean(int ordinal) {
-    return super.getBoolean(ordinal);
-  }
-
-  @Override
-  public byte getByte(int ordinal) {
-    return super.getByte(ordinal);
-  }
-
-  @Override
-  public short getShort(int ordinal) {
-    return super.getShort(ordinal);
-  }
-
-  @Override
-  public int getInt(int ordinal) {
-    return super.getInt(ordinal);
-  }
-
-  @Override
-  public long getLong(int ordinal) {
-    return super.getLong(ordinal);
-  }
-
-  @Override
-  public float getFloat(int ordinal) {
-    return super.getFloat(ordinal);
-  }
-
-  @Override
-  public double getDouble(int ordinal) {
-    return super.getDouble(ordinal);
-  }
-
-  @Override
-  public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return super.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    return super.getUTF8String(ordinal);
-  }
-
-  @Override
-  public byte[] getBinary(int ordinal) {
-    return super.getBinary(ordinal);
-  }
-
-  @Override
-  public ArrayData getArray(int ordinal) {
-    return super.getArray(ordinal);
-  }
-
-  @Override
-  public CalendarInterval getInterval(int ordinal) {
-    return super.getInterval(ordinal);
-  }
-
-  @Override
-  public MapData getMap(int ordinal) {
-    return super.getMap(ordinal);
-  }
-
-  @Override
-  public InternalRow getStruct(int ordinal, int numFields) {
-    return super.getStruct(ordinal, numFields);
-  }
-
-  public InternalRow getRow() {
-    return this.row;
-  }
-
-  public ChangeAction getChangeAction() {
-    return changeAction;
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-
-  @Override
-  public GenericInternalRow copy() {
-    return super.copy();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return super.equals(o);
-  }
-
-  @Override
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  @Override
-  public Object[] values() {
-    return super.values();
-  }
-
-  public InternalRow setFileOffset(Long fileOffset) {
-    List<DataType> dataTypeList =
-        
Arrays.stream(schema.fields()).map(StructField::dataType).collect(Collectors.toList());
-    List<Object> objectSeq = new ArrayList<>(dataTypeList.size() + 1);
-    row.toSeq(schema).toStream().foreach(objectSeq::add);
-    objectSeq.add(fileOffset);
-    return new GenericInternalRow(objectSeq.toArray());
-  }
-}
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
new file mode 100644
index 000000000..c0af4efba
--- /dev/null
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/SparkInternalRowCastWrapper.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+import org.apache.amoro.data.ChangeAction
+import org.apache.amoro.spark.sql.utils.ProjectingInternalRow
+
+/** cast internal row to upsert internal row */
+class SparkInternalRowCastWrapper(
+    private val row: InternalRow,
+    private val changeAction: ChangeAction,
+    private var schema: StructType) extends GenericInternalRow {
+  if (row.isInstanceOf[ProjectingInternalRow]) {
+    schema = row.asInstanceOf[ProjectingInternalRow].schema
+  }
+  private lazy val dataTypeList = schema.fields.map(_.dataType)
+
+  def getSchema: StructType = schema
+
+  override protected def genericGet(ordinal: Int): Any = {
+    row.get(ordinal, dataTypeList(ordinal))
+  }
+
+  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = {
+    super.toSeq(fieldTypes)
+  }
+
+  override def numFields: Int = {
+    schema.size / 2
+  }
+
+  override def setNullAt(i: Int): Unit = {
+    super.setNullAt(i)
+  }
+
+  override def update(i: Int, value: Any): Unit = {
+    super.update(i, value)
+  }
+
+  override def isNullAt(ordinal: Int): Boolean = {
+    row.get(ordinal, dataTypeList(ordinal)) == null
+  }
+
+  override def get(pos: Int, dt: DataType): AnyRef = {
+    row.get(pos, dt)
+  }
+
+  override def getBoolean(ordinal: Int): Boolean = {
+    super.getBoolean(ordinal)
+  }
+
+  override def getByte(ordinal: Int): Byte = {
+    super.getByte(ordinal)
+  }
+
+  override def getShort(ordinal: Int): Short = {
+    super.getShort(ordinal)
+  }
+
+  override def getInt(ordinal: Int): Int = {
+    super.getInt(ordinal)
+  }
+
+  override def getLong(ordinal: Int): Long = {
+    super.getLong(ordinal)
+  }
+
+  override def getFloat(ordinal: Int): Float = {
+    super.getFloat(ordinal)
+  }
+
+  override def getDouble(ordinal: Int): Double = {
+    super.getDouble(ordinal)
+  }
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
{
+    super.getDecimal(ordinal, precision, scale)
+  }
+
+  override def getUTF8String(ordinal: Int): UTF8String = {
+    super.getUTF8String(ordinal)
+  }
+
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    super.getBinary(ordinal)
+  }
+
+  override def getArray(ordinal: Int): ArrayData = {
+    super.getArray(ordinal)
+  }
+
+  override def getInterval(ordinal: Int): CalendarInterval = {
+    super.getInterval(ordinal)
+  }
+
+  override def getMap(ordinal: Int): MapData = {
+    super.getMap(ordinal)
+  }
+
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
+    super.getStruct(ordinal, numFields)
+  }
+
+  def getRow: InternalRow = this.row
+
+  def getChangeAction: ChangeAction = changeAction
+
+  override def toString: String = super.toString
+
+  override def copy: GenericInternalRow = super.copy
+
+  override def equals(o: Any): Boolean = super.equals(o)
+
+  override def hashCode: Int = super.hashCode
+
+  def setFileOffset(fileOffset: Long): InternalRow = {
+    new GenericInternalRow((row.toSeq(schema) ++ Seq(fileOffset)).toArray)
+  }
+}
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
index d35a3c81c..ed33f0097 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatMergeIntoTable.scala
@@ -83,7 +83,7 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
             cond.references.filter(p => primarys.contains(p.name)).toSeq
           }
           val attrs = dedupAttrs(relation.output)
-          (keyAttrs, relation.copy(table = operationTable, output = attrs))
+          (keyAttrs, relation.copy(table = operationTable, output = 
attrs.toSeq))
         } else {
           val (keyAttrs, valuesRelation) = {
             if (mixedSparkTable.requireAdditionIdentifierColumns()) {
@@ -91,7 +91,7 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
               scanBuilder.withIdentifierColumns()
               val scan = scanBuilder.build()
               val outputAttr = toOutputAttrs(scan.readSchema(), 
relation.output)
-              val valuesRelation = DataSourceV2ScanRelation(relation, scan, 
outputAttr)
+              val valuesRelation = DataSourceV2ScanRelation(relation, scan, 
outputAttr.toSeq)
               val references = cond.references.toSeq
               (references, valuesRelation)
             } else {
@@ -129,11 +129,15 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       isKeyedTable: Boolean): WriteQueryProjections = {
     val (frontRowProjection, backRowProjection) = if (isKeyedTable) {
       val frontRowProjection =
-        Some(ProjectingInternalRow.newProjectInternalRow(plan, targetRowAttrs, 
isFront = true, 0))
+        Some(ProjectingInternalRow.newProjectInternalRow(
+          plan,
+          targetRowAttrs.toSeq,
+          isFront = true,
+          0))
       val backRowProjection =
         ProjectingInternalRow.newProjectInternalRow(
           source,
-          targetRowAttrs,
+          targetRowAttrs.toSeq,
           isFront = false,
           1 + rowIdAttrs.size)
       (frontRowProjection, backRowProjection)
@@ -141,13 +145,13 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       val frontRowProjection =
         Some(ProjectingInternalRow.newProjectInternalRow(
           plan,
-          targetRowAttrs ++ rowIdAttrs,
+          (targetRowAttrs ++ rowIdAttrs).toSeq,
           isFront = true,
           0))
       val backRowProjection =
         ProjectingInternalRow.newProjectInternalRow(
           source,
-          targetRowAttrs,
+          targetRowAttrs.toSeq,
           isFront = false,
           1 + rowIdAttrs.size)
       (frontRowProjection, backRowProjection)
@@ -191,11 +195,11 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
 
     val matchedConditions = matchedActions.map(actionCondition)
     val matchedOutputs =
-      matchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output))
+      matchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output).toSeq)
 
     val notMatchedConditions = notMatchedActions.map(actionCondition)
     val notMatchedOutputs =
-      notMatchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output))
+      notMatchedActions.map(rowLevelWriteOutput(_, readRelation.output, 
source.output).toSeq)
 
     val operationTypeAttr = AttributeReference(OPERATION_COLUMN, IntegerType, 
nullable = false)()
     val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
@@ -214,15 +218,15 @@ case class RewriteMixedFormatMergeIntoTable(spark: 
SparkSession) extends Rule[Lo
       isSourceRowPresent = IsNotNull(rowFromSourceAttr),
       isTargetRowPresent =
         if (notMatchedActions.isEmpty) TrueLiteral else 
IsNotNull(rowFromTargetAttr),
-      matchedConditions = matchedConditions,
-      matchedOutputs = matchedOutputs,
-      notMatchedConditions = notMatchedConditions,
-      notMatchedOutputs = notMatchedOutputs,
-      rowIdAttrs = keyAttrs,
+      matchedConditions = matchedConditions.toSeq,
+      matchedOutputs = matchedOutputs.toSeq,
+      notMatchedConditions = notMatchedConditions.toSeq,
+      notMatchedOutputs = notMatchedOutputs.toSeq,
+      rowIdAttrs = keyAttrs.toSeq,
       matchedRowCheck = isMatchedRowCheckNeeded(matchedActions),
       unMatchedRowCheck = unMatchedRowNeedCheck,
       emitNotMatchedTargetRows = false,
-      output = mergeRowsOutput,
+      output = mergeRowsOutput.toSeq,
       joinPlan)
 
     // build a plan to write the row delta to the table
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
index 1d3d1d33f..e0deb2eea 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/ExtendedMixedFormatStrategy.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import 
org.apache.amoro.spark.sql.MixedFormatExtensionUtils.{isMixedFormatTable, 
MixedFormatTableHelper}
 import org.apache.amoro.spark.sql.catalyst.plans._
+import org.apache.amoro.spark.sql.execution.{MergeRowsExec => 
AmoroMergeRowsExec}
 
 case class ExtendedMixedFormatStrategy(spark: SparkSession) extends Strategy 
with PredicateHelper {
 
@@ -71,7 +72,7 @@ case class ExtendedMixedFormatStrategy(spark: SparkSession) 
extends Strategy wit
           emitNotMatchedTargetRows,
           output,
           child) =>
-      MergeRowsExec(
+      AmoroMergeRowsExec(
         isSourceRowPresent,
         isTargetRowPresent,
         matchedConditions,
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
index a0b79ba2e..24f2c74a4 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/spark/sql/amoro/parser/MixedFormatSqlExtendAstBuilder.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, 
DateTimeUtils, Inte
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
-import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -1250,7 +1249,7 @@ class MixedFormatSqlExtendAstBuilder()
   }
 
   override def visitIdentifierSeq(ctx: IdentifierSeqContext): Seq[String] = 
withOrigin(ctx) {
-    ctx.ident.asScala.map(_.getText)
+    ctx.ident.asScala.map(_.getText).toSeq
   }
 
   /**
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-runtime-3.5/pom.xml
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-runtime-3.5/pom.xml
index 6fb1ac536..0049e99a4 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-runtime-3.5/pom.xml
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-runtime-3.5/pom.xml
@@ -21,12 +21,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.amoro</groupId>
-        <artifactId>amoro-mixed-spark</artifactId>
+        <artifactId>amoro-mixed-spark_${scala.binary.version}</artifactId>
         <version>0.9-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-format-mixed-spark-runtime-3.5</artifactId>
+    
<artifactId>amoro-format-mixed-spark-runtime-3.5_${scala.binary.version}</artifactId>
     <packaging>jar</packaging>
     <name>Amoro Project Mixed Format Spark 3.5 Runtime</name>
     <url>https://amoro.apache.org</url>
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-format-mixed-spark-3.5</artifactId>
+            
<artifactId>amoro-format-mixed-spark-3.5_${scala.binary.version}</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
     </dependencies>
@@ -56,8 +56,8 @@
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <includes>
-                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3-common</include>
-                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3.5</include>
+                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3-common_${scala.binary.version}</include>
+                                    
<include>org.apache.amoro:amoro-format-mixed-spark-3.5_${scala.binary.version}</include>
                                     
<include>org.apache.amoro:amoro-common</include>
                                     
<include>org.apache.amoro:amoro-format-iceberg</include>
                                     
<include>org.apache.amoro:amoro-mixed-hive</include>
@@ -72,7 +72,7 @@
                                     
<include>org.apache.zookeeper:zookeeper-jute</include>
                                     
<include>org.apache.iceberg:iceberg-core</include>
                                     
<include>org.apache.iceberg:iceberg-api</include>
-                                    
<include>org.apache.iceberg:iceberg-spark-3.5_2.12</include>
+                                    
<include>org.apache.iceberg:iceberg-spark-3.5_${scala.binary.version}</include>
                                     
<include>org.apache.iceberg:iceberg-common</include>
                                     
<include>org.apache.iceberg:iceberg-data</include>
                                     
<include>org.apache.iceberg:iceberg-orc</include>
@@ -88,7 +88,7 @@
                                     
<include>org.apache.iceberg:iceberg-arrow</include>
                                     
<include>org.apache.iceberg:iceberg-hive-metastore</include>
                                     
<include>org.apache.iceberg:iceberg-spark</include>
-                                    
<include>org.apache.iceberg:iceberg-spark-extensions-3.5_2.12</include>
+                                    
<include>org.apache.iceberg:iceberg-spark-extensions-3.5_${scala.binary.version}</include>
                                     
<include>org.apache.iceberg:iceberg-bundled-guava</include>
                                     <include>org.apache.orc:*</include>
                                     
<include>org.apache.thrift:libthrift</include>
@@ -115,7 +115,7 @@
                                     </excludes>
                                 </filter>
                                 <filter combine.children="append">
-                                    
<artifact>org.apache.iceberg:iceberg-spark-3.5_2.12</artifact>
+                                    
<artifact>org.apache.iceberg:iceberg-spark-3.5_${scala.binary.version}</artifact>
                                     <excludes>
                                         
<exclude>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</exclude>
                                     </excludes>
diff --git a/amoro-optimizer/amoro-optimizer-flink/pom.xml 
b/amoro-optimizer/amoro-optimizer-flink/pom.xml
index 4104d44bb..8ce81805d 100644
--- a/amoro-optimizer/amoro-optimizer-flink/pom.xml
+++ b/amoro-optimizer/amoro-optimizer-flink/pom.xml
@@ -32,7 +32,6 @@
 
     <properties>
         <flink-optimizer.flink-version>1.18.1</flink-optimizer.flink-version>
-        <flink-optimizer.scala-version>2.12</flink-optimizer.scala-version>
         <!--   When the flink version is more than 1.15, the scala suffix is 
empty     -->
         
<flink-optimizer.scala-version_suffix></flink-optimizer.scala-version_suffix>
     </properties>
@@ -115,7 +114,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${flink-optimizer.scala-version}</artifactId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink-optimizer.flink-version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -222,7 +221,7 @@
                 </property>
             </activation>
             <properties>
-                
<flink-optimizer.scala-version_suffix>_${flink-optimizer.scala-version}</flink-optimizer.scala-version_suffix>
+                
<flink-optimizer.scala-version_suffix>_${flink.scala.binary.version}</flink-optimizer.scala-version_suffix>
             </properties>
         </profile>
     </profiles>
diff --git a/amoro-optimizer/amoro-optimizer-spark/pom.xml 
b/amoro-optimizer/amoro-optimizer-spark/pom.xml
index b26569724..69c4c1ebe 100644
--- a/amoro-optimizer/amoro-optimizer-spark/pom.xml
+++ b/amoro-optimizer/amoro-optimizer-spark/pom.xml
@@ -26,7 +26,7 @@
         <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>amoro-optimizer-spark</artifactId>
+    
<artifactId>amoro-optimizer-spark-${spark.major.version}_${scala.binary.version}</artifactId>
     <name>Amoro Project AMS Spark Optimizer</name>
     <url>https://amoro.apache.org</url>
 
diff --git a/dist/pom.xml b/dist/pom.xml
index 65640f6f1..48606a9a0 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -52,7 +52,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.amoro</groupId>
-            <artifactId>amoro-optimizer-spark</artifactId>
+            
<artifactId>amoro-optimizer-spark-${spark.major.version}_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
diff --git a/dist/src/main/assemblies/bin.xml b/dist/src/main/assemblies/bin.xml
index fbad1a0ad..e23936512 100644
--- a/dist/src/main/assemblies/bin.xml
+++ b/dist/src/main/assemblies/bin.xml
@@ -38,7 +38,7 @@
         </file>
         <file>
             <source>
-                
../amoro-optimizer/amoro-optimizer-spark/target/amoro-optimizer-spark-${project.version}-jar-with-dependencies.jar
+                
../amoro-optimizer/amoro-optimizer-spark/target/amoro-optimizer-spark-${spark.major.version}_${scala.binary.version}-${project.version}-jar-with-dependencies.jar
             </source>
             <outputDirectory>plugin/optimizer/spark</outputDirectory>
             <destName>optimizer-job.jar</destName>
diff --git a/pom.xml b/pom.xml
index 914a61749..f80d6b74a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
         <bitmap.version>1.0.1</bitmap.version>
         <prometheus.version>0.16.0</prometheus.version>
         <flink.version>1.20.3</flink.version>
+        <flink.scala.binary.version>2.12</flink.scala.binary.version>
         
<fabric8-kubernetes-client.version.version>6.13.5</fabric8-kubernetes-client.version.version>
         <amoro-shade.version>0.7.0-incubating</amoro-shade.version>
         <amoro-shade-guava.version>32.1.1-jre</amoro-shade-guava.version>
@@ -548,6 +549,22 @@
                 <version>${paimon.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.paimon</groupId>
+                <artifactId>paimon-spark-${spark.major.version}</artifactId>
+                <version>${paimon.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.scala-lang</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>org.roaringbitmap</groupId>
                 <artifactId>RoaringBitmap</artifactId>
@@ -1471,6 +1488,22 @@
                 <spark.major.version>3.5</spark.major.version>
             </properties>
         </profile>
+        <profile>
+            <id>scala-2.12</id>
+            <properties>
+                <scala.version>2.12.15</scala.version>
+                <scala.binary.version>2.12</scala.binary.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>scala-2.13</id>
+            <properties>
+                <scala.version>2.13.8</scala.version>
+                <scala.binary.version>2.13</scala.binary.version>
+                <!-- paimon-1.2.0 does not support scala-2.13 and datasource 
lookup will cause UT failure -->
+                <paimon.version>1.1.1</paimon.version>
+            </properties>
+        </profile>
         <profile>
             <id>openapi-sdk</id>
             <modules>

Reply via email to