This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79ca53bff [flink] Use InternalRow instead of RowData in sink sub topo
(#2004)
79ca53bff is described below
commit 79ca53bffb8b65025f87c1246a8af603fac5256a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:49:41 2023 +0800
[flink] Use InternalRow instead of RowData in sink sub topo (#2004)
---
.../flink/sink/DynamicBucketRowWriteOperator.java | 16 ++--
.../apache/paimon/flink/sink/FileStoreSink.java | 6 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +--
.../paimon/flink/sink/LocalMergeOperator.java | 17 ++--
...tionKeyExtractor.java => MapToInternalRow.java} | 31 +++----
.../paimon/flink/sink/RowDataChannelComputer.java | 14 ++--
.../flink/sink/RowDataStoreWriteOperator.java | 11 ++-
.../paimon/flink/sink/RowDynamicBucketSink.java | 15 ++--
.../flink/sink/RowHashKeyChannelComputer.java | 10 ++-
.../flink/sink/RowWithBucketChannelComputer.java | 10 ++-
.../paimon/flink/sink/UnawareBucketWriteSink.java | 4 +-
.../flink/sink/index/GlobalDynamicBucketSink.java | 30 ++++---
.../sink/index/GlobalIndexAssignerOperator.java | 17 ++--
.../sink/index/KeyPartPartitionKeyExtractor.java | 12 +--
.../sink/index/KeyPartRowChannelComputer.java | 11 ++-
.../apache/paimon/flink/sorter/OrderSorter.java | 6 +-
.../org/apache/paimon/flink/sorter/SortUtils.java | 10 +--
.../flink/utils/InternalRowTypeSerializer.java | 5 ++
.../paimon/flink/utils/InternalTypeInfo.java | 7 ++
.../flink/utils/ProjectToRowDataFunction.java | 36 ++++-----
.../apache/paimon/flink/sink/FlinkSinkTest.java | 9 +--
.../flink/sink/RowDataChannelComputerTest.java | 22 ++---
.../flink/sink/index/GlobalIndexAssignerTest.java | 94 +++++++++++-----------
23 files changed, 193 insertions(+), 211 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
index 7db9c2802..2a2d1fe20 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -18,16 +18,18 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-/** A {@link PrepareCommitOperator} to write {@link RowData} with bucket.
Record schema is fixed. */
-public class DynamicBucketRowWriteOperator extends
TableWriteOperator<Tuple2<RowData, Integer>> {
+/**
+ * A {@link PrepareCommitOperator} to write {@link InternalRow} with bucket.
Record schema is fixed.
+ */
+public class DynamicBucketRowWriteOperator
+ extends TableWriteOperator<Tuple2<InternalRow, Integer>> {
private static final long serialVersionUID = 1L;
@@ -44,9 +46,9 @@ public class DynamicBucketRowWriteOperator extends
TableWriteOperator<Tuple2<Row
}
@Override
- public void processElement(StreamRecord<Tuple2<RowData, Integer>> element)
throws Exception {
- FlinkRowWrapper internalRow = new
FlinkRowWrapper(element.getValue().f0);
- DynamicBucketRow row = new DynamicBucketRow(internalRow,
element.getValue().f1);
+ public void processElement(StreamRecord<Tuple2<InternalRow, Integer>>
element)
+ throws Exception {
+ DynamicBucketRow row = new DynamicBucketRow(element.getValue().f0,
element.getValue().f1);
write.write(row);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 6b40815ed..5481ae1f5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -18,17 +18,17 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
import java.util.Map;
/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<RowData> {
+public class FileStoreSink extends FlinkWriteSink<InternalRow> {
private static final long serialVersionUID = 1L;
@@ -43,7 +43,7 @@ public class FileStoreSink extends FlinkWriteSink<RowData> {
}
@Override
- protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+ protected OneInputStreamOperator<InternalRow, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider, commitUser);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 4102d5ca1..cd9edb252 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
@@ -69,7 +70,7 @@ public class FlinkSinkBuilder {
}
public DataStreamSink<?> build() {
- DataStream<RowData> input = this.input;
+ DataStream<InternalRow> input = MapToInternalRow.map(this.input,
table.rowType());
if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
input =
input.forward()
@@ -96,15 +97,15 @@ public class FlinkSinkBuilder {
}
private DataStreamSink<?> buildDynamicBucketSink(
- DataStream<RowData> input, boolean globalIndex) {
+ DataStream<InternalRow> input, boolean globalIndex) {
checkArgument(logSinkFunction == null, "Dynamic bucket mode can not
work with log system.");
return globalIndex
? new GlobalDynamicBucketSink(table,
overwritePartition).build(input, parallelism)
: new RowDynamicBucketSink(table,
overwritePartition).build(input, parallelism);
}
- private DataStreamSink<?> buildForFixedBucket(DataStream<RowData> input) {
- DataStream<RowData> partitioned =
+ private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow>
input) {
+ DataStream<InternalRow> partitioned =
partition(
input,
new RowDataChannelComputer(table.schema(),
logSinkFunction != null),
@@ -113,7 +114,7 @@ public class FlinkSinkBuilder {
return sink.sinkFrom(partitioned);
}
- private DataStreamSink<?> buildUnawareBucketSink(DataStream<RowData>
input) {
+ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow>
input) {
checkArgument(
table instanceof AppendOnlyFileStoreTable,
"Unaware bucket mode only works with append-only table for
now.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 52581a1b5..aab614f03 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -24,8 +24,6 @@ import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
@@ -43,14 +41,13 @@ import
org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
/**
* {@link AbstractStreamOperator} which buffer input record and apply merge
function when the buffer
* is full. Mainly to resolve data skew on primary keys.
*/
-public class LocalMergeOperator extends AbstractStreamOperator<RowData>
- implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
+ implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
private static final long serialVersionUID = 1L;
@@ -66,7 +63,6 @@ public class LocalMergeOperator extends
AbstractStreamOperator<RowData>
private transient SortBufferWriteBuffer buffer;
private transient long currentWatermark;
- private transient FlinkRowData reusedRowData;
private transient boolean endOfInput;
public LocalMergeOperator(TableSchema schema) {
@@ -106,14 +102,13 @@ public class LocalMergeOperator extends
AbstractStreamOperator<RowData>
null);
currentWatermark = Long.MIN_VALUE;
- reusedRowData = new FlinkRowData(null);
endOfInput = false;
}
@Override
- public void processElement(StreamRecord<RowData> record) throws Exception {
+ public void processElement(StreamRecord<InternalRow> record) throws
Exception {
recordCount++;
- InternalRow row = new FlinkRowWrapper(record.getValue());
+ InternalRow row = record.getValue();
RowKind rowKind = row.getRowKind();
// row kind must be INSERT when it is divided into key and value
@@ -133,7 +128,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<RowData>
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public void processWatermark(Watermark mark) {
// don't emit watermark immediately, emit them after flushing buffer
currentWatermark = mark.getTimestamp();
}
@@ -173,7 +168,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<RowData>
kv -> {
InternalRow row = kv.value();
row.setRowKind(kv.valueKind());
- output.collect(new
StreamRecord<>(reusedRowData.replace(row)));
+ output.collect(new StreamRecord<>(row));
});
buffer.clear();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
similarity index 53%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
index 9db37c7cd..a9bf74401 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataPartitionKeyExtractor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
@@ -18,30 +18,21 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.types.RowType;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
-/** A {@link PartitionKeyExtractor} to {@link RowData}. */
-public class RowDataPartitionKeyExtractor implements
PartitionKeyExtractor<RowData> {
+/** An util to convert {@link RowData} stream to {@link InternalRow} stream. */
+public class MapToInternalRow {
- private final RowPartitionKeyExtractor extractor;
-
- public RowDataPartitionKeyExtractor(TableSchema schema) {
- this.extractor = new RowPartitionKeyExtractor(schema);
- }
-
- @Override
- public BinaryRow partition(RowData record) {
- return extractor.partition(new FlinkRowWrapper(record));
- }
-
- @Override
- public BinaryRow trimmedPrimaryKey(RowData record) {
- return extractor.trimmedPrimaryKey(new FlinkRowWrapper(record));
+ public static DataStream<InternalRow> map(DataStream<RowData> input,
RowType rowType) {
+ return input.map((MapFunction<RowData, InternalRow>)
FlinkRowWrapper::new)
+ .setParallelism(input.getParallelism())
+ .returns(InternalTypeInfo.fromRowType(rowType));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 19e1f9d5d..bf2687fdb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -19,13 +19,13 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-import org.apache.flink.table.data.RowData;
-
-/** {@link ChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer implements ChannelComputer<RowData> {
+/** {@link ChannelComputer} for {@link InternalRow}. */
+public class RowDataChannelComputer implements ChannelComputer<InternalRow> {
private static final long serialVersionUID = 1L;
@@ -33,7 +33,7 @@ public class RowDataChannelComputer implements
ChannelComputer<RowData> {
private final boolean hasLogSink;
private transient int numChannels;
- private transient KeyAndBucketExtractor<RowData> extractor;
+ private transient KeyAndBucketExtractor<InternalRow> extractor;
public RowDataChannelComputer(TableSchema schema, boolean hasLogSink) {
this.schema = schema;
@@ -43,11 +43,11 @@ public class RowDataChannelComputer implements
ChannelComputer<RowData> {
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.extractor = new RowDataKeyAndBucketExtractor(schema);
+ this.extractor = new FixedBucketRowKeyExtractor(schema);
}
@Override
- public int channel(RowData record) {
+ public int channel(InternalRow record) {
extractor.setRecord(record);
return channel(extractor.partition(), extractor.bucket());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 81fa48aa4..a5c8e5557 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
@@ -37,7 +37,6 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
@@ -45,8 +44,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
-/** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is
fixed. */
-public class RowDataStoreWriteOperator extends TableWriteOperator<RowData> {
+/** A {@link PrepareCommitOperator} to write {@link InternalRow}. Record
schema is fixed. */
+public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow>
{
private static final long serialVersionUID = 3L;
@@ -115,12 +114,12 @@ public class RowDataStoreWriteOperator extends
TableWriteOperator<RowData> {
}
@Override
- public void processElement(StreamRecord<RowData> element) throws Exception
{
+ public void processElement(StreamRecord<InternalRow> element) throws
Exception {
sinkContext.timestamp = element.hasTimestamp() ?
element.getTimestamp() : null;
SinkRecord record;
try {
- record = write.write(new FlinkRowWrapper(element.getValue()));
+ record = write.write(element.getValue());
} catch (Exception e) {
throw new IOException(e);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index a7775b8f9..077fec313 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -18,21 +18,22 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
import java.util.Map;
/** Sink for dynamic bucket table. */
-public class RowDynamicBucketSink extends DynamicBucketSink<RowData> {
+public class RowDynamicBucketSink extends DynamicBucketSink<InternalRow> {
private static final long serialVersionUID = 1L;
@@ -42,23 +43,23 @@ public class RowDynamicBucketSink extends
DynamicBucketSink<RowData> {
}
@Override
- protected ChannelComputer<RowData> channelComputer1() {
+ protected ChannelComputer<InternalRow> channelComputer1() {
return new RowHashKeyChannelComputer(table.schema());
}
@Override
- protected ChannelComputer<Tuple2<RowData, Integer>> channelComputer2() {
+ protected ChannelComputer<Tuple2<InternalRow, Integer>> channelComputer2()
{
return new RowWithBucketChannelComputer(table.schema());
}
@Override
- protected SerializableFunction<TableSchema, PartitionKeyExtractor<RowData>>
+ protected SerializableFunction<TableSchema,
PartitionKeyExtractor<InternalRow>>
extractorFunction() {
- return RowDataPartitionKeyExtractor::new;
+ return RowPartitionKeyExtractor::new;
}
@Override
- protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable>
createWriteOperator(
+ protected OneInputStreamOperator<Tuple2<InternalRow, Integer>,
Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new DynamicBucketRowWriteOperator(table, writeProvider,
commitUser);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
index 5d43736ff..e0c131e9b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowHashKeyChannelComputer.java
@@ -18,19 +18,21 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.flink.table.data.RowData;
/** Hash key of a {@link RowData}. */
-public class RowHashKeyChannelComputer implements ChannelComputer<RowData> {
+public class RowHashKeyChannelComputer implements ChannelComputer<InternalRow>
{
private static final long serialVersionUID = 1L;
private final TableSchema schema;
private transient int numChannels;
- private transient RowDataPartitionKeyExtractor extractor;
+ private transient RowPartitionKeyExtractor extractor;
public RowHashKeyChannelComputer(TableSchema schema) {
this.schema = schema;
@@ -39,11 +41,11 @@ public class RowHashKeyChannelComputer implements
ChannelComputer<RowData> {
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.extractor = new RowDataPartitionKeyExtractor(schema);
+ this.extractor = new RowPartitionKeyExtractor(schema);
}
@Override
- public int channel(RowData record) {
+ public int channel(InternalRow record) {
int hash = extractor.trimmedPrimaryKey(record).hashCode();
return Math.abs(hash % numChannels);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
index bfec3b513..a00c85c31 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowWithBucketChannelComputer.java
@@ -18,20 +18,22 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
/** Hash key of a {@link RowData} with bucket. */
-public class RowWithBucketChannelComputer implements
ChannelComputer<Tuple2<RowData, Integer>> {
+public class RowWithBucketChannelComputer implements
ChannelComputer<Tuple2<InternalRow, Integer>> {
private static final long serialVersionUID = 1L;
private final TableSchema schema;
private transient int numChannels;
- private transient RowDataPartitionKeyExtractor extractor;
+ private transient RowPartitionKeyExtractor extractor;
public RowWithBucketChannelComputer(TableSchema schema) {
this.schema = schema;
@@ -40,11 +42,11 @@ public class RowWithBucketChannelComputer implements
ChannelComputer<Tuple2<RowD
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.extractor = new RowDataPartitionKeyExtractor(schema);
+ this.extractor = new RowPartitionKeyExtractor(schema);
}
@Override
- public int channel(Tuple2<RowData, Integer> record) {
+ public int channel(Tuple2<InternalRow, Integer> record) {
return ChannelComputer.select(extractor.partition(record.f0),
record.f1, numChannels);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
index ad10f96ba..856432582 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.data.RowData;
import java.util.Map;
@@ -53,7 +53,7 @@ public class UnawareBucketWriteSink extends FileStoreSink {
}
@Override
- public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String
initialCommitUser) {
+ public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String
initialCommitUser) {
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser,
parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 26cfc9b41..eb2f8d30f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -18,12 +18,13 @@
package org.apache.paimon.flink.sink.index;
-import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.RowWithBucketChannelComputer;
import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
@@ -35,8 +36,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import javax.annotation.Nullable;
@@ -44,12 +43,11 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType;
/** Sink for global dynamic bucket table. */
-public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData,
Integer>> {
+public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<InternalRow, Integer>> {
private static final long serialVersionUID = 1L;
@@ -59,34 +57,34 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
}
@Override
- protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable>
createWriteOperator(
+ protected OneInputStreamOperator<Tuple2<InternalRow, Integer>,
Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new DynamicBucketRowWriteOperator(table, writeProvider,
commitUser);
}
- public DataStreamSink<?> build(DataStream<RowData> input, @Nullable
Integer parallelism) {
+ public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable
Integer parallelism) {
String initialCommitUser = UUID.randomUUID().toString();
TableSchema schema = table.schema();
RowType rowType = schema.logicalRowType();
List<String> primaryKeys = schema.primaryKeys();
- RowDataSerializer rowSerializer = new
RowDataSerializer(toLogicalType(rowType));
+ InternalRowTypeSerializer rowSerializer = new
InternalRowTypeSerializer(rowType);
RowType bootstrapType = bootstrapType(schema);
- RowDataSerializer bootstrapSerializer = new
RowDataSerializer(toLogicalType(bootstrapType));
+ InternalRowTypeSerializer bootstrapSerializer =
+ new InternalRowTypeSerializer(bootstrapType);
// Topology:
// input -- bootstrap -- shuffle by key hash --> bucket-assigner --
shuffle by bucket -->
// writer --> committer
- DataStream<Tuple2<KeyPartOrRow, RowData>> bootstraped =
+ DataStream<Tuple2<KeyPartOrRow, InternalRow>> bootstraped =
input.transform(
"INDEX_BOOTSTRAP",
new InternalTypeInfo<>(
new KeyWithRowSerializer<>(
bootstrapSerializer,
rowSerializer)),
- new IndexBootstrapOperator<>(
- new IndexBootstrap(table),
FlinkRowData::new))
+ new IndexBootstrapOperator<>(new
IndexBootstrap(table), r -> r))
.setParallelism(input.getParallelism());
// 1. shuffle by key hash
@@ -97,13 +95,13 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
KeyPartRowChannelComputer channelComputer =
new KeyPartRowChannelComputer(rowType, bootstrapType,
primaryKeys);
- DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash =
+ DataStream<Tuple2<KeyPartOrRow, InternalRow>> partitionByKeyHash =
partition(bootstraped, channelComputer, assignerParallelism);
// 2. bucket-assigner
- TupleTypeInfo<Tuple2<RowData, Integer>> rowWithBucketType =
+ TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(),
BasicTypeInfo.INT_TYPE_INFO);
- DataStream<Tuple2<RowData, Integer>> bucketAssigned =
+ DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
partitionByKeyHash
.transform(
"dynamic-bucket-assigner",
@@ -113,7 +111,7 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
// 3. shuffle by bucket
- DataStream<Tuple2<RowData, Integer>> partitionByBucket =
+ DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
partition(bucketAssigned, new
RowWithBucketChannelComputer(schema), parallelism);
// 4. writer and committer
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index d255cdfbb..8e0e08799 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -23,14 +23,12 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializableFunction;
@@ -40,14 +38,11 @@ import
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
-import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
-
/** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T,
Integer>>,
@@ -149,22 +144,22 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
this.assigner.close();
}
- public static GlobalIndexAssignerOperator<RowData> forRowData(Table table)
{
+ public static GlobalIndexAssignerOperator<InternalRow> forRowData(Table
table) {
return new GlobalIndexAssignerOperator<>(
- table, createRowDataAssigner(table), FlinkRowWrapper::new,
FlinkRowData::new);
+ table, createRowDataAssigner(table), r -> r, r -> r);
}
- public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+ public static GlobalIndexAssigner<InternalRow> createRowDataAssigner(Table
t) {
RowType bootstrapType =
IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
int bucketIndex = bootstrapType.getFieldCount() - 1;
return new GlobalIndexAssigner<>(
t,
- RowDataPartitionKeyExtractor::new,
+ RowPartitionKeyExtractor::new,
KeyPartPartitionKeyExtractor::new,
row -> row.getInt(bucketIndex),
new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
(rowData, rowKind) -> {
- rowData.setRowKind(toFlinkRowKind(rowKind));
+ rowData.setRowKind(rowKind);
return rowData;
});
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
index 6dc3dca42..188f3fa61 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink.index;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.types.RowType;
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and
partition fields. */
-public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<RowData> {
+public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<InternalRow> {
private final Projection partitionProjection;
private final Projection keyProjection;
@@ -50,12 +50,12 @@ public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<RowDa
}
@Override
- public BinaryRow partition(RowData record) {
- return partitionProjection.apply(new FlinkRowWrapper(record));
+ public BinaryRow partition(InternalRow record) {
+ return partitionProjection.apply(record);
}
@Override
- public BinaryRow trimmedPrimaryKey(RowData record) {
- return keyProjection.apply(new FlinkRowWrapper(record));
+ public BinaryRow trimmedPrimaryKey(InternalRow record) {
+ return keyProjection.apply(record);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
index aff593007..c581c02ea 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -21,17 +21,17 @@ package org.apache.paimon.flink.sink.index;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.data.RowData;
import java.util.List;
/** A {@link ChannelComputer} for KeyPartOrRow and row. */
-public class KeyPartRowChannelComputer implements
ChannelComputer<Tuple2<KeyPartOrRow, RowData>> {
+public class KeyPartRowChannelComputer
+ implements ChannelComputer<Tuple2<KeyPartOrRow, InternalRow>> {
private static final long serialVersionUID = 1L;
@@ -58,10 +58,9 @@ public class KeyPartRowChannelComputer implements
ChannelComputer<Tuple2<KeyPart
}
@Override
- public int channel(Tuple2<KeyPartOrRow, RowData> record) {
+ public int channel(Tuple2<KeyPartOrRow, InternalRow> record) {
BinaryRow key =
- (record.f0 == KeyPartOrRow.KEY_PART ? keyPartProject :
rowProject)
- .apply(new FlinkRowWrapper(record.f1));
+ (record.f0 == KeyPartOrRow.KEY_PART ? keyPartProject :
rowProject).apply(record.f1);
return Math.abs(key.hashCode() % numChannels);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index e357aebd3..7c38d9a8c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -21,10 +21,8 @@ package org.apache.paimon.flink.sorter;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Projection;
@@ -59,9 +57,7 @@ public class OrderSorter extends TableSorter {
origin,
table,
keyRowType,
- new InternalTypeInfo<>(
- new InternalRowTypeSerializer(
- keyRowType.getFieldTypes().toArray(new
DataType[0]))),
+ InternalTypeInfo.fromRowType(keyRowType),
new KeyComparatorSupplier(keyRowType),
new SortUtils.KeyAbstract<InternalRow>() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 1cc26293f..1cc5cf8ca 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -25,11 +25,9 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.SerializableSupplier;
@@ -114,9 +112,7 @@ public class SortUtils {
fields.addAll(dataFields);
final RowType longRowType = new RowType(fields);
final InternalTypeInfo<InternalRow> internalRowType =
- new InternalTypeInfo<>(
- new InternalRowTypeSerializer(
- longRowType.getFieldTypes().toArray(new
DataType[0])));
+ InternalTypeInfo.fromRowType(longRowType);
// generate the KEY as the key of Pair.
DataStream<Tuple2<KEY, RowData>> inputWithKey =
@@ -177,9 +173,7 @@ public class SortUtils {
return keyProjectedRow.replaceRow(value);
}
},
- new InternalTypeInfo<>(
- new InternalRowTypeSerializer(
-
valueRowType.getFieldTypes().toArray(new DataType[0]))))
+ InternalTypeInfo.fromRowType(valueRowType))
.setParallelism(sinkParallelism)
.map(FlinkRowData::new, inputStream.getType())
.setParallelism(sinkParallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
index 1cf197386..b0b0a70c1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
@@ -36,6 +37,10 @@ public class InternalRowTypeSerializer extends
InternalTypeSerializer<InternalRo
private final InternalRowSerializer internalRowSerializer;
+ public InternalRowTypeSerializer(RowType rowType) {
+ this(rowType.getFieldTypes().toArray(new DataType[0]));
+ }
+
public InternalRowTypeSerializer(DataType... types) {
internalRowSerializer = new InternalRowSerializer(types);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
index f0b0d0863..4ea5db9f3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -18,6 +18,9 @@
package org.apache.paimon.flink.utils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -35,6 +38,10 @@ public class InternalTypeInfo<T> extends TypeInformation<T> {
this.serializer = serializer;
}
+ public static InternalTypeInfo<InternalRow> fromRowType(RowType rowType) {
+ return new InternalTypeInfo<>(new InternalRowTypeSerializer(rowType));
+ }
+
@Override
public boolean isBasicType() {
return false;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
index c70d9633c..f18116679 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
@@ -19,39 +19,34 @@
package org.apache.paimon.flink.utils;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerBiFunction;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.LogicalType;
-
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.paimon.data.InternalRow.createFieldGetter;
-/** Project {@link BinaryRow} fields into {@link RowData}. */
-public class ProjectToRowDataFunction implements SerBiFunction<RowData,
BinaryRow, RowData> {
+/** Project {@link BinaryRow} fields into {@link InternalRow}. */
+public class ProjectToRowDataFunction
+ implements SerBiFunction<InternalRow, BinaryRow, InternalRow> {
- private final RowData.FieldGetter[] fieldGetters;
+ private final InternalRow.FieldGetter[] fieldGetters;
private final Map<Integer, Integer> projectMapping;
- private final RowData.FieldGetter[] projectGetters;
+ private final InternalRow.FieldGetter[] projectGetters;
public ProjectToRowDataFunction(RowType rowType, List<String>
projectFields) {
- LogicalType[] types =
- LogicalTypeConversion.toLogicalType(rowType)
- .getChildren()
- .toArray(new LogicalType[0]);
+ DataType[] types = rowType.getFieldTypes().toArray(new DataType[0]);
this.fieldGetters =
IntStream.range(0, types.length)
.mapToObj(i -> createFieldGetter(types[i], i))
- .toArray(RowData.FieldGetter[]::new);
+ .toArray(InternalRow.FieldGetter[]::new);
List<String> fieldNames = rowType.getFieldNames();
this.projectMapping =
@@ -64,17 +59,16 @@ public class ProjectToRowDataFunction implements
SerBiFunction<RowData, BinaryRo
createFieldGetter(
types[rowType.getFieldIndex(field)],
projectFields.indexOf(field)))
- .toArray(RowData.FieldGetter[]::new);
+ .toArray(InternalRow.FieldGetter[]::new);
}
@Override
- public RowData apply(RowData input, BinaryRow project) {
- GenericRowData newRow = new GenericRowData(fieldGetters.length);
- FlinkRowData partRow = new FlinkRowData(project);
+ public InternalRow apply(InternalRow input, BinaryRow project) {
+ GenericRow newRow = new GenericRow(fieldGetters.length);
for (int i = 0; i < fieldGetters.length; i++) {
Object field =
projectMapping.containsKey(i)
- ?
projectGetters[projectMapping.get(i)].getFieldOrNull(partRow)
+ ?
projectGetters[projectMapping.get(i)].getFieldOrNull(project)
: fieldGetters[i].getFieldOrNull(input);
newRow.setField(i, field);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 81d7b5d5f..a93a361dd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -48,7 +48,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -82,10 +81,10 @@ public class FlinkSinkTest {
private boolean testSpillable(
StreamExecutionEnvironment streamExecutionEnvironment,
FileStoreTable fileStoreTable)
throws Exception {
- DataStreamSource<RowData> source =
+ DataStreamSource<InternalRow> source =
streamExecutionEnvironment.fromCollection(
- Collections.singletonList(new
FlinkRowData(GenericRow.of(1, 1))));
- FlinkSink<RowData> flinkSink = new FileStoreSink(fileStoreTable, null,
null);
+ Collections.singletonList(GenericRow.of(1, 1)));
+ FlinkSink<InternalRow> flinkSink = new FileStoreSink(fileStoreTable,
null, null);
SingleOutputStreamOperator<Committable> written =
flinkSink.doWrite(source, "123", 1);
RowDataStoreWriteOperator operator =
((RowDataStoreWriteOperator)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
index 710f8bece..f727cf981 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
@@ -19,17 +19,18 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -70,11 +71,10 @@ public class RowDataChannelComputerTest {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numInputs = random.nextInt(1000) + 1;
- List<RowData> input = new ArrayList<>();
+ List<InternalRow> input = new ArrayList<>();
for (int i = 0; i < numInputs; i++) {
input.add(
- GenericRowData.of(
- random.nextInt(10) + 1, random.nextLong(),
random.nextDouble()));
+ GenericRow.of(random.nextInt(10) + 1, random.nextLong(),
random.nextDouble()));
}
testImpl(schema, input);
@@ -100,17 +100,17 @@ public class RowDataChannelComputerTest {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numInputs = random.nextInt(1000) + 1;
- List<RowData> input = new ArrayList<>();
+ List<InternalRow> input = new ArrayList<>();
for (int i = 0; i < numInputs; i++) {
- input.add(GenericRowData.of(random.nextLong(),
random.nextDouble()));
+ input.add(GenericRow.of(random.nextLong(), random.nextDouble()));
}
testImpl(schema, input);
}
- private void testImpl(TableSchema schema, List<RowData> input) {
+ private void testImpl(TableSchema schema, List<InternalRow> input) {
ThreadLocalRandom random = ThreadLocalRandom.current();
- RowDataKeyAndBucketExtractor extractor = new
RowDataKeyAndBucketExtractor(schema);
+ FixedBucketRowKeyExtractor extractor = new
FixedBucketRowKeyExtractor(schema);
int numChannels = random.nextInt(10) + 1;
boolean hasLogSink = random.nextBoolean();
@@ -119,7 +119,7 @@ public class RowDataChannelComputerTest {
// assert that channel(record) and channel(partition, bucket) gives
the same result
- for (RowData rowData : input) {
+ for (InternalRow rowData : input) {
extractor.setRecord(rowData);
BinaryRow partition = extractor.partition();
int bucket = extractor.bucket();
@@ -156,7 +156,7 @@ public class RowDataChannelComputerTest {
if (hasLogSink) {
Map<Integer, Set<Integer>> channelsPerBucket = new HashMap<>();
- for (RowData rowData : input) {
+ for (InternalRow rowData : input) {
extractor.setRecord(rowData);
int bucket = extractor.bucket();
channelsPerBucket
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 67d54f61a..97503079b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -21,15 +21,15 @@ package org.apache.paimon.flink.sink.index;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -44,12 +44,13 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerTest extends TableTestBase {
- private GlobalIndexAssigner<RowData> createAssigner(MergeEngine
mergeEngine) throws Exception {
+ private GlobalIndexAssigner<InternalRow> createAssigner(MergeEngine
mergeEngine)
+ throws Exception {
return createAssigner(mergeEngine, false);
}
- private GlobalIndexAssigner<RowData> createAssigner(MergeEngine
mergeEngine, boolean enableTtl)
- throws Exception {
+ private GlobalIndexAssigner<InternalRow> createAssigner(
+ MergeEngine mergeEngine, boolean enableTtl) throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -85,31 +86,32 @@ public class GlobalIndexAssignerTest extends TableTestBase {
}
private void innerTestBucketAssign(boolean enableTtl) throws Exception {
- GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
+ GlobalIndexAssigner<InternalRow> assigner =
+ createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) ->
output.add(bucket));
// assign
- assigner.process(GenericRowData.of(1, 1, 1));
- assigner.process(GenericRowData.of(1, 2, 2));
- assigner.process(GenericRowData.of(1, 3, 3));
+ assigner.process(GenericRow.of(1, 1, 1));
+ assigner.process(GenericRow.of(1, 2, 2));
+ assigner.process(GenericRow.of(1, 3, 3));
assertThat(output).containsExactly(0, 0, 0);
output.clear();
// full
- assigner.process(GenericRowData.of(1, 4, 4));
+ assigner.process(GenericRow.of(1, 4, 4));
assertThat(output).containsExactly(2);
output.clear();
// another partition
- assigner.process(GenericRowData.of(2, 5, 5));
+ assigner.process(GenericRow.of(2, 5, 5));
assertThat(output).containsExactly(0);
output.clear();
// read assigned
- assigner.process(GenericRowData.of(1, 4, 4));
- assigner.process(GenericRowData.of(1, 2, 2));
- assigner.process(GenericRowData.of(1, 3, 3));
+ assigner.process(GenericRow.of(1, 4, 4));
+ assigner.process(GenericRow.of(1, 2, 2));
+ assigner.process(GenericRow.of(1, 3, 3));
assertThat(output).containsExactly(2, 0, 0);
output.clear();
@@ -118,8 +120,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
@Test
public void testUpsert() throws Exception {
- GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE);
- List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ GlobalIndexAssigner<InternalRow> assigner =
createAssigner(MergeEngine.DEDUPLICATE);
+ List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
new File(warehouse.getPath()),
2,
@@ -127,32 +129,32 @@ public class GlobalIndexAssignerTest extends
TableTestBase {
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
// change partition
- assigner.process(GenericRowData.of(1, 1, 1));
- assigner.process(GenericRowData.of(2, 1, 2));
+ assigner.process(GenericRow.of(1, 1, 1));
+ assigner.process(GenericRow.of(2, 1, 2));
assertThat(output)
.containsExactly(
- new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
- new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1,
1, 2), 0),
- new Tuple2<>(GenericRowData.of(2, 1, 2), 0));
+ new Tuple2<>(GenericRow.of(1, 1, 1), 0),
+ new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 1,
2), 0),
+ new Tuple2<>(GenericRow.of(2, 1, 2), 0));
output.clear();
// test partition 1 deleted
- assigner.process(GenericRowData.of(1, 2, 2));
- assigner.process(GenericRowData.of(1, 3, 3));
- assigner.process(GenericRowData.of(1, 4, 4));
+ assigner.process(GenericRow.of(1, 2, 2));
+ assigner.process(GenericRow.of(1, 3, 3));
+ assigner.process(GenericRow.of(1, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
// move from full bucket
- assigner.process(GenericRowData.of(2, 4, 4));
+ assigner.process(GenericRow.of(2, 4, 4));
assertThat(output)
.containsExactly(
- new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1,
4, 4), 0),
- new Tuple2<>(GenericRowData.of(2, 4, 4), 0));
+ new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4,
4), 0),
+ new Tuple2<>(GenericRow.of(2, 4, 4), 0));
output.clear();
// test partition 1 deleted
- assigner.process(GenericRowData.of(1, 5, 5));
+ assigner.process(GenericRow.of(1, 5, 5));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
output.clear();
@@ -165,8 +167,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
ThreadLocalRandom.current().nextBoolean()
? MergeEngine.PARTIAL_UPDATE
: MergeEngine.AGGREGATE;
- GlobalIndexAssigner<RowData> assigner = createAssigner(mergeEngine);
- List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ GlobalIndexAssigner<InternalRow> assigner =
createAssigner(mergeEngine);
+ List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
new File(warehouse.getPath()),
2,
@@ -174,18 +176,18 @@ public class GlobalIndexAssignerTest extends
TableTestBase {
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
// change partition
- assigner.process(GenericRowData.of(1, 1, 1));
- assigner.process(GenericRowData.of(2, 1, 2));
+ assigner.process(GenericRow.of(1, 1, 1));
+ assigner.process(GenericRow.of(2, 1, 2));
assertThat(output)
.containsExactly(
- new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
- new Tuple2<>(GenericRowData.of(1, 1, 2), 0));
+ new Tuple2<>(GenericRow.of(1, 1, 1), 0),
+ new Tuple2<>(GenericRow.of(1, 1, 2), 0));
output.clear();
// test partition 2 no effect
- assigner.process(GenericRowData.of(2, 2, 2));
- assigner.process(GenericRowData.of(2, 3, 3));
- assigner.process(GenericRowData.of(2, 4, 4));
+ assigner.process(GenericRow.of(2, 2, 2));
+ assigner.process(GenericRow.of(2, 3, 3));
+ assigner.process(GenericRow.of(2, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
assigner.close();
@@ -193,8 +195,8 @@ public class GlobalIndexAssignerTest extends TableTestBase {
@Test
public void testFirstRow() throws Exception {
- GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.FIRST_ROW);
- List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ GlobalIndexAssigner<InternalRow> assigner =
createAssigner(MergeEngine.FIRST_ROW);
+ List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
new File(warehouse.getPath()),
2,
@@ -202,15 +204,15 @@ public class GlobalIndexAssignerTest extends
TableTestBase {
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
// change partition
- assigner.process(GenericRowData.of(1, 1, 1));
- assigner.process(GenericRowData.of(2, 1, 2));
- assertThat(output).containsExactly(new Tuple2<>(GenericRowData.of(1,
1, 1), 0));
+ assigner.process(GenericRow.of(1, 1, 1));
+ assigner.process(GenericRow.of(2, 1, 2));
+ assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1,
1), 0));
output.clear();
// test partition 2 no effect
- assigner.process(GenericRowData.of(2, 2, 2));
- assigner.process(GenericRowData.of(2, 3, 3));
- assigner.process(GenericRowData.of(2, 4, 4));
+ assigner.process(GenericRow.of(2, 2, 2));
+ assigner.process(GenericRow.of(2, 3, 3));
+ assigner.process(GenericRow.of(2, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
assigner.close();