This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a3576d9e9 [FLINK-38726][fluss] Bump Fluss version to 0.9.0-incubating
a3576d9e9 is described below

commit a3576d9e9f9d173278239868ae829d9aca7a65f0
Author: Leonard Xu <[email protected]>
AuthorDate: Fri Mar 6 11:11:55 2026 +0800

    [FLINK-38726][fluss] Bump Fluss version to 0.9.0-incubating
    
    This closes  #4180.
---
 .../flink-cdc-pipeline-connector-fluss/pom.xml     |  16 +-
 .../fluss/factory/FlussDataSinkFactory.java        |   4 +-
 .../cdc/connectors/fluss/sink/FlussDataSink.java   |   2 +-
 .../fluss/sink/FlussEventSerializationSchema.java  |  17 +-
 .../fluss/sink/FlussMetaDataApplier.java           |  16 +-
 .../connectors/fluss/sink/row/CdcAsFlussArray.java | 175 +++++++++++++++++++++
 .../CdcAsFlussMap.java}                            |  39 +++--
 .../fluss/sink/{ => row}/CdcAsFlussRow.java        |  30 +++-
 .../cdc/connectors/fluss/sink/v2/FlussEvent.java   |   2 +-
 .../fluss/sink/v2/FlussEventSerializer.java        |   2 +-
 .../connectors/fluss/sink/v2/FlussRowWithOp.java   |   4 +-
 .../cdc/connectors/fluss/sink/v2/FlussSink.java    |   2 +-
 .../connectors/fluss/sink/v2/FlussSinkWriter.java  |  24 +--
 .../fluss/sink/v2/metrics/WrappedFlussCounter.java |   4 +-
 .../fluss/sink/v2/metrics/WrappedFlussGauge.java   |   4 +-
 .../sink/v2/metrics/WrapperFlussHistogram.java     |   8 +-
 .../fluss/sink/v2/metrics/WrapperFlussMeter.java   |   4 +-
 .../v2/metrics/WrapperFlussMetricRegistry.java     |  16 +-
 .../connectors/fluss/utils/FlussConversions.java   | 109 +++++++------
 .../cdc/connectors/fluss/FlussPipelineITCase.java  |  26 +--
 .../sink/FlussEventSerializationSchemaTest.java    |   9 +-
 .../fluss/sink/FlussMetadataApplierTest.java       |  75 ++++-----
 .../connectors/fluss/sink/v2/FlussSinkITCase.java  |  12 +-
 .../fluss/utils/FlussConversionsTest.java          |  82 +++++-----
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |  17 +-
 .../flink/cdc/pipeline/tests/FlussE2eITCase.java   |   8 +-
 26 files changed, 458 insertions(+), 249 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
index 2634c7aa2..816d99fd5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
@@ -33,12 +33,12 @@ limitations under the License.
 
 
     <properties>
-        <fluss.version>0.7.0</fluss.version>
+        <fluss.version>0.9.0-incubating</fluss.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-client</artifactId>
             <version>${fluss.version}</version>
         </dependency>
@@ -66,34 +66,34 @@ limitations under the License.
         </dependency>
 
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-server</artifactId>
             <version>${fluss.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-server</artifactId>
             <version>${fluss.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-test-utils</artifactId>
             <version>${fluss.version}</version>
             <scope>test</scope>
         </dependency>
         <!-- In Flink CDC project has Pipeline Sink Connector for Fluss. we 
import fluss-fink for Fluss Sink Connector just for test purpose -->
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-flink-common</artifactId>
             <version>${fluss.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.alibaba.fluss</groupId>
+            <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-flink-1.20</artifactId>
             <version>${fluss.version}</version>
             <scope>test</scope>
@@ -129,7 +129,7 @@ limitations under the License.
                             <shadeTestJar>false</shadeTestJar>
                             <artifactSet>
                                 <includes>
-                                    <include>com.alibaba.fluss:*</include>
+                                    <include>org.apache.fluss:*</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
index 754eb693a..58e469d2d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.connectors.fluss.sink.FlussDataSink;
 
-import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
 
 import java.util.HashMap;
 import java.util.HashSet;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
index e03caf8e6..4491e5032 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussSink;
 
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.Configuration;
 
 import java.util.List;
 import java.util.Map;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
index e92557b11..086da5894 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
@@ -24,14 +24,15 @@ import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.types.DataType;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataType;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -129,12 +130,12 @@ public class FlussEventSerializationSchema implements 
FlussEventSerializer<Event
 
     private static class TableSchemaInfo {
         org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
-        com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema;
+        org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
         Map<Integer, Integer> indexMapping;
 
         private TableSchemaInfo(
                 org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
-                com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema) {
+                org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
             this.upstreamCdcSchema = upstreamCdcSchema;
             this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
             this.indexMapping =
@@ -144,8 +145,8 @@ public class FlussEventSerializationSchema implements 
FlussEventSerializer<Event
     }
 
     static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
-            com.alibaba.fluss.metadata.Schema inferredFlussSchema,
-            com.alibaba.fluss.metadata.Schema currentFlussNewSchema) {
+            org.apache.fluss.metadata.Schema inferredFlussSchema,
+            org.apache.fluss.metadata.Schema currentFlussNewSchema) {
         List<String> inferredSchemaColumnNames = 
inferredFlussSchema.getColumnNames();
         Map<String, Integer> reverseIndex = new HashMap<>();
         for (int i = 0; i < inferredSchemaColumnNames.size(); i++) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 2e13b70d9..4e28623e2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -26,14 +26,14 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.table.api.ValidationException;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.metadata.DatabaseDescriptor;
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.metadata.TableInfo;
-import com.alibaba.fluss.metadata.TablePath;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
new file mode 100644
index 000000000..4fa32e6a8
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.row;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.TimestampData;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+
+import static 
org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow.fromFlinkDecimal;
+
+/** Wraps a CDC {@link ArrayData} as a Fluss {@link InternalArray}. */
+public class CdcAsFlussArray implements InternalArray {
+
+    private final ArrayData flussArray;
+
+    public CdcAsFlussArray(ArrayData flussArray) {
+        this.flussArray = flussArray;
+    }
+
+    @Override
+    public int size() {
+        return flussArray.size();
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        return flussArray.toBooleanArray();
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        return flussArray.toByteArray();
+    }
+
+    @Override
+    public short[] toShortArray() {
+        return flussArray.toShortArray();
+    }
+
+    @Override
+    public int[] toIntArray() {
+        return flussArray.toIntArray();
+    }
+
+    @Override
+    public long[] toLongArray() {
+        return flussArray.toLongArray();
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        return flussArray.toFloatArray();
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        return flussArray.toDoubleArray();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return flussArray.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return flussArray.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return flussArray.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return flussArray.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return flussArray.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return flussArray.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return flussArray.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return flussArray.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return fromFlinkDecimal(flussArray.getDecimal(pos, precision, scale));
+    }
+
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        TimestampData timestamp = flussArray.getTimestamp(pos, precision);
+        return TimestampNtz.fromMillis(
+                timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
+    }
+
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        TimestampData timestamp = flussArray.getTimestamp(pos, precision);
+        return TimestampLtz.fromEpochMillis(
+                timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
+    }
+
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        return flussArray.getBinary(pos);
+    }
+
+    @Override
+    public byte[] getBytes(int pos) {
+        return flussArray.getBinary(pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return new CdcAsFlussArray(flussArray.getArray(pos));
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return new CdcAsFlussMap(flussArray.getMap(pos));
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return CdcAsFlussRow.replace(flussArray.getRecord(pos, numFields));
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
similarity index 52%
copy from 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
copy to 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
index dadf0d2ee..b209001ee 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
@@ -15,21 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.connectors.fluss.sink.v2;
+package org.apache.flink.cdc.connectors.fluss.sink.row;
 
-import com.alibaba.fluss.client.Connection;
+import org.apache.flink.cdc.common.data.MapData;
 
-import java.io.IOException;
-import java.io.Serializable;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
 
-/**
- * Serializer to serialize the input record to a {@link FlussEvent} for {@link 
FlussSinkWriter}.
- *
- * @param <InputT> The type of the input record which comes from the upstream 
and will be
- *     transformed into a FlussEvent here.
- */
-public interface FlussEventSerializer<InputT> extends Serializable {
-    void open(Connection connection) throws IOException;
+/** Wraps a Cdc {@link MapData} as a Fluss {@link InternalMap}. */
+public class CdcAsFlussMap implements InternalMap {
+
+    private final MapData cdcMap;
+
+    public CdcAsFlussMap(MapData cdcMap) {
+        this.cdcMap = cdcMap;
+    }
+
+    @Override
+    public int size() {
+        return cdcMap.size();
+    }
+
+    @Override
+    public InternalArray keyArray() {
+        return new CdcAsFlussArray(cdcMap.keyArray());
+    }
 
-    FlussEvent serialize(InputT in) throws IOException;
+    @Override
+    public InternalArray valueArray() {
+        return new CdcAsFlussArray(cdcMap.valueArray());
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
similarity index 87%
rename from 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
rename to 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
index 5891d58a7..4a97ebc22 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.connectors.fluss.sink;
+package org.apache.flink.cdc.connectors.fluss.sink.row;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@@ -24,11 +24,13 @@ import 
org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimestampData;
 
-import com.alibaba.fluss.row.BinaryString;
-import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -167,6 +169,22 @@ public class CdcAsFlussRow implements InternalRow {
         return cdcRecord.getBinary(indexMapping.get(pos));
     }
 
+    @Override
+    public InternalArray getArray(int i) {
+        return new CdcAsFlussArray(cdcRecord.getArray(indexMapping.get(i)));
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        return new CdcAsFlussMap(cdcRecord.getMap(indexMapping.get(i)));
+    }
+
+    @Override
+    public InternalRow getRow(int i, int numFields) {
+        return new CdcAsFlussRow(
+                cdcRecord.getRow(indexMapping.get(i), numFields), numFields, 
indexMapping);
+    }
+
     @VisibleForTesting
     public RecordData getCdcRecord() {
         return cdcRecord;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
index 75df49fa6..a31d07bd4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink.v2;
 
-import com.alibaba.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TablePath;
 
 import java.util.List;
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
index dadf0d2ee..c90cdc913 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink.v2;
 
-import com.alibaba.fluss.client.Connection;
+import org.apache.fluss.client.Connection;
 
 import java.io.IOException;
 import java.io.Serializable;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
index 0dedaf6cc..2042dbdc4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink.v2;
 
-import com.alibaba.fluss.row.InternalRow;
+import org.apache.fluss.row.InternalRow;
 
 import javax.annotation.Nullable;
 
 import java.util.Objects;
 
-import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code of Apache Fluss Project 
(https://fluss.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
index 4e6d789e9..0bd74f060 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.Configuration;
 
 import java.io.IOException;
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
index dab9e6ed5..8a3754e0e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
@@ -23,18 +23,18 @@ import 
org.apache.flink.cdc.connectors.fluss.sink.v2.metrics.WrapperFlussMetricR
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.client.table.writer.AppendWriter;
-import com.alibaba.fluss.client.table.writer.TableWriter;
-import com.alibaba.fluss.client.table.writer.UpsertWriter;
-import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.metrics.Gauge;
-import com.alibaba.fluss.metrics.Metric;
-import com.alibaba.fluss.metrics.MetricNames;
-import com.alibaba.fluss.row.InternalRow;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.row.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
index 61bcff7b0..48d094096 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Counter;
 /** An implementation of Flink's {@link Counter} which wraps Fluss's Counter. 
*/
 public class WrappedFlussCounter implements Counter {
 
-    private final com.alibaba.fluss.metrics.Counter flussCounter;
+    private final org.apache.fluss.metrics.Counter flussCounter;
 
-    public WrappedFlussCounter(com.alibaba.fluss.metrics.Counter flussCounter) 
{
+    public WrappedFlussCounter(org.apache.fluss.metrics.Counter flussCounter) {
         this.flussCounter = flussCounter;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
index 1b5df21d0..45d7adaaf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Gauge;
 /** An implementation of Flink's {@link Gauge} which wraps Fluss's Gauge. */
 public class WrappedFlussGauge<T> implements Gauge<T> {
 
-    private final com.alibaba.fluss.metrics.Gauge<T> flussGauge;
+    private final org.apache.fluss.metrics.Gauge<T> flussGauge;
 
-    public WrappedFlussGauge(com.alibaba.fluss.metrics.Gauge<T> flussGauge) {
+    public WrappedFlussGauge(org.apache.fluss.metrics.Gauge<T> flussGauge) {
         this.flussGauge = flussGauge;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
index a62a646a3..402c5cc1a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
@@ -27,9 +27,9 @@ import org.apache.flink.metrics.HistogramStatistics;
 /** An implementation of Flink's {@link Histogram} which wraps Fluss's 
Histogram. */
 public class WrapperFlussHistogram implements Histogram {
 
-    private final com.alibaba.fluss.metrics.Histogram flussHistogram;
+    private final org.apache.fluss.metrics.Histogram flussHistogram;
 
-    public WrapperFlussHistogram(com.alibaba.fluss.metrics.Histogram 
flussHistogram) {
+    public WrapperFlussHistogram(org.apache.fluss.metrics.Histogram 
flussHistogram) {
         this.flussHistogram = flussHistogram;
     }
 
@@ -50,10 +50,10 @@ public class WrapperFlussHistogram implements Histogram {
 
     private static class FlinkHistogramStatistics extends HistogramStatistics {
 
-        private final com.alibaba.fluss.metrics.HistogramStatistics 
flussHistogramStatistics;
+        private final org.apache.fluss.metrics.HistogramStatistics 
flussHistogramStatistics;
 
         public FlinkHistogramStatistics(
-                com.alibaba.fluss.metrics.HistogramStatistics 
flussHistogramStatistics) {
+                org.apache.fluss.metrics.HistogramStatistics 
flussHistogramStatistics) {
             this.flussHistogramStatistics = flussHistogramStatistics;
         }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
index 40bac46a8..0b415b202 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Meter;
 /** An implementation of Flink's {@link Meter} which wraps Fluss's Meter. */
 public class WrapperFlussMeter implements Meter {
 
-    private final com.alibaba.fluss.metrics.Meter flussMeter;
+    private final org.apache.fluss.metrics.Meter flussMeter;
 
-    public WrapperFlussMeter(com.alibaba.fluss.metrics.Meter flussMeter) {
+    public WrapperFlussMeter(org.apache.fluss.metrics.Meter flussMeter) {
         this.flussMeter = flussMeter;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
index 4251a71d6..39932bc2e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
@@ -19,14 +19,14 @@ package 
org.apache.flink.cdc.connectors.fluss.sink.v2.metrics;
 
 import org.apache.flink.metrics.MetricGroup;
 
-import com.alibaba.fluss.metrics.CharacterFilter;
-import com.alibaba.fluss.metrics.Counter;
-import com.alibaba.fluss.metrics.Gauge;
-import com.alibaba.fluss.metrics.Histogram;
-import com.alibaba.fluss.metrics.Meter;
-import com.alibaba.fluss.metrics.Metric;
-import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
-import com.alibaba.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Histogram;
+import org.apache.fluss.metrics.Meter;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
 
 import java.util.Collections;
 import java.util.HashMap;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
index eb311dd0e..25a9f53b6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.types.BigIntType;
 import org.apache.flink.cdc.common.types.BinaryType;
 import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DateType;
 import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.types.DoubleType;
@@ -37,11 +38,12 @@ import org.apache.flink.cdc.common.types.TinyIntType;
 import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
-import com.alibaba.fluss.annotation.VisibleForTesting;
-import com.alibaba.fluss.metadata.Schema;
-import com.alibaba.fluss.metadata.TableDescriptor;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
 
 import javax.annotation.Nullable;
 
@@ -85,7 +87,7 @@ public class FlussConversions {
                 .build();
     }
 
-    public static com.alibaba.fluss.metadata.Schema toFlussSchema(
+    public static org.apache.fluss.metadata.Schema toFlussSchema(
             org.apache.flink.cdc.common.schema.Schema cdcSchema) {
         Schema.Builder schemBuilder = Schema.newBuilder();
         if (!CollectionUtil.isNullOrEmpty(cdcSchema.primaryKeys())) {
@@ -108,7 +110,7 @@ public class FlussConversions {
     }
 
     @VisibleForTesting
-    private static com.alibaba.fluss.types.DataType toFlussType(
+    private static org.apache.fluss.types.DataType toFlussType(
             org.apache.flink.cdc.common.types.DataType flinkDataType) {
         return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
     }
@@ -137,119 +139,128 @@ public class FlussConversions {
 
     private static class CdcTypeToFlussType
             implements org.apache.flink.cdc.common.types.DataTypeVisitor<
-                    com.alibaba.fluss.types.DataType> {
+                    org.apache.fluss.types.DataType> {
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(CharType charType) {
-            return new com.alibaba.fluss.types.CharType(
-                    charType.isNullable(), charType.getLength());
+        public org.apache.fluss.types.DataType visit(CharType charType) {
+            return new org.apache.fluss.types.CharType(charType.isNullable(), 
charType.getLength());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(VarCharType varCharType) 
{
+        public org.apache.fluss.types.DataType visit(VarCharType varCharType) {
             // fluss not support varchar type
-            return new 
com.alibaba.fluss.types.StringType(varCharType.isNullable());
+            return new 
org.apache.fluss.types.StringType(varCharType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(BooleanType booleanType) 
{
-            return new 
com.alibaba.fluss.types.BooleanType(booleanType.isNullable());
+        public org.apache.fluss.types.DataType visit(BooleanType booleanType) {
+            return new 
org.apache.fluss.types.BooleanType(booleanType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(BinaryType binaryType) {
-            return new com.alibaba.fluss.types.BinaryType(
+        public org.apache.fluss.types.DataType visit(BinaryType binaryType) {
+            return new org.apache.fluss.types.BinaryType(
                     binaryType.isNullable(), binaryType.getLength());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(VarBinaryType 
varBinaryType) {
+        public org.apache.fluss.types.DataType visit(VarBinaryType 
varBinaryType) {
             // fluss not support varbinary type
-            return new 
com.alibaba.fluss.types.BytesType(varBinaryType.isNullable());
+            return new 
org.apache.fluss.types.BytesType(varBinaryType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(DecimalType decimalType) 
{
-            return new com.alibaba.fluss.types.DecimalType(
+        public org.apache.fluss.types.DataType visit(DecimalType decimalType) {
+            return new org.apache.fluss.types.DecimalType(
                     decimalType.isNullable(), decimalType.getPrecision(), 
decimalType.getScale());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(TinyIntType tinyIntType) 
{
-            return new 
com.alibaba.fluss.types.TinyIntType(tinyIntType.isNullable());
+        public org.apache.fluss.types.DataType visit(TinyIntType tinyIntType) {
+            return new 
org.apache.fluss.types.TinyIntType(tinyIntType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(SmallIntType 
smallIntType) {
-            return new 
com.alibaba.fluss.types.SmallIntType(smallIntType.isNullable());
+        public org.apache.fluss.types.DataType visit(SmallIntType 
smallIntType) {
+            return new 
org.apache.fluss.types.SmallIntType(smallIntType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(IntType intType) {
-            return new com.alibaba.fluss.types.IntType(intType.isNullable());
+        public org.apache.fluss.types.DataType visit(IntType intType) {
+            return new org.apache.fluss.types.IntType(intType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(BigIntType bigIntType) {
-            return new 
com.alibaba.fluss.types.BigIntType(bigIntType.isNullable());
+        public org.apache.fluss.types.DataType visit(BigIntType bigIntType) {
+            return new 
org.apache.fluss.types.BigIntType(bigIntType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(FloatType floatType) {
-            return new 
com.alibaba.fluss.types.FloatType(floatType.isNullable());
+        public org.apache.fluss.types.DataType visit(FloatType floatType) {
+            return new 
org.apache.fluss.types.FloatType(floatType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(DoubleType doubleType) {
-            return new 
com.alibaba.fluss.types.DoubleType(doubleType.isNullable());
+        public org.apache.fluss.types.DataType visit(DoubleType doubleType) {
+            return new 
org.apache.fluss.types.DoubleType(doubleType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(DateType dateType) {
-            return new com.alibaba.fluss.types.DateType(dateType.isNullable());
+        public org.apache.fluss.types.DataType visit(DateType dateType) {
+            return new org.apache.fluss.types.DateType(dateType.isNullable());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(TimeType timeType) {
-            return new com.alibaba.fluss.types.TimeType(
+        public org.apache.fluss.types.DataType visit(TimeType timeType) {
+            return new org.apache.fluss.types.TimeType(
                     timeType.isNullable(), timeType.getPrecision());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(TimestampType 
timestampType) {
-            return new com.alibaba.fluss.types.TimestampType(
+        public org.apache.fluss.types.DataType visit(TimestampType 
timestampType) {
+            return new org.apache.fluss.types.TimestampType(
                     timestampType.isNullable(), timestampType.getPrecision());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(ZonedTimestampType 
zonedTimestampType) {
+        public org.apache.fluss.types.DataType visit(ZonedTimestampType 
zonedTimestampType) {
             throw new UnsupportedOperationException(
                     "Unsupported data type in fluss " + zonedTimestampType);
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(
+        public org.apache.fluss.types.DataType visit(
                 LocalZonedTimestampType localZonedTimestampType) {
-            return new com.alibaba.fluss.types.LocalZonedTimestampType(
+            return new org.apache.fluss.types.LocalZonedTimestampType(
                     localZonedTimestampType.isNullable(), 
localZonedTimestampType.getPrecision());
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(ArrayType arrayType) {
-            throw new UnsupportedOperationException(
-                    "Unsupported data type in fluss version under 0.7: " + 
arrayType);
+        public org.apache.fluss.types.DataType visit(ArrayType arrayType) {
+            List<DataType> children = arrayType.getChildren();
+            Preconditions.checkState(!children.isEmpty());
+            org.apache.fluss.types.DataType flussChildType = 
toFlussType(children.get(0));
+            return new 
org.apache.fluss.types.ArrayType(arrayType.isNullable(), flussChildType);
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(MapType mapType) {
-            throw new UnsupportedOperationException(
-                    "Unsupported data type in fluss version under 0.7: " + 
mapType);
+        public org.apache.fluss.types.DataType visit(MapType mapType) {
+            org.apache.fluss.types.DataType flussKeyType = 
toFlussType(mapType.getKeyType());
+            org.apache.fluss.types.DataType flussValueType = 
toFlussType(mapType.getValueType());
+            return new org.apache.fluss.types.MapType(
+                    mapType.isNullable(), flussKeyType, flussValueType);
         }
 
         @Override
-        public com.alibaba.fluss.types.DataType visit(RowType rowType) {
-            throw new UnsupportedOperationException(
-                    "Unsupported data type in fluss version under 0.7: " + 
rowType);
+        public org.apache.fluss.types.DataType visit(RowType rowType) {
+            return new org.apache.fluss.types.RowType(
+                    rowType.isNullable(),
+                    rowType.getFields().stream()
+                            .map(
+                                    field ->
+                                            new 
org.apache.fluss.types.DataField(
+                                                    field.getName(), 
field.getType().accept(this)))
+                            .collect(Collectors.toList()));
         }
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index ec2d2b4a2..bfc71a1ad 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -45,12 +45,12 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.config.MemorySize;
-import com.alibaba.fluss.metadata.DataLakeFormat;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -64,12 +64,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
-import static 
com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static 
com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static 
org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
 import static 
org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
 import static 
org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for Fluss Pipeline. */
@@ -320,7 +320,7 @@ public class FlussPipelineITCase {
                                         
ValuesDataSourceHelper.singleSplitSingleTable(),
                                         sinkOption))
                 .rootCause()
-                .hasMessageContaining("'table.non-key' is not a Fluss table 
property");
+                .hasMessageContaining("'table.non-key' is not a recognized 
Fluss table property");
     }
 
     @Test
@@ -566,8 +566,8 @@ public class FlussPipelineITCase {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
-    private static com.alibaba.fluss.config.Configuration initConfig() {
-        com.alibaba.fluss.config.Configuration conf = new 
com.alibaba.fluss.config.Configuration();
+    private static org.apache.fluss.config.Configuration initConfig() {
+        org.apache.fluss.config.Configuration conf = new 
org.apache.fluss.config.Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
         // set a shorter interval for testing purpose
         conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
@@ -584,7 +584,7 @@ public class FlussPipelineITCase {
         conf.setString("security.sasl.enabled.mechanisms", "plain");
         conf.setString(
                 "security.sasl.plain.jaas.config",
-                "com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule 
required "
+                "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule 
required "
                         + "    user_root=\"password\" "
                         + "    user_guest=\"password2\";");
         return conf;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
index 7097a72ee..1af59aa97 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
@@ -36,15 +36,16 @@ import org.apache.flink.cdc.common.types.IntType;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
index 9c35f002d..c95aa7868 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
@@ -24,15 +24,15 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.IntType;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.exception.InvalidConfigException;
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.metadata.TableInfo;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
-import com.alibaba.fluss.types.RowType;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.types.RowType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -47,7 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
-import static com.alibaba.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
+import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -129,28 +129,28 @@ public class FlussMetadataApplierTest {
                     DataTypes.TIMESTAMP_LTZ(6)
                 };
 
-        com.alibaba.fluss.types.DataType[] flussDataTypes =
-                new com.alibaba.fluss.types.DataType[] {
-                    com.alibaba.fluss.types.DataTypes.BINARY(10),
+        org.apache.fluss.types.DataType[] flussDataTypes =
+                new org.apache.fluss.types.DataType[] {
+                    org.apache.fluss.types.DataTypes.BINARY(10),
                     // fluss not support binary, will be mapped to bytes
-                    com.alibaba.fluss.types.DataTypes.BYTES(),
-                    com.alibaba.fluss.types.DataTypes.BYTES(),
-                    com.alibaba.fluss.types.DataTypes.BOOLEAN(),
-                    com.alibaba.fluss.types.DataTypes.TINYINT(),
-                    com.alibaba.fluss.types.DataTypes.SMALLINT(),
-                    new com.alibaba.fluss.types.IntType(false),
-                    com.alibaba.fluss.types.DataTypes.BIGINT(),
-                    com.alibaba.fluss.types.DataTypes.FLOAT(),
-                    com.alibaba.fluss.types.DataTypes.DOUBLE(),
-                    com.alibaba.fluss.types.DataTypes.DECIMAL(38, 18),
-                    com.alibaba.fluss.types.DataTypes.CHAR(10),
+                    org.apache.fluss.types.DataTypes.BYTES(),
+                    org.apache.fluss.types.DataTypes.BYTES(),
+                    org.apache.fluss.types.DataTypes.BOOLEAN(),
+                    org.apache.fluss.types.DataTypes.TINYINT(),
+                    org.apache.fluss.types.DataTypes.SMALLINT(),
+                    new org.apache.fluss.types.IntType(false),
+                    org.apache.fluss.types.DataTypes.BIGINT(),
+                    org.apache.fluss.types.DataTypes.FLOAT(),
+                    org.apache.fluss.types.DataTypes.DOUBLE(),
+                    org.apache.fluss.types.DataTypes.DECIMAL(38, 18),
+                    org.apache.fluss.types.DataTypes.CHAR(10),
                     // fluss not support varchar, will be mapped to string
-                    com.alibaba.fluss.types.DataTypes.STRING(),
-                    com.alibaba.fluss.types.DataTypes.STRING(),
-                    com.alibaba.fluss.types.DataTypes.DATE(),
-                    com.alibaba.fluss.types.DataTypes.TIME(),
-                    com.alibaba.fluss.types.DataTypes.TIMESTAMP(3),
-                    com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
+                    org.apache.fluss.types.DataTypes.STRING(),
+                    org.apache.fluss.types.DataTypes.STRING(),
+                    org.apache.fluss.types.DataTypes.DATE(),
+                    org.apache.fluss.types.DataTypes.TIME(),
+                    org.apache.fluss.types.DataTypes.TIMESTAMP(3),
+                    org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
                 };
 
         try (FlussMetaDataApplier applier =
@@ -188,17 +188,10 @@ public class FlussMetadataApplierTest {
 
     @Test
     void testUnsupportedType() throws Exception {
-        String[] fieldNames = new String[] {"timestamp_tz_col", "array_col", 
"map_col", "row_col"};
+        String[] fieldNames = new String[] {"timestamp_tz_col"};
 
         org.apache.flink.cdc.common.types.DataType[] cdcDataTypes =
-                new org.apache.flink.cdc.common.types.DataType[] {
-                    DataTypes.ARRAY(DataTypes.STRING()),
-                    DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
-                    DataTypes.ROW(
-                            DataTypes.FIELD("name", DataTypes.STRING()),
-                            DataTypes.FIELD("age", DataTypes.INT())),
-                    DataTypes.TIMESTAMP_TZ()
-                };
+                new org.apache.flink.cdc.common.types.DataType[] 
{DataTypes.TIMESTAMP_TZ()};
 
         try (FlussMetaDataApplier applier =
                 new FlussMetaDataApplier(
@@ -227,10 +220,10 @@ public class FlussMetadataApplierTest {
                         tablePath,
                         TableDescriptor.builder()
                                 .schema(
-                                        
com.alibaba.fluss.metadata.Schema.newBuilder()
+                                        
org.apache.fluss.metadata.Schema.newBuilder()
                                                 .column(
                                                         "id",
-                                                        
com.alibaba.fluss.types.DataTypes.INT())
+                                                        
org.apache.fluss.types.DataTypes.INT())
                                                 .build())
                                 .build(),
                         true)
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index e65a7d199..1cd80c6a6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -40,9 +40,9 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -58,9 +58,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
-import static 
com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static 
com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration tests for FlussSink. */
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
index 2f3487528..eddaeb6ff 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.cdc.connectors.fluss.utils;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.types.RowType;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.types.RowType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -48,7 +48,7 @@ class FlussConversionsTest {
                         .primaryKey("id")
                         .build();
 
-        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+        org.apache.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
 
         assertThat(flussSchema.getColumnNames()).containsExactly("id", "name");
         
assertThat(flussSchema.getPrimaryKeyColumnNames()).containsExactly("id");
@@ -62,7 +62,7 @@ class FlussConversionsTest {
                         .physicalColumn("name", DataTypes.STRING())
                         .build();
 
-        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+        org.apache.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
 
         assertThat(flussSchema.getColumnNames()).hasSize(2);
         assertThat(flussSchema.getPrimaryKeyColumnNames()).isEmpty();
@@ -88,33 +88,43 @@ class FlussConversionsTest {
                         .physicalColumn("time_col", DataTypes.TIME())
                         .physicalColumn("timestamp_col", 
DataTypes.TIMESTAMP(3))
                         .physicalColumn("ltz_col", DataTypes.TIMESTAMP_LTZ(6))
+                        .physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
                         .build();
 
-        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+        org.apache.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
 
         RowType rowType = flussSchema.getRowType();
-        assertThat(rowType.getFieldCount()).isEqualTo(16);
-
-        
assertThat(rowType.getTypeAt(0)).isEqualTo(com.alibaba.fluss.types.DataTypes.BOOLEAN());
-        
assertThat(rowType.getTypeAt(1)).isEqualTo(com.alibaba.fluss.types.DataTypes.TINYINT());
-        
assertThat(rowType.getTypeAt(2)).isEqualTo(com.alibaba.fluss.types.DataTypes.SMALLINT());
-        
assertThat(rowType.getTypeAt(3)).isEqualTo(com.alibaba.fluss.types.DataTypes.INT());
-        
assertThat(rowType.getTypeAt(4)).isEqualTo(com.alibaba.fluss.types.DataTypes.BIGINT());
-        
assertThat(rowType.getTypeAt(5)).isEqualTo(com.alibaba.fluss.types.DataTypes.FLOAT());
-        
assertThat(rowType.getTypeAt(6)).isEqualTo(com.alibaba.fluss.types.DataTypes.DOUBLE());
-        assertThat(rowType.getTypeAt(7))
-                .isEqualTo(com.alibaba.fluss.types.DataTypes.DECIMAL(10, 2));
-        
assertThat(rowType.getTypeAt(8)).isEqualTo(com.alibaba.fluss.types.DataTypes.CHAR(10));
+        assertThat(rowType.getFieldCount()).isEqualTo(18);
+
+        
assertThat(rowType.getTypeAt(0)).isEqualTo(org.apache.fluss.types.DataTypes.BOOLEAN());
+        
assertThat(rowType.getTypeAt(1)).isEqualTo(org.apache.fluss.types.DataTypes.TINYINT());
+        
assertThat(rowType.getTypeAt(2)).isEqualTo(org.apache.fluss.types.DataTypes.SMALLINT());
+        
assertThat(rowType.getTypeAt(3)).isEqualTo(org.apache.fluss.types.DataTypes.INT());
+        
assertThat(rowType.getTypeAt(4)).isEqualTo(org.apache.fluss.types.DataTypes.BIGINT());
+        
assertThat(rowType.getTypeAt(5)).isEqualTo(org.apache.fluss.types.DataTypes.FLOAT());
+        
assertThat(rowType.getTypeAt(6)).isEqualTo(org.apache.fluss.types.DataTypes.DOUBLE());
+        
assertThat(rowType.getTypeAt(7)).isEqualTo(org.apache.fluss.types.DataTypes.DECIMAL(10,
 2));
+        
assertThat(rowType.getTypeAt(8)).isEqualTo(org.apache.fluss.types.DataTypes.CHAR(10));
         // VarChar maps to StringType in Fluss
-        
assertThat(rowType.getTypeAt(9)).isEqualTo(com.alibaba.fluss.types.DataTypes.STRING());
-        
assertThat(rowType.getTypeAt(10)).isEqualTo(com.alibaba.fluss.types.DataTypes.BINARY(16));
+        
assertThat(rowType.getTypeAt(9)).isEqualTo(org.apache.fluss.types.DataTypes.STRING());
+        
assertThat(rowType.getTypeAt(10)).isEqualTo(org.apache.fluss.types.DataTypes.BINARY(16));
         // VarBinary maps to BytesType in Fluss
-        
assertThat(rowType.getTypeAt(11)).isEqualTo(com.alibaba.fluss.types.DataTypes.BYTES());
-        
assertThat(rowType.getTypeAt(12)).isEqualTo(com.alibaba.fluss.types.DataTypes.DATE());
-        
assertThat(rowType.getTypeAt(13)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIME());
-        
assertThat(rowType.getTypeAt(14)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP(3));
+        
assertThat(rowType.getTypeAt(11)).isEqualTo(org.apache.fluss.types.DataTypes.BYTES());
+        
assertThat(rowType.getTypeAt(12)).isEqualTo(org.apache.fluss.types.DataTypes.DATE());
+        
assertThat(rowType.getTypeAt(13)).isEqualTo(org.apache.fluss.types.DataTypes.TIME());
+        
assertThat(rowType.getTypeAt(14)).isEqualTo(org.apache.fluss.types.DataTypes.TIMESTAMP(3));
         assertThat(rowType.getTypeAt(15))
-                .isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+                .isEqualTo(org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+        assertThat(rowType.getTypeAt(16))
+                .isEqualTo(
+                        org.apache.fluss.types.DataTypes.ARRAY(
+                                org.apache.fluss.types.DataTypes.INT()));
+        assertThat(rowType.getTypeAt(17))
+                .isEqualTo(
+                        org.apache.fluss.types.DataTypes.MAP(
+                                org.apache.fluss.types.DataTypes.STRING(),
+                                org.apache.fluss.types.DataTypes.INT()));
     }
 
     @Test
@@ -126,7 +136,7 @@ class FlussConversionsTest {
                         .physicalColumn("not_null_col", 
DataTypes.INT().notNull())
                         .build();
 
-        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+        org.apache.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
 
         RowType rowType = flussSchema.getRowType();
         assertThat(rowType.getTypeAt(0).isNullable()).isTrue();
@@ -146,28 +156,6 @@ class FlussConversionsTest {
                 .hasMessageContaining("Unsupported data type in fluss");
     }
 
-    @Test
-    void testToFlussSchemaUnsupportedArrayType() {
-        Schema cdcSchema =
-                Schema.newBuilder().physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.INT())).build();
-
-        assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Unsupported data type in fluss");
-    }
-
-    @Test
-    void testToFlussSchemaUnsupportedMapType() {
-        Schema cdcSchema =
-                Schema.newBuilder()
-                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
-                        .build();
-
-        assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Unsupported data type in fluss");
-    }
-
     // 
--------------------------------------------------------------------------------------------
     // Tests for toFlussTable
     // 
--------------------------------------------------------------------------------------------
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index a083ce561..8c263f8de 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -42,7 +42,7 @@ limitations under the License.
         <maven.plugin.download.version>1.6.8</maven.plugin.download.version>
         <iceberg.version>1.10.1</iceberg.version>
         <hive.version>2.3.9</hive.version>
-        <fluss.version>0.7.0</fluss.version>
+        <fluss.version>0.9.0-incubating</fluss.version>
         <jmh.version>1.37</jmh.version>
         <hudi.version>1.1.0</hudi.version>
     </properties>
@@ -673,10 +673,19 @@ limitations under the License.
                             </outputDirectory>
                         </artifactItem>
                         <artifactItem>
-                            <groupId>com.alibaba.fluss</groupId>
-                            
<artifactId>fluss-flink-${flink.major.version}</artifactId>
+                            <groupId>org.apache.fluss</groupId>
+                            
<artifactId>fluss-flink-${flink-major-1.20}</artifactId>
                             <version>${fluss.version}</version>
-                            
<destFileName>fluss-sql-connector.jar</destFileName>
+                            
<destFileName>fluss-flink-${flink-1.20}.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.fluss</groupId>
+                            
<artifactId>fluss-flink-${flink-major-1.19}</artifactId>
+                            <version>${fluss.version}</version>
+                            
<destFileName>fluss-flink-${flink-1.19}.jar</destFileName>
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies
                             </outputDirectory>
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
index c20808fa3..8f12e20c1 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
@@ -53,7 +53,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlussE2eITCase.class);
     private static final Duration FLUSS_TESTCASE_TIMEOUT = 
Duration.ofMinutes(3);
-    private static final String flussImageTag = "fluss/fluss:0.7.0";
+    private static final String flussImageTag = 
"apache/fluss:0.9.0-incubating";
     private static final String zooKeeperImageTag = "zookeeper:3.9.2";
 
     private static final List<String> flussCoordinatorProperties =
@@ -64,7 +64,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
                     "remote.data.dir: /tmp/fluss/remote-data",
                     "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
                     "security.sasl.enabled.mechanisms: PLAIN",
-                    "security.sasl.plain.jaas.config: 
com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
+                    "security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
                     "super.users: User:admin");
 
     private static final List<String> flussTabletServerProperties =
@@ -78,7 +78,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
                     "remote.data.dir: /tmp/fluss/remote-data",
                     "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
                     "security.sasl.enabled.mechanisms: PLAIN",
-                    "security.sasl.plain.jaas.config: 
com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
+                    "security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
                     "super.users: User:admin");
 
     @Container
@@ -129,7 +129,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment 
{
         // Due to a bug described in 
https://github.com/apache/fluss/pull/1267, it's not viable to
         // pass Fluss dependency with `--jar` CLI option. We may remove this 
workaround and use
         // `submitPipelineJob` to carry extra jar later.
-        return Collections.singletonList("fluss-sql-connector.jar");
+        return Collections.singletonList(String.format("fluss-flink-%s.jar", 
flinkVersion));
     }
 
     @BeforeEach

Reply via email to