This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ca53bff [flink] Use InternalRow instead of RowData in sink sub topo 
(#2004)
79ca53bff is described below

commit 79ca53bffb8b65025f87c1246a8af603fac5256a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:49:41 2023 +0800

    [flink] Use InternalRow instead of RowData in sink sub topo (#2004)
---
 .../flink/sink/DynamicBucketRowWriteOperator.java  | 16 ++--
 .../apache/paimon/flink/sink/FileStoreSink.java    |  6 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +--
 .../paimon/flink/sink/LocalMergeOperator.java      | 17 ++--
 ...tionKeyExtractor.java => MapToInternalRow.java} | 31 +++----
 .../paimon/flink/sink/RowDataChannelComputer.java  | 14 ++--
 .../flink/sink/RowDataStoreWriteOperator.java      | 11 ++-
 .../paimon/flink/sink/RowDynamicBucketSink.java    | 15 ++--
 .../flink/sink/RowHashKeyChannelComputer.java      | 10 ++-
 .../flink/sink/RowWithBucketChannelComputer.java   | 10 ++-
 .../paimon/flink/sink/UnawareBucketWriteSink.java  |  4 +-
 .../flink/sink/index/GlobalDynamicBucketSink.java  | 30 ++++---
 .../sink/index/GlobalIndexAssignerOperator.java    | 17 ++--
 .../sink/index/KeyPartPartitionKeyExtractor.java   | 12 +--
 .../sink/index/KeyPartRowChannelComputer.java      | 11 ++-
 .../apache/paimon/flink/sorter/OrderSorter.java    |  6 +-
 .../org/apache/paimon/flink/sorter/SortUtils.java  | 10 +--
 .../flink/utils/InternalRowTypeSerializer.java     |  5 ++
 .../paimon/flink/utils/InternalTypeInfo.java       |  7 ++
 .../flink/utils/ProjectToRowDataFunction.java      | 36 ++++-----
 .../apache/paimon/flink/sink/FlinkSinkTest.java    |  9 +--
 .../flink/sink/RowDataChannelComputerTest.java     | 22 ++---
 .../flink/sink/index/GlobalIndexAssignerTest.java  | 94 +++++++++++-----------
 23 files changed, 193 insertions(+), 211 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
index 7db9c2802..2a2d1fe20 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -18,16 +18,18 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.DynamicBucketRow;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
 
-/** A {@link PrepareCommitOperator} to write {@link RowData} with bucket. 
Record schema is fixed. */
-public class DynamicBucketRowWriteOperator extends 
TableWriteOperator<Tuple2<RowData, Integer>> {
+/**
+ * A {@link PrepareCommitOperator} to write {@link InternalRow} with bucket. 
Record schema is fixed.
+ */
+public class DynamicBucketRowWriteOperator
+        extends TableWriteOperator<Tuple2<InternalRow, Integer>> {
 
     private static final long serialVersionUID = 1L;
 
@@ -44,9 +46,9 @@ public class DynamicBucketRowWriteOperator extends 
TableWriteOperator<Tuple2<Row
     }
 
     @Override
-    public void processElement(StreamRecord<Tuple2<RowData, Integer>> element) 
throws Exception {
-        FlinkRowWrapper internalRow = new 
FlinkRowWrapper(element.getValue().f0);
-        DynamicBucketRow row = new DynamicBucketRow(internalRow, 
element.getValue().f1);
+    public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> 
element)
+            throws Exception {
+        DynamicBucketRow row = new DynamicBucketRow(element.getValue().f0, 
element.getValue().f1);
         write.write(row);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 6b40815ed..5481ae1f5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -18,17 +18,17 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
 import java.util.Map;
 
 /** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<RowData> {
+public class FileStoreSink extends FlinkWriteSink<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,7 +43,7 @@ public class FileStoreSink extends FlinkWriteSink<RowData> {
     }
 
     @Override
-    protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+    protected OneInputStreamOperator<InternalRow, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new RowDataStoreWriteOperator(table, logSinkFunction, 
writeProvider, commitUser);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 4102d5ca1..cd9edb252 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
@@ -69,7 +70,7 @@ public class FlinkSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        DataStream<RowData> input = this.input;
+        DataStream<InternalRow> input = MapToInternalRow.map(this.input, 
table.rowType());
         if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
             input =
                     input.forward()
@@ -96,15 +97,15 @@ public class FlinkSinkBuilder {
     }
 
     private DataStreamSink<?> buildDynamicBucketSink(
-            DataStream<RowData> input, boolean globalIndex) {
+            DataStream<InternalRow> input, boolean globalIndex) {
         checkArgument(logSinkFunction == null, "Dynamic bucket mode can not 
work with log system.");
         return globalIndex
                 ? new GlobalDynamicBucketSink(table, 
overwritePartition).build(input, parallelism)
                 : new RowDynamicBucketSink(table, 
overwritePartition).build(input, parallelism);
     }
 
-    private DataStreamSink<?> buildForFixedBucket(DataStream<RowData> input) {
-        DataStream<RowData> partitioned =
+    private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> 
input) {
+        DataStream<InternalRow> partitioned =
                 partition(
                         input,
                         new RowDataChannelComputer(table.schema(), 
logSinkFunction != null),
@@ -113,7 +114,7 @@ public class FlinkSinkBuilder {
         return sink.sinkFrom(partitioned);
     }
 
-    private DataStreamSink<?> buildUnawareBucketSink(DataStream<RowData> 
input) {
+    private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> 
input) {
         checkArgument(
                 table instanceof AppendOnlyFileStoreTable,
                 "Unaware bucket mode only works with append-only table for 
now.");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 52581a1b5..aab614f03 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -24,8 +24,6 @@ import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.SortBufferWriteBuffer;
 import org.apache.paimon.mergetree.compact.MergeFunction;
@@ -43,14 +41,13 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
 
 /**
  * {@link AbstractStreamOperator} which buffer input record and apply merge 
function when the buffer
  * is full. Mainly to resolve data skew on primary keys.
  */
-public class LocalMergeOperator extends AbstractStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
+        implements OneInputStreamOperator<InternalRow, InternalRow>, 
BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
 
@@ -66,7 +63,6 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<RowData>
     private transient SortBufferWriteBuffer buffer;
     private transient long currentWatermark;
 
-    private transient FlinkRowData reusedRowData;
     private transient boolean endOfInput;
 
     public LocalMergeOperator(TableSchema schema) {
@@ -106,14 +102,13 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<RowData>
                         null);
         currentWatermark = Long.MIN_VALUE;
 
-        reusedRowData = new FlinkRowData(null);
         endOfInput = false;
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> record) throws Exception {
+    public void processElement(StreamRecord<InternalRow> record) throws 
Exception {
         recordCount++;
-        InternalRow row = new FlinkRowWrapper(record.getValue());
+        InternalRow row = record.getValue();
 
         RowKind rowKind = row.getRowKind();
         // row kind must be INSERT when it is divided into key and value
@@ -133,7 +128,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<RowData>
     }
 
     @Override
-    public void processWatermark(Watermark mark) throws Exception {
+    public void processWatermark(Watermark mark) {
         // don't emit watermark immediately, emit them after flushing buffer
         currentWatermark = mark.getTimestamp();
     }
@@ -173,7 +168,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<RowData>
                 kv -> {
                     InternalRow row = kv.value();
                     row.setRowKind(kv.valueKind());
-                    output.collect(new 
StreamRecord<>(reusedRowData.replace(row)));
+                    output.collect(new StreamRecord<>(row));
                 });
         buffer.clear();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
similarity index 53%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
index 9db37c7cd..a9bf74401 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
@@ -18,30 +18,21 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.types.RowType;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.data.RowData;
 
-/** A {@link PartitionKeyExtractor} to {@link RowData}. */
-public class RowDataPartitionKeyExtractor implements 
PartitionKeyExtractor<RowData> {
+/** A utility to convert {@link RowData} stream to {@link InternalRow} stream. */
+public class MapToInternalRow {
 
-    private final RowPartitionKeyExtractor extractor;
-
-    public RowDataPartitionKeyExtractor(TableSchema schema) {
-        this.extractor = new RowPartitionKeyExtractor(schema);
-    }
-
-    @Override
-    public BinaryRow partition(RowData record) {
-        return extractor.partition(new FlinkRowWrapper(record));
-    }
-
-    @Override
-    public BinaryRow trimmedPrimaryKey(RowData record) {
-        return extractor.trimmedPrimaryKey(new FlinkRowWrapper(record));
+    public static DataStream<InternalRow> map(DataStream<RowData> input, 
RowType rowType) {
+        return input.map((MapFunction<RowData, InternalRow>) 
FlinkRowWrapper::new)
+                .setParallelism(input.getParallelism())
+                .returns(InternalTypeInfo.fromRowType(rowType));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 19e1f9d5d..bf2687fdb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -19,13 +19,13 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
-import org.apache.flink.table.data.RowData;
-
-/** {@link ChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer implements ChannelComputer<RowData> {
+/** {@link ChannelComputer} for {@link InternalRow}. */
+public class RowDataChannelComputer implements ChannelComputer<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
@@ -33,7 +33,7 @@ public class RowDataChannelComputer implements 
ChannelComputer<RowData> {
     private final boolean hasLogSink;
 
     private transient int numChannels;
-    private transient KeyAndBucketExtractor<RowData> extractor;
+    private transient KeyAndBucketExtractor<InternalRow> extractor;
 
     public RowDataChannelComputer(TableSchema schema, boolean hasLogSink) {
         this.schema = schema;
@@ -43,11 +43,11 @@ public class RowDataChannelComputer implements 
ChannelComputer<RowData> {
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new RowDataKeyAndBucketExtractor(schema);
+        this.extractor = new FixedBucketRowKeyExtractor(schema);
     }
 
     @Override
-    public int channel(RowData record) {
+    public int channel(InternalRow record) {
         extractor.setRecord(record);
         return channel(extractor.partition(), extractor.bucket());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 81fa48aa4..a5c8e5557 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.log.LogWriteCallback;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -37,7 +37,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
@@ -45,8 +44,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 
-/** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is 
fixed. */
-public class RowDataStoreWriteOperator extends TableWriteOperator<RowData> {
+/** A {@link PrepareCommitOperator} to write {@link InternalRow}. Record 
schema is fixed. */
+public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> 
{
 
     private static final long serialVersionUID = 3L;
 
@@ -115,12 +114,12 @@ public class RowDataStoreWriteOperator extends 
TableWriteOperator<RowData> {
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+    public void processElement(StreamRecord<InternalRow> element) throws 
Exception {
         sinkContext.timestamp = element.hasTimestamp() ? 
element.getTimestamp() : null;
 
         SinkRecord record;
         try {
-            record = write.write(new FlinkRowWrapper(element.getValue()));
+            record = write.write(element.getValue());
         } catch (Exception e) {
             throw new IOException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index a7775b8f9..077fec313 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -18,21 +18,22 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
 import java.util.Map;
 
 /** Sink for dynamic bucket table. */
-public class RowDynamicBucketSink extends DynamicBucketSink<RowData> {
+public class RowDynamicBucketSink extends DynamicBucketSink<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
@@ -42,23 +43,23 @@ public class RowDynamicBucketSink extends 
DynamicBucketSink<RowData> {
     }
 
     @Override
-    protected ChannelComputer<RowData> channelComputer1() {
+    protected ChannelComputer<InternalRow> channelComputer1() {
         return new RowHashKeyChannelComputer(table.schema());
     }
 
     @Override
-    protected ChannelComputer<Tuple2<RowData, Integer>> channelComputer2() {
+    protected ChannelComputer<Tuple2<InternalRow, Integer>> channelComputer2() 
{
         return new RowWithBucketChannelComputer(table.schema());
     }
 
     @Override
-    protected SerializableFunction<TableSchema, PartitionKeyExtractor<RowData>>
+    protected SerializableFunction<TableSchema, 
PartitionKeyExtractor<InternalRow>>
             extractorFunction() {
-        return RowDataPartitionKeyExtractor::new;
+        return RowPartitionKeyExtractor::new;
     }
 
     @Override
-    protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> 
createWriteOperator(
+    protected OneInputStreamOperator<Tuple2<InternalRow, Integer>, 
Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new DynamicBucketRowWriteOperator(table, writeProvider, 
commitUser);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
index 5d43736ff..e0c131e9b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
@@ -18,19 +18,21 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 
 import org.apache.flink.table.data.RowData;
 
 /** Hash key of a {@link RowData}. */
-public class RowHashKeyChannelComputer implements ChannelComputer<RowData> {
+public class RowHashKeyChannelComputer implements ChannelComputer<InternalRow> 
{
 
     private static final long serialVersionUID = 1L;
 
     private final TableSchema schema;
 
     private transient int numChannels;
-    private transient RowDataPartitionKeyExtractor extractor;
+    private transient RowPartitionKeyExtractor extractor;
 
     public RowHashKeyChannelComputer(TableSchema schema) {
         this.schema = schema;
@@ -39,11 +41,11 @@ public class RowHashKeyChannelComputer implements 
ChannelComputer<RowData> {
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new RowDataPartitionKeyExtractor(schema);
+        this.extractor = new RowPartitionKeyExtractor(schema);
     }
 
     @Override
-    public int channel(RowData record) {
+    public int channel(InternalRow record) {
         int hash = extractor.trimmedPrimaryKey(record).hashCode();
         return Math.abs(hash % numChannels);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
index bfec3b513..a00c85c31 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
@@ -18,20 +18,22 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;
 
 /** Hash key of a {@link RowData} with bucket. */
-public class RowWithBucketChannelComputer implements 
ChannelComputer<Tuple2<RowData, Integer>> {
+public class RowWithBucketChannelComputer implements 
ChannelComputer<Tuple2<InternalRow, Integer>> {
 
     private static final long serialVersionUID = 1L;
 
     private final TableSchema schema;
 
     private transient int numChannels;
-    private transient RowDataPartitionKeyExtractor extractor;
+    private transient RowPartitionKeyExtractor extractor;
 
     public RowWithBucketChannelComputer(TableSchema schema) {
         this.schema = schema;
@@ -40,11 +42,11 @@ public class RowWithBucketChannelComputer implements 
ChannelComputer<Tuple2<RowD
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new RowDataPartitionKeyExtractor(schema);
+        this.extractor = new RowPartitionKeyExtractor(schema);
     }
 
     @Override
-    public int channel(Tuple2<RowData, Integer> record) {
+    public int channel(Tuple2<InternalRow, Integer> record) {
         return ChannelComputer.select(extractor.partition(record.f0), 
record.f1, numChannels);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
index ad10f96ba..856432582 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.data.RowData;
 
 import java.util.Map;
 
@@ -53,7 +53,7 @@ public class UnawareBucketWriteSink extends FileStoreSink {
     }
 
     @Override
-    public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String 
initialCommitUser) {
+    public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String 
initialCommitUser) {
         // do the actually writing action, no snapshot generated in this stage
         DataStream<Committable> written = doWrite(input, initialCommitUser, 
parallelism);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 26cfc9b41..eb2f8d30f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -18,12 +18,13 @@
 
 package org.apache.paimon.flink.sink.index;
 
-import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
 import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.sink.RowWithBucketChannelComputer;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -35,8 +36,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 
 import javax.annotation.Nullable;
 
@@ -44,12 +43,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType;
 
 /** Sink for global dynamic bucket table. */
-public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, 
Integer>> {
+public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow, Integer>> {
 
     private static final long serialVersionUID = 1L;
 
@@ -59,34 +57,34 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<RowData, Inte
     }
 
     @Override
-    protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> 
createWriteOperator(
+    protected OneInputStreamOperator<Tuple2<InternalRow, Integer>, 
Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new DynamicBucketRowWriteOperator(table, writeProvider, 
commitUser);
     }
 
-    public DataStreamSink<?> build(DataStream<RowData> input, @Nullable 
Integer parallelism) {
+    public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable 
Integer parallelism) {
         String initialCommitUser = UUID.randomUUID().toString();
 
         TableSchema schema = table.schema();
         RowType rowType = schema.logicalRowType();
         List<String> primaryKeys = schema.primaryKeys();
-        RowDataSerializer rowSerializer = new 
RowDataSerializer(toLogicalType(rowType));
+        InternalRowTypeSerializer rowSerializer = new 
InternalRowTypeSerializer(rowType);
 
         RowType bootstrapType = bootstrapType(schema);
-        RowDataSerializer bootstrapSerializer = new 
RowDataSerializer(toLogicalType(bootstrapType));
+        InternalRowTypeSerializer bootstrapSerializer =
+                new InternalRowTypeSerializer(bootstrapType);
 
         // Topology:
         // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- 
shuffle by bucket -->
         // writer --> committer
 
-        DataStream<Tuple2<KeyPartOrRow, RowData>> bootstraped =
+        DataStream<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
                 input.transform(
                                 "INDEX_BOOTSTRAP",
                                 new InternalTypeInfo<>(
                                         new KeyWithRowSerializer<>(
                                                 bootstrapSerializer, 
rowSerializer)),
-                                new IndexBootstrapOperator<>(
-                                        new IndexBootstrap(table), 
FlinkRowData::new))
+                                new IndexBootstrapOperator<>(new 
IndexBootstrap(table), r -> r))
                         .setParallelism(input.getParallelism());
 
         // 1. shuffle by key hash
@@ -97,13 +95,13 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<RowData, Inte
 
         KeyPartRowChannelComputer channelComputer =
                 new KeyPartRowChannelComputer(rowType, bootstrapType, 
primaryKeys);
-        DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash =
+        DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
                 partition(bootstraped, channelComputer, assignerParallelism);
 
         // 2. bucket-assigner
-        TupleTypeInfo<Tuple2<RowData, Integer>> rowWithBucketType =
+        TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
                 new TupleTypeInfo<>(input.getType(), 
BasicTypeInfo.INT_TYPE_INFO);
-        DataStream<Tuple2<RowData, Integer>> bucketAssigned =
+        DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
                 partitionByKeyHash
                         .transform(
                                 "dynamic-bucket-assigner",
@@ -113,7 +111,7 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<RowData, Inte
 
         // 3. shuffle by bucket
 
-        DataStream<Tuple2<RowData, Integer>> partitionByBucket =
+        DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
                 partition(bucketAssigned, new 
RowWithBucketChannelComputer(schema), parallelism);
 
         // 4. writer and committer
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index d255cdfbb..8e0e08799 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -23,14 +23,12 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
 import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SerializableFunction;
 
@@ -40,14 +38,11 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
-
 /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
 public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple2<T, Integer>>
         implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, 
Integer>>,
@@ -149,22 +144,22 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
         this.assigner.close();
     }
 
-    public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) 
{
+    public static GlobalIndexAssignerOperator<InternalRow> forRowData(Table 
table) {
         return new GlobalIndexAssignerOperator<>(
-                table, createRowDataAssigner(table), FlinkRowWrapper::new, 
FlinkRowData::new);
+                table, createRowDataAssigner(table), r -> r, r -> r);
     }
 
-    public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+    public static GlobalIndexAssigner<InternalRow> createRowDataAssigner(Table 
t) {
         RowType bootstrapType = 
IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
         int bucketIndex = bootstrapType.getFieldCount() - 1;
         return new GlobalIndexAssigner<>(
                 t,
-                RowDataPartitionKeyExtractor::new,
+                RowPartitionKeyExtractor::new,
                 KeyPartPartitionKeyExtractor::new,
                 row -> row.getInt(bucketIndex),
                 new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
                 (rowData, rowKind) -> {
-                    rowData.setRowKind(toFlinkRowKind(rowKind));
+                    rowData.setRowKind(rowKind);
                     return rowData;
                 });
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
index 6dc3dca42..188f3fa61 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink.index;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.types.RowType;
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** A {@link PartitionKeyExtractor} to {@link RowData} with only key and 
partiton fields. */
-public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<RowData> {
+public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<InternalRow> {
 
     private final Projection partitionProjection;
     private final Projection keyProjection;
@@ -50,12 +50,12 @@ public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<RowDa
     }
 
     @Override
-    public BinaryRow partition(RowData record) {
-        return partitionProjection.apply(new FlinkRowWrapper(record));
+    public BinaryRow partition(InternalRow record) {
+        return partitionProjection.apply(record);
     }
 
     @Override
-    public BinaryRow trimmedPrimaryKey(RowData record) {
-        return keyProjection.apply(new FlinkRowWrapper(record));
+    public BinaryRow trimmedPrimaryKey(InternalRow record) {
+        return keyProjection.apply(record);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
index aff593007..c581c02ea 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -21,17 +21,17 @@ package org.apache.paimon.flink.sink.index;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.ChannelComputer;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.data.RowData;
 
 import java.util.List;
 
 /** A {@link ChannelComputer} for KeyPartOrRow and row. */
-public class KeyPartRowChannelComputer implements 
ChannelComputer<Tuple2<KeyPartOrRow, RowData>> {
+public class KeyPartRowChannelComputer
+        implements ChannelComputer<Tuple2<KeyPartOrRow, InternalRow>> {
 
     private static final long serialVersionUID = 1L;
 
@@ -58,10 +58,9 @@ public class KeyPartRowChannelComputer implements 
ChannelComputer<Tuple2<KeyPart
     }
 
     @Override
-    public int channel(Tuple2<KeyPartOrRow, RowData> record) {
+    public int channel(Tuple2<KeyPartOrRow, InternalRow> record) {
         BinaryRow key =
-                (record.f0 == KeyPartOrRow.KEY_PART ? keyPartProject : 
rowProject)
-                        .apply(new FlinkRowWrapper(record.f1));
+                (record.f0 == KeyPartOrRow.KEY_PART ? keyPartProject : 
rowProject).apply(record.f1);
         return Math.abs(key.hashCode() % numChannels);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index e357aebd3..7c38d9a8c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -21,10 +21,8 @@ package org.apache.paimon.flink.sorter;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Projection;
@@ -59,9 +57,7 @@ public class OrderSorter extends TableSorter {
                 origin,
                 table,
                 keyRowType,
-                new InternalTypeInfo<>(
-                        new InternalRowTypeSerializer(
-                                keyRowType.getFieldTypes().toArray(new 
DataType[0]))),
+                InternalTypeInfo.fromRowType(keyRowType),
                 new KeyComparatorSupplier(keyRowType),
                 new SortUtils.KeyAbstract<InternalRow>() {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 1cc26293f..1cc5cf8ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -25,11 +25,9 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.shuffle.RangeShuffle;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyProjectedRow;
 import org.apache.paimon.utils.SerializableSupplier;
@@ -114,9 +112,7 @@ public class SortUtils {
         fields.addAll(dataFields);
         final RowType longRowType = new RowType(fields);
         final InternalTypeInfo<InternalRow> internalRowType =
-                new InternalTypeInfo<>(
-                        new InternalRowTypeSerializer(
-                                longRowType.getFieldTypes().toArray(new 
DataType[0])));
+                InternalTypeInfo.fromRowType(longRowType);
 
         // generate the KEY as the key of Pair.
         DataStream<Tuple2<KEY, RowData>> inputWithKey =
@@ -177,9 +173,7 @@ public class SortUtils {
                                 return keyProjectedRow.replaceRow(value);
                             }
                         },
-                        new InternalTypeInfo<>(
-                                new InternalRowTypeSerializer(
-                                        
valueRowType.getFieldTypes().toArray(new DataType[0]))))
+                        InternalTypeInfo.fromRowType(valueRowType))
                 .setParallelism(sinkParallelism)
                 .map(FlinkRowData::new, inputStream.getType())
                 .setParallelism(sinkParallelism);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
index 1cf197386..b0b0a70c1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -36,6 +37,10 @@ public class InternalRowTypeSerializer extends 
InternalTypeSerializer<InternalRo
 
     private final InternalRowSerializer internalRowSerializer;
 
+    public InternalRowTypeSerializer(RowType rowType) {
+        this(rowType.getFieldTypes().toArray(new DataType[0]));
+    }
+
     public InternalRowTypeSerializer(DataType... types) {
         internalRowSerializer = new InternalRowSerializer(types);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
index f0b0d0863..4ea5db9f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.flink.utils;
 
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -35,6 +38,10 @@ public class InternalTypeInfo<T> extends TypeInformation<T> {
         this.serializer = serializer;
     }
 
+    public static InternalTypeInfo<InternalRow> fromRowType(RowType rowType) {
+        return new InternalTypeInfo<>(new InternalRowTypeSerializer(rowType));
+    }
+
     @Override
     public boolean isBasicType() {
         return false;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
index c70d9633c..f18116679 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
@@ -19,39 +19,34 @@
 package org.apache.paimon.flink.utils;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SerBiFunction;
 
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.LogicalType;
-
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.paimon.data.InternalRow.createFieldGetter;
 
-/** Project {@link BinaryRow} fields into {@link RowData}. */
-public class ProjectToRowDataFunction implements SerBiFunction<RowData, 
BinaryRow, RowData> {
+/** Project {@link BinaryRow} fields into {@link InternalRow}. */
+public class ProjectToRowDataFunction
+        implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {
 
-    private final RowData.FieldGetter[] fieldGetters;
+    private final InternalRow.FieldGetter[] fieldGetters;
 
     private final Map<Integer, Integer> projectMapping;
-    private final RowData.FieldGetter[] projectGetters;
+    private final InternalRow.FieldGetter[] projectGetters;
 
     public ProjectToRowDataFunction(RowType rowType, List<String> 
projectFields) {
-        LogicalType[] types =
-                LogicalTypeConversion.toLogicalType(rowType)
-                        .getChildren()
-                        .toArray(new LogicalType[0]);
+        DataType[] types = rowType.getFieldTypes().toArray(new DataType[0]);
         this.fieldGetters =
                 IntStream.range(0, types.length)
                         .mapToObj(i -> createFieldGetter(types[i], i))
-                        .toArray(RowData.FieldGetter[]::new);
+                        .toArray(InternalRow.FieldGetter[]::new);
 
         List<String> fieldNames = rowType.getFieldNames();
         this.projectMapping =
@@ -64,17 +59,16 @@ public class ProjectToRowDataFunction implements 
SerBiFunction<RowData, BinaryRo
                                         createFieldGetter(
                                                 
types[rowType.getFieldIndex(field)],
                                                 projectFields.indexOf(field)))
-                        .toArray(RowData.FieldGetter[]::new);
+                        .toArray(InternalRow.FieldGetter[]::new);
     }
 
     @Override
-    public RowData apply(RowData input, BinaryRow project) {
-        GenericRowData newRow = new GenericRowData(fieldGetters.length);
-        FlinkRowData partRow = new FlinkRowData(project);
+    public InternalRow apply(InternalRow input, BinaryRow project) {
+        GenericRow newRow = new GenericRow(fieldGetters.length);
         for (int i = 0; i < fieldGetters.length; i++) {
             Object field =
                     projectMapping.containsKey(i)
-                            ? 
projectGetters[projectMapping.get(i)].getFieldOrNull(partRow)
+                            ? 
projectGetters[projectMapping.get(i)].getFieldOrNull(project)
                             : fieldGetters[i].getFieldOrNull(input);
             newRow.setField(i, field);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 81d7b5d5f..a93a361dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -48,7 +48,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -82,10 +81,10 @@ public class FlinkSinkTest {
     private boolean testSpillable(
             StreamExecutionEnvironment streamExecutionEnvironment, 
FileStoreTable fileStoreTable)
             throws Exception {
-        DataStreamSource<RowData> source =
+        DataStreamSource<InternalRow> source =
                 streamExecutionEnvironment.fromCollection(
-                        Collections.singletonList(new 
FlinkRowData(GenericRow.of(1, 1))));
-        FlinkSink<RowData> flinkSink = new FileStoreSink(fileStoreTable, null, 
null);
+                        Collections.singletonList(GenericRow.of(1, 1)));
+        FlinkSink<InternalRow> flinkSink = new FileStoreSink(fileStoreTable, 
null, null);
         SingleOutputStreamOperator<Committable> written = 
flinkSink.doWrite(source, "123", 1);
         RowDataStoreWriteOperator operator =
                 ((RowDataStoreWriteOperator)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
index 710f8bece..f727cf981 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
@@ -19,17 +19,18 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -70,11 +71,10 @@ public class RowDataChannelComputerTest {
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numInputs = random.nextInt(1000) + 1;
-        List<RowData> input = new ArrayList<>();
+        List<InternalRow> input = new ArrayList<>();
         for (int i = 0; i < numInputs; i++) {
             input.add(
-                    GenericRowData.of(
-                            random.nextInt(10) + 1, random.nextLong(), 
random.nextDouble()));
+                    GenericRow.of(random.nextInt(10) + 1, random.nextLong(), 
random.nextDouble()));
         }
 
         testImpl(schema, input);
@@ -100,17 +100,17 @@ public class RowDataChannelComputerTest {
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numInputs = random.nextInt(1000) + 1;
-        List<RowData> input = new ArrayList<>();
+        List<InternalRow> input = new ArrayList<>();
         for (int i = 0; i < numInputs; i++) {
-            input.add(GenericRowData.of(random.nextLong(), 
random.nextDouble()));
+            input.add(GenericRow.of(random.nextLong(), random.nextDouble()));
         }
 
         testImpl(schema, input);
     }
 
-    private void testImpl(TableSchema schema, List<RowData> input) {
+    private void testImpl(TableSchema schema, List<InternalRow> input) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        RowDataKeyAndBucketExtractor extractor = new 
RowDataKeyAndBucketExtractor(schema);
+        FixedBucketRowKeyExtractor extractor = new 
FixedBucketRowKeyExtractor(schema);
 
         int numChannels = random.nextInt(10) + 1;
         boolean hasLogSink = random.nextBoolean();
@@ -119,7 +119,7 @@ public class RowDataChannelComputerTest {
 
         // assert that channel(record) and channel(partition, bucket) gives 
the same result
 
-        for (RowData rowData : input) {
+        for (InternalRow rowData : input) {
             extractor.setRecord(rowData);
             BinaryRow partition = extractor.partition();
             int bucket = extractor.bucket();
@@ -156,7 +156,7 @@ public class RowDataChannelComputerTest {
 
         if (hasLogSink) {
             Map<Integer, Set<Integer>> channelsPerBucket = new HashMap<>();
-            for (RowData rowData : input) {
+            for (InternalRow rowData : input) {
                 extractor.setRecord(rowData);
                 int bucket = extractor.bucket();
                 channelsPerBucket
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 67d54f61a..97503079b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -21,15 +21,15 @@ package org.apache.paimon.flink.sink.index;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -44,12 +44,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link GlobalIndexAssigner}. */
 public class GlobalIndexAssignerTest extends TableTestBase {
 
-    private GlobalIndexAssigner<RowData> createAssigner(MergeEngine 
mergeEngine) throws Exception {
+    private GlobalIndexAssigner<InternalRow> createAssigner(MergeEngine 
mergeEngine)
+            throws Exception {
         return createAssigner(mergeEngine, false);
     }
 
-    private GlobalIndexAssigner<RowData> createAssigner(MergeEngine 
mergeEngine, boolean enableTtl)
-            throws Exception {
+    private GlobalIndexAssigner<InternalRow> createAssigner(
+            MergeEngine mergeEngine, boolean enableTtl) throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
         options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -85,31 +86,32 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     }
 
     private void innerTestBucketAssign(boolean enableTtl) throws Exception {
-        GlobalIndexAssigner<RowData> assigner = 
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
+        GlobalIndexAssigner<InternalRow> assigner =
+                createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
         List<Integer> output = new ArrayList<>();
         assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) -> 
output.add(bucket));
 
         // assign
-        assigner.process(GenericRowData.of(1, 1, 1));
-        assigner.process(GenericRowData.of(1, 2, 2));
-        assigner.process(GenericRowData.of(1, 3, 3));
+        assigner.process(GenericRow.of(1, 1, 1));
+        assigner.process(GenericRow.of(1, 2, 2));
+        assigner.process(GenericRow.of(1, 3, 3));
         assertThat(output).containsExactly(0, 0, 0);
         output.clear();
 
         // full
-        assigner.process(GenericRowData.of(1, 4, 4));
+        assigner.process(GenericRow.of(1, 4, 4));
         assertThat(output).containsExactly(2);
         output.clear();
 
         // another partition
-        assigner.process(GenericRowData.of(2, 5, 5));
+        assigner.process(GenericRow.of(2, 5, 5));
         assertThat(output).containsExactly(0);
         output.clear();
 
         // read assigned
-        assigner.process(GenericRowData.of(1, 4, 4));
-        assigner.process(GenericRowData.of(1, 2, 2));
-        assigner.process(GenericRowData.of(1, 3, 3));
+        assigner.process(GenericRow.of(1, 4, 4));
+        assigner.process(GenericRow.of(1, 2, 2));
+        assigner.process(GenericRow.of(1, 3, 3));
         assertThat(output).containsExactly(2, 0, 0);
         output.clear();
 
@@ -118,8 +120,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
 
     @Test
     public void testUpsert() throws Exception {
-        GlobalIndexAssigner<RowData> assigner = 
createAssigner(MergeEngine.DEDUPLICATE);
-        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        GlobalIndexAssigner<InternalRow> assigner = 
createAssigner(MergeEngine.DEDUPLICATE);
+        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
                 new File(warehouse.getPath()),
                 2,
@@ -127,32 +129,32 @@ public class GlobalIndexAssignerTest extends 
TableTestBase {
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
 
         // change partition
-        assigner.process(GenericRowData.of(1, 1, 1));
-        assigner.process(GenericRowData.of(2, 1, 2));
+        assigner.process(GenericRow.of(1, 1, 1));
+        assigner.process(GenericRow.of(2, 1, 2));
         assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
-                        new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1, 
1, 2), 0),
-                        new Tuple2<>(GenericRowData.of(2, 1, 2), 0));
+                        new Tuple2<>(GenericRow.of(1, 1, 1), 0),
+                        new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 1, 
2), 0),
+                        new Tuple2<>(GenericRow.of(2, 1, 2), 0));
         output.clear();
 
         // test partition 1 deleted
-        assigner.process(GenericRowData.of(1, 2, 2));
-        assigner.process(GenericRowData.of(1, 3, 3));
-        assigner.process(GenericRowData.of(1, 4, 4));
+        assigner.process(GenericRow.of(1, 2, 2));
+        assigner.process(GenericRow.of(1, 3, 3));
+        assigner.process(GenericRow.of(1, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
 
         // move from full bucket
-        assigner.process(GenericRowData.of(2, 4, 4));
+        assigner.process(GenericRow.of(2, 4, 4));
         assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1, 
4, 4), 0),
-                        new Tuple2<>(GenericRowData.of(2, 4, 4), 0));
+                        new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4, 
4), 0),
+                        new Tuple2<>(GenericRow.of(2, 4, 4), 0));
         output.clear();
 
         // test partition 1 deleted
-        assigner.process(GenericRowData.of(1, 5, 5));
+        assigner.process(GenericRow.of(1, 5, 5));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
         output.clear();
 
@@ -165,8 +167,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                 ThreadLocalRandom.current().nextBoolean()
                         ? MergeEngine.PARTIAL_UPDATE
                         : MergeEngine.AGGREGATE;
-        GlobalIndexAssigner<RowData> assigner = createAssigner(mergeEngine);
-        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        GlobalIndexAssigner<InternalRow> assigner = 
createAssigner(mergeEngine);
+        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
                 new File(warehouse.getPath()),
                 2,
@@ -174,18 +176,18 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
 
         // change partition
-        assigner.process(GenericRowData.of(1, 1, 1));
-        assigner.process(GenericRowData.of(2, 1, 2));
+        assigner.process(GenericRow.of(1, 1, 1));
+        assigner.process(GenericRow.of(2, 1, 2));
         assertThat(output)
                 .containsExactly(
-                        new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
-                        new Tuple2<>(GenericRowData.of(1, 1, 2), 0));
+                        new Tuple2<>(GenericRow.of(1, 1, 1), 0),
+                        new Tuple2<>(GenericRow.of(1, 1, 2), 0));
         output.clear();
 
         // test partition 2 no effect
-        assigner.process(GenericRowData.of(2, 2, 2));
-        assigner.process(GenericRowData.of(2, 3, 3));
-        assigner.process(GenericRowData.of(2, 4, 4));
+        assigner.process(GenericRow.of(2, 2, 2));
+        assigner.process(GenericRow.of(2, 3, 3));
+        assigner.process(GenericRow.of(2, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();
@@ -193,8 +195,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
 
     @Test
     public void testFirstRow() throws Exception {
-        GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.FIRST_ROW);
-        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        GlobalIndexAssigner<InternalRow> assigner = createAssigner(MergeEngine.FIRST_ROW);
+        List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
                 new File(warehouse.getPath()),
                 2,
@@ -202,15 +204,15 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
 
         // change partition
-        assigner.process(GenericRowData.of(1, 1, 1));
-        assigner.process(GenericRowData.of(2, 1, 2));
-        assertThat(output).containsExactly(new Tuple2<>(GenericRowData.of(1, 1, 1), 0));
+        assigner.process(GenericRow.of(1, 1, 1));
+        assigner.process(GenericRow.of(2, 1, 2));
+        assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1, 1), 0));
         output.clear();
 
         // test partition 2 no effect
-        assigner.process(GenericRowData.of(2, 2, 2));
-        assigner.process(GenericRowData.of(2, 3, 3));
-        assigner.process(GenericRowData.of(2, 4, 4));
+        assigner.process(GenericRow.of(2, 2, 2));
+        assigner.process(GenericRow.of(2, 3, 3));
+        assigner.process(GenericRow.of(2, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();

Reply via email to