This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d054a4c06 [flink] support order sort in flink compact action (#1904)
d054a4c06 is described below
commit d054a4c0680527578587fe95681990f00a8fda9d
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 30 13:56:09 2023 +0800
[flink] support order sort in flink compact action (#1904)
---
.../data/serializer/InternalRowSerializer.java | 2 +-
.../org/apache/paimon/sort/zorder/ZIndexer.java | 50 ++++--
.../paimon/flink/BinaryRowTypeSerializer.java | 176 -------------------
.../org/apache/paimon/flink/action/ActionBase.java | 2 +
.../paimon/flink/action/SortCompactAction.java | 13 +-
.../apache/paimon/flink/shuffle/RangeShuffle.java | 76 ++++----
.../apache/paimon/flink/sorter/OrderSorter.java | 84 +++++++++
.../apache/paimon/flink/sorter/SortOperator.java | 23 +--
.../org/apache/paimon/flink/sorter/SortUtils.java | 191 +++++++++++++++++++++
.../apache/paimon/flink/sorter/TableSorter.java | 3 +-
.../apache/paimon/flink/sorter/ZorderSorter.java | 47 ++++-
.../paimon/flink/sorter/ZorderSorterUtils.java | 156 -----------------
.../flink/utils/InternalRowTypeSerializer.java | 108 ++++++++++++
.../paimon/flink/BinaryRowTypeSerializerTest.java | 58 -------
.../flink/action/OrderRewriteActionITCase.java | 117 ++++++++++++-
.../flink/utils/InternalRowSerializerTest.java | 93 ++++++++++
16 files changed, 729 insertions(+), 470 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index b0701be48..199d08b67 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -109,7 +109,7 @@ public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow
}
@SuppressWarnings("unchecked")
- private InternalRow copyRowData(InternalRow from, InternalRow reuse) {
+ public InternalRow copyRowData(InternalRow from, InternalRow reuse) {
GenericRow ret;
if (reuse instanceof GenericRow) {
ret = (GenericRow) reuse;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
index 8c30a3b02..9701dd90e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -18,9 +18,11 @@
package org.apache.paimon.sort.zorder;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
@@ -128,26 +130,42 @@ public class ZIndexer implements Serializable {
@Override
public ZProcessFunction visit(CharType charType) {
- return (row, reuse) ->
- row.isNullAt(fieldIndex)
- ? NULL_BYTES
- : ZOrderByteUtils.stringToOrderedBytes(
-
row.getString(fieldIndex).toString(),
- PRIMITIVE_BUFFER_SIZE,
- reuse)
- .array();
+ return (row, reuse) -> {
+ BinaryString binaryString = row.getString(fieldIndex);
+
+ return row.isNullAt(fieldIndex)
+ ? NULL_BYTES
+ : ZOrderByteUtils.byteTruncateOrFill(
+ MemorySegmentUtils.getBytes(
+ binaryString.getSegments(),
+ binaryString.getOffset(),
+ Math.min(
+ PRIMITIVE_BUFFER_SIZE,
+
binaryString.getSizeInBytes())),
+ PRIMITIVE_BUFFER_SIZE,
+ reuse)
+ .array();
+ };
}
@Override
public ZProcessFunction visit(VarCharType varCharType) {
- return (row, reuse) ->
- row.isNullAt(fieldIndex)
- ? NULL_BYTES
- : ZOrderByteUtils.stringToOrderedBytes(
-
row.getString(fieldIndex).toString(),
- PRIMITIVE_BUFFER_SIZE,
- reuse)
- .array();
+ return (row, reuse) -> {
+ BinaryString binaryString = row.getString(fieldIndex);
+
+ return row.isNullAt(fieldIndex)
+ ? NULL_BYTES
+ : ZOrderByteUtils.byteTruncateOrFill(
+ MemorySegmentUtils.getBytes(
+ binaryString.getSegments(),
+ binaryString.getOffset(),
+ Math.min(
+ PRIMITIVE_BUFFER_SIZE,
+
binaryString.getSizeInBytes())),
+ PRIMITIVE_BUFFER_SIZE,
+ reuse)
+ .array();
+ };
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
deleted file mode 100644
index 892969583..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.MemorySegment;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/** A {@link TypeSerializer} to serialize {@link BinaryRow}. */
-public final class BinaryRowTypeSerializer extends TypeSerializer<BinaryRow> {
-
- private static final long serialVersionUID = 1L;
-
- private final BinaryRowSerializer serializer;
-
- public BinaryRowTypeSerializer(int numFields) {
- this.serializer = new BinaryRowSerializer(numFields);
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public BinaryRowTypeSerializer duplicate() {
- return new BinaryRowTypeSerializer(serializer.getArity());
- }
-
- @Override
- public BinaryRow createInstance() {
- return serializer.createInstance();
- }
-
- @Override
- public BinaryRow copy(BinaryRow from) {
- return serializer.copy(from);
- }
-
- @Override
- public BinaryRow copy(BinaryRow from, BinaryRow reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(BinaryRow record, DataOutputView target) throws
IOException {
- target.writeInt(record.getSizeInBytes());
- target.write(record.toBytes());
- }
-
- @Override
- public BinaryRow deserialize(DataInputView source) throws IOException {
- return deserialize(createInstance(), source);
- }
-
- @Override
- public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws
IOException {
- int len = source.readInt();
- byte[] bytes = new byte[len];
- source.readFully(bytes);
- reuse.pointTo(MemorySegment.wrap(bytes), 0, len);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws
IOException {
- int length = source.readInt();
- target.writeInt(length);
- target.write(source, length);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- BinaryRowTypeSerializer that = (BinaryRowTypeSerializer) o;
- return Objects.equals(serializer, that.serializer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(serializer);
- }
-
- @Override
- public TypeSerializerSnapshot<BinaryRow> snapshotConfiguration() {
- return new BinaryRowTypeSerializerSnapshot(serializer.getArity());
- }
-
- /**
- * {@link TypeSerializerSnapshot} for {@link BinaryRow}. It checks the
compatibility of
- * numFields without checking type.
- */
- public static class BinaryRowTypeSerializerSnapshot
- implements TypeSerializerSnapshot<BinaryRow> {
-
- private int numFields;
-
- public BinaryRowTypeSerializerSnapshot() {}
-
- private BinaryRowTypeSerializerSnapshot(int numFields) {
- this.numFields = numFields;
- }
-
- @Override
- public int getCurrentVersion() {
- return 0;
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- out.writeInt(numFields);
- }
-
- @Override
- public void readSnapshot(int readVersion, DataInputView in,
ClassLoader userCodeClassLoader)
- throws IOException {
- this.numFields = in.readInt();
- }
-
- @Override
- public TypeSerializer<BinaryRow> restoreSerializer() {
- return new BinaryRowTypeSerializer(numFields);
- }
-
- @Override
- public TypeSerializerSchemaCompatibility<BinaryRow>
resolveSchemaCompatibility(
- TypeSerializer<BinaryRow> newSerializer) {
- if (!(newSerializer instanceof BinaryRowTypeSerializer)) {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
-
- BinaryRowTypeSerializer other = (BinaryRowTypeSerializer)
newSerializer;
-
- if (numFields == other.serializer.getArity()) {
- return TypeSerializerSchemaCompatibility.compatibleAsIs();
- } else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index d9f839798..6a4477f47 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -60,6 +60,8 @@ public abstract class ActionBase implements Action {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog,
catalogOptions);
env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // we enable object reuse, we copy the un-reusable object ourselves.
+ env.getConfig().enableObjectReuse();
batchTEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
// register flink catalog to table environment
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index ff5e1a90d..9b488476d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -46,6 +48,8 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Compact with sort action. */
public class SortCompactAction extends CompactAction {
+ private static final Logger LOG =
LoggerFactory.getLogger(SortCompactAction.class);
+
private String sortStrategy;
private List<String> orderColumns;
@@ -64,8 +68,6 @@ public class SortCompactAction extends CompactAction {
@Override
public void run() throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
build(env);
execute(env, "Sort Compact Job");
}
@@ -74,8 +76,9 @@ public class SortCompactAction extends CompactAction {
// only support batch sort yet
if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
!= RuntimeExecutionMode.BATCH) {
- throw new IllegalArgumentException(
- "Only support batch mode yet, please set
-Dexecution.runtime-mode=BATCH");
+ LOG.warn(
+ "Sort Compact only support batch mode yet. Please add
-Dexecution.runtime-mode=BATCH. The action this time will shift to batch mode
forcely.");
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
}
FileStoreTable fileStoreTable = (FileStoreTable) table;
if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
@@ -102,7 +105,7 @@ public class SortCompactAction extends CompactAction {
sourceBuilder.withPredicate(partitionPredicate);
}
- String scanParallelism =
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ String scanParallelism =
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 7318a883d..89b0729f6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.shuffle;
-import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SerializableSupplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Partitioner;
@@ -81,20 +81,21 @@ public class RangeShuffle {
* <p>The streams except the sample and histogram process stream will be blocked, so the
* sample and histogram process stream does not care about requiredExchangeMode.
*/
- public static <T> DataStream<Pair<T, RowData>> rangeShuffleByKey(
- DataStream<Pair<T, RowData>> inputDataStream,
- Comparator<T> keyComparator,
- Class<T> keyClass,
+ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
+ DataStream<Tuple2<T, RowData>> inputDataStream,
+ SerializableSupplier<Comparator<T>> keyComparator,
+ TypeInformation<T> keyTypeInformation,
int sampleSize,
- int rangeNum) {
- Transformation<Pair<T, RowData>> input =
inputDataStream.getTransformation();
+ int rangeNum,
+ int outParallelism) {
+ Transformation<Tuple2<T, RowData>> input =
inputDataStream.getTransformation();
- OneInputTransformation<Pair<T, RowData>, T> keyInput =
+ OneInputTransformation<Tuple2<T, RowData>, T> keyInput =
new OneInputTransformation<>(
input,
"ABSTRACT KEY",
- new StreamMap<>(Pair::getLeft),
- TypeInformation.of(keyClass),
+ new StreamMap<>(a -> a.f0),
+ keyTypeInformation,
input.getParallelism());
// 1. Fixed size sample in each partitions.
@@ -103,8 +104,7 @@ public class RangeShuffle {
keyInput,
"LOCAL SAMPLE",
new LocalSampleOperator<>(sampleSize),
- new TupleTypeInfo<>(
- BasicTypeInfo.DOUBLE_TYPE_INFO,
TypeInformation.of(keyClass)),
+ new TupleTypeInfo<>(BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation),
keyInput.getParallelism());
// 2. Collect all the samples and gather them into a sorted key range.
@@ -113,14 +113,14 @@ public class RangeShuffle {
localSample,
"GLOBAL SAMPLE",
new GlobalSampleOperator<>(sampleSize, keyComparator,
rangeNum),
- new ListTypeInfo<>(TypeInformation.of(keyClass)),
+ new ListTypeInfo<>(keyTypeInformation),
1);
// 3. Take range boundaries as broadcast input and take the tuple of
partition id and
// record as output.
// The shuffle mode of input edge must be BATCH to avoid dead lock. See
// DeadlockBreakupProcessor.
- TwoInputTransformation<List<T>, Pair<T, RowData>, Tuple2<Integer,
Pair<T, RowData>>>
+ TwoInputTransformation<List<T>, Tuple2<T, RowData>, Tuple2<Integer,
Tuple2<T, RowData>>>
preparePartition =
new TwoInputTransformation<>(
new PartitionTransformation<>(
@@ -150,7 +150,7 @@ public class RangeShuffle {
"REMOVE KEY",
new RemoveRangeIndexOperator<>(),
input.getOutputType(),
- input.getParallelism()));
+ outParallelism));
}
/**
@@ -207,20 +207,25 @@ public class RangeShuffle {
private final int numSample;
private final int rangesNum;
- private final Comparator<T> keyComparator;
+ private final SerializableSupplier<Comparator<T>> comparatorSupplier;
+ private transient Comparator<T> keyComparator;
private transient Collector<List<T>> collector;
private transient Sampler<T> sampler;
- public GlobalSampleOperator(int numSample, Comparator<T> comparator,
int rangesNum) {
+ public GlobalSampleOperator(
+ int numSample,
+ SerializableSupplier<Comparator<T>> comparatorSupplier,
+ int rangesNum) {
this.numSample = numSample;
- this.keyComparator = comparator;
+ this.comparatorSupplier = comparatorSupplier;
this.rangesNum = rangesNum;
}
@Override
public void open() throws Exception {
super.open();
+ this.keyComparator = comparatorSupplier.get();
this.sampler = new Sampler<>(numSample, 0L);
this.collector = new StreamRecordCollector<>(output);
}
@@ -262,25 +267,27 @@ public class RangeShuffle {
* Tuple2 which includes range index and record from the other input
itself as output.
*/
private static class AssignRangeIndexOperator<T>
- extends TableStreamOperator<Tuple2<Integer, Pair<T, RowData>>>
+ extends TableStreamOperator<Tuple2<Integer, Tuple2<T, RowData>>>
implements TwoInputStreamOperator<
- List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T,
RowData>>>,
+ List<T>, Tuple2<T, RowData>, Tuple2<Integer,
Tuple2<T, RowData>>>,
InputSelectable {
private static final long serialVersionUID = 1L;
- private transient List<T> boundaries;
- private transient Collector<Tuple2<Integer, Pair<T, RowData>>>
collector;
+ private final SerializableSupplier<Comparator<T>>
keyComparatorSupplier;
- private final Comparator<T> keyComparator;
+ private transient List<T> boundaries;
+ private transient Collector<Tuple2<Integer, Tuple2<T, RowData>>>
collector;
+ private transient Comparator<T> keyComparator;
- public AssignRangeIndexOperator(Comparator<T> comparator) {
- this.keyComparator = comparator;
+ public AssignRangeIndexOperator(SerializableSupplier<Comparator<T>>
keyComparatorSupplier) {
+ this.keyComparatorSupplier = keyComparatorSupplier;
}
@Override
public void open() throws Exception {
super.open();
+ this.keyComparator = keyComparatorSupplier.get();
this.collector = new StreamRecordCollector<>(output);
}
@@ -290,12 +297,12 @@ public class RangeShuffle {
}
@Override
- public void processElement2(StreamRecord<Pair<T, RowData>>
streamRecord) {
+ public void processElement2(StreamRecord<Tuple2<T, RowData>>
streamRecord) {
if (boundaries == null) {
throw new RuntimeException("There should be one data from the
first input.");
}
- Pair<T, RowData> row = streamRecord.getValue();
- collector.collect(new Tuple2<>(binarySearch(row.getLeft()), row));
+ Tuple2<T, RowData> row = streamRecord.getValue();
+ collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
@Override
@@ -326,12 +333,12 @@ public class RangeShuffle {
/** A {@link KeySelector} to select by f0 of tuple2. */
public static class Tuple2KeySelector<T>
- implements KeySelector<Tuple2<Integer, Pair<T, RowData>>,
Integer> {
+ implements KeySelector<Tuple2<Integer, Tuple2<T, RowData>>,
Integer> {
private static final long serialVersionUID = 1L;
@Override
- public Integer getKey(Tuple2<Integer, Pair<T, RowData>> tuple2)
throws Exception {
+ public Integer getKey(Tuple2<Integer, Tuple2<T, RowData>> tuple2)
throws Exception {
return tuple2.f0;
}
}
@@ -359,12 +366,13 @@ public class RangeShuffle {
}
/** Remove the range index and return the actual record. */
- private static class RemoveRangeIndexOperator<T> extends
TableStreamOperator<Pair<T, RowData>>
- implements OneInputStreamOperator<Tuple2<Integer, Pair<T,
RowData>>, Pair<T, RowData>> {
+ private static class RemoveRangeIndexOperator<T> extends
TableStreamOperator<Tuple2<T, RowData>>
+ implements OneInputStreamOperator<
+ Tuple2<Integer, Tuple2<T, RowData>>, Tuple2<T, RowData>> {
private static final long serialVersionUID = 1L;
- private transient Collector<Pair<T, RowData>> collector;
+ private transient Collector<Tuple2<T, RowData>> collector;
@Override
public void open() throws Exception {
@@ -373,7 +381,7 @@ public class RangeShuffle {
}
@Override
- public void processElement(StreamRecord<Tuple2<Integer, Pair<T,
RowData>>> streamRecord)
+ public void processElement(StreamRecord<Tuple2<Integer, Tuple2<T,
RowData>>> streamRecord)
throws Exception {
collector.collect(streamRecord.getValue().f1);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
new file mode 100644
index 000000000..e357aebd3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.Projection;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+import static
org.apache.paimon.table.ChangelogWithKeyTableUtils.addKeyNamePrefix;
+
+/** Alphabetical order sorter to sort records by the given `orderColNames`. */
+public class OrderSorter extends TableSorter {
+
+ public OrderSorter(
+ StreamExecutionEnvironment batchTEnv,
+ DataStream<RowData> origin,
+ FileStoreTable table,
+ List<String> orderColNames) {
+ super(batchTEnv, origin, table, orderColNames);
+ }
+
+ @Override
+ public DataStream<RowData> sort() {
+ final RowType valueRowType = table.rowType();
+ final int[] keyProjectionMap =
table.schema().projection(orderColNames);
+ final RowType keyRowType =
+
addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueRowType));
+
+ return SortUtils.sortStreamByKey(
+ origin,
+ table,
+ keyRowType,
+ new InternalTypeInfo<>(
+ new InternalRowTypeSerializer(
+ keyRowType.getFieldTypes().toArray(new
DataType[0]))),
+ new KeyComparatorSupplier(keyRowType),
+ new SortUtils.KeyAbstract<InternalRow>() {
+
+ private transient org.apache.paimon.codegen.Projection
keyProjection;
+
+ @Override
+ public void open() {
+ // use key gen to speed up projection
+ keyProjection =
CodeGenUtils.newProjection(valueRowType, keyProjectionMap);
+ }
+
+ @Override
+ public InternalRow apply(RowData value) {
+ // deep copy by wrapper the Flink RowData
+ return keyProjection.apply(new
FlinkRowWrapper(value)).copy();
+ }
+ },
+ row -> row);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index d51cf1833..368639756 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -31,7 +31,6 @@ import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -43,43 +42,29 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
/** SortOperator to sort the `InternalRow`s by the `KeyType`. */
public class SortOperator extends TableStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
- private final RowType keyRowType;
- private final RowType valueRowType;
+ private final RowType rowType;
private final long maxMemory;
private final int pageSize;
private final int arity;
private transient BinaryExternalSortBuffer buffer;
- public SortOperator(RowType keyType, RowType valueRowType, long maxMemory,
int pageSize) {
- this.keyRowType = keyType;
- this.valueRowType = valueRowType;
+ public SortOperator(RowType rowType, long maxMemory, int pageSize) {
+ this.rowType = rowType;
this.maxMemory = maxMemory;
this.pageSize = pageSize;
- this.arity = keyType.getFieldCount() + valueRowType.getFieldCount();
+ this.arity = rowType.getFieldCount();
}
@Override
public void open() throws Exception {
super.open();
- List<DataField> keyFields = keyRowType.getFields();
- List<DataField> dataFields = valueRowType.getFields();
-
- List<DataField> fields = new ArrayList<>();
- fields.addAll(keyFields);
- fields.addAll(dataFields);
-
- RowType rowType = new RowType(fields);
-
InternalRowSerializer serializer = InternalSerializers.create(rowType);
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
new file mode 100644
index 000000000..754086221
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.shuffle.RangeShuffle;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.SerializableSupplier;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This is a table sorter which will sort the records by the key of the shuffleKeyComparator
+ * generates. It is a global sort method, we will shuffle the input stream through shuffleKey. After
+ * sorted, we convert datastream from paimon RowData back to Flink RowData
+ *
+ * <pre>
+ *                      toPaimonDataStream                                add key column                                 range shuffle by key                                     local sort                                               remove key
+ * DataStream[RowData] --------------------> DataStream[PaimonRowData] -------------------> DataStream[PaimonRowData] -------------------------> DataStream[PaimonRowData] -----------------------> DataStream[PaimonRowData sorted] ---------------------> DataStream[RowData sorted]
+ *                                                                                                                                                                                                                                   back to flink RowData
+ * </pre>
+ */
+public class SortUtils {
+
+ /**
+ * Sort the input stream by the key specified.
+ *
+ * @param inputStream the input data stream
+ * @param table the sorted file store table
+ * @param sortKeyType we will use paimon `BinaryExternalSortBuffer` to
local sort, so we need to
+ * specify the key type.
+ * @param keyTypeInformation we will use range shuffle in global sort, so
we need to range
+ * shuffle by the key first.
+ * @param shuffleKeyComparator comparator to compare the key when shuffle
+ * @param shuffleKeyAbstract abstract the key from the input `RowData`
+ * @param convertor convert the `KEY` to the sort key, then we can sort in
+ * `BinaryExternalSortBuffer`.
+ * @return the global sorted data stream
+ * @param <KEY> the KEY type in range shuffle
+ */
+ public static <KEY> DataStream<RowData> sortStreamByKey(
+ final DataStream<RowData> inputStream,
+ final FileStoreTable table,
+ final RowType sortKeyType,
+ final TypeInformation<KEY> keyTypeInformation,
+ final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
+ final KeyAbstract<KEY> shuffleKeyAbstract,
+ final ShuffleKeyConvertor<KEY> convertor) {
+
+ final RowType valueRowType = table.rowType();
+ final int parallelism = inputStream.getParallelism();
+ final long maxSortMemory = table.coreOptions().writeBufferSize();
+ final int pageSize = table.coreOptions().pageSize();
+
+ String sinkParallelismValue =
+
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ final int sinkParallelism =
+ sinkParallelismValue == null
+ ? inputStream.getParallelism()
+ : Integer.parseInt(sinkParallelismValue);
+ final int sampleSize = sinkParallelism * 1000;
+ final int rangeNum = sinkParallelism * 10;
+
+ int keyFieldCount = sortKeyType.getFieldCount();
+ int valueFieldCount = valueRowType.getFieldCount();
+ final int[] valueProjectionMap = new int[valueFieldCount];
+ for (int i = 0; i < valueFieldCount; i++) {
+ valueProjectionMap[i] = i + keyFieldCount;
+ }
+
+ List<DataField> keyFields = sortKeyType.getFields();
+ List<DataField> dataFields = valueRowType.getFields();
+
+ List<DataField> fields = new ArrayList<>();
+ fields.addAll(keyFields);
+ fields.addAll(dataFields);
+ final RowType longRowType = new RowType(fields);
+ final InternalTypeInfo<InternalRow> internalRowType =
+ new InternalTypeInfo<>(
+ new InternalRowTypeSerializer(
+ longRowType.getFieldTypes().toArray(new
DataType[0])));
+
+ // generate the KEY as the key of Pair.
+ DataStream<Tuple2<KEY, RowData>> inputWithKey =
+ inputStream
+ .map(
+ new RichMapFunction<RowData, Tuple2<KEY,
RowData>>() {
+
+ @Override
+ public void open(Configuration parameters)
throws Exception {
+ super.open(parameters);
+ shuffleKeyAbstract.open();
+ }
+
+ @Override
+ public Tuple2<KEY, RowData> map(RowData
value) {
+ return
Tuple2.of(shuffleKeyAbstract.apply(value), value);
+ }
+ },
+ new TupleTypeInfo<>(keyTypeInformation,
inputStream.getType()))
+ .setParallelism(parallelism);
+
+ // range shuffle by key
+ return RangeShuffle.rangeShuffleByKey(
+ inputWithKey,
+ shuffleKeyComparator,
+ keyTypeInformation,
+ sampleSize,
+ rangeNum,
+ sinkParallelism)
+ .map(
+ a -> new JoinedRow(convertor.apply(a.f0), new
FlinkRowWrapper(a.f1)),
+ internalRowType)
+ .setParallelism(sinkParallelism)
+ // sort the output locally by `SortOperator`
+ .transform(
+ "LOCAL SORT",
+ internalRowType,
+ new SortOperator(longRowType, maxSortMemory, pageSize))
+ .setParallelism(sinkParallelism)
+ // remove the key column from every row
+ .map(
+ new RichMapFunction<InternalRow, InternalRow>() {
+
+ private transient KeyProjectedRow keyProjectedRow;
+
+ @Override
+ public void open(Configuration parameters) {
+ keyProjectedRow = new
KeyProjectedRow(valueProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(InternalRow value) {
+ return keyProjectedRow.replaceRow(value);
+ }
+ },
+ new InternalTypeInfo<>(
+ new InternalRowTypeSerializer(
+
valueRowType.getFieldTypes().toArray(new DataType[0]))))
+ .setParallelism(sinkParallelism)
+ .map(FlinkRowData::new, inputStream.getType())
+ .setParallelism(sinkParallelism);
+ }
+
+ /** Abstract key from a row data. */
+ interface KeyAbstract<KEY> extends Serializable {
+ default void open() {}
+
+ KEY apply(RowData value);
+ }
+
+ interface ShuffleKeyConvertor<KEY> extends Function<KEY, InternalRow>,
Serializable {}
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index 1cf3572bc..6b27f17c3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -74,8 +74,7 @@ public abstract class TableSorter {
List<String> orderColumns) {
switch (OrderType.of(sortStrategy)) {
case ORDER:
- // todo support alphabetical order
- throw new IllegalArgumentException("Not supported yet.");
+ return new OrderSorter(batchTEnv, origin, fileStoreTable,
orderColumns);
case ZORDER:
return new ZorderSorter(batchTEnv, origin, fileStoreTable,
orderColumns);
case HILBERT:
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index 465ab89d0..66051dc03 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -18,14 +18,24 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.sort.zorder.ZIndexer;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/**
@@ -36,6 +46,9 @@ import java.util.List;
*/
public class ZorderSorter extends TableSorter {
+ private static final RowType KEY_TYPE =
+ new RowType(Collections.singletonList(new DataField(0, "Z_INDEX",
DataTypes.BYTES())));
+
public ZorderSorter(
StreamExecutionEnvironment batchTEnv,
DataStream<RowData> origin,
@@ -57,7 +70,37 @@ public class ZorderSorter extends TableSorter {
*/
private DataStream<RowData> sortStreamByZOrder(
DataStream<RowData> inputStream, FileStoreTable table) {
- ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames);
- return ZorderSorterUtils.sortStreamByZorder(inputStream, zIndexer,
table);
+ final ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames);
+ return SortUtils.sortStreamByKey(
+ inputStream,
+ table,
+ KEY_TYPE,
+ TypeInformation.of(byte[].class),
+ () ->
+ (b1, b2) -> {
+ assert b1.length == b2.length;
+ for (int i = 0; i < b1.length; i++) {
+ int ret = UnsignedBytes.compare(b1[i], b2[i]);
+ if (ret != 0) {
+ return ret;
+ }
+ }
+ return 0;
+ },
+ new SortUtils.KeyAbstract<byte[]>() {
+ @Override
+ public void open() {
+ zIndexer.open();
+ }
+
+ @Override
+ public byte[] apply(RowData value) {
+ byte[] zorder = zIndexer.index(new
FlinkRowWrapper(value));
+ // we can't just return the reused bytes zorder, because
the sample operator
+ // will remember the record to sample.
+ return Arrays.copyOf(zorder, zorder.length);
+ }
+ },
+ GenericRow::of);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
deleted file mode 100644
index dd94255a8..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sorter;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.shuffle.RangeShuffle;
-import org.apache.paimon.sort.zorder.ZIndexer;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.KeyProjectedRow;
-import org.apache.paimon.utils.Pair;
-
-import
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.data.RowData;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-
-/**
- * This is a table sorter which will sort the records by the z-order of the
z-indexer generates. It
- * is a global sort method, we will shuffle the input stream through z-order.
After sorted, we
- * convert datastream from paimon RowData back to Flink RowData
- *
- * <pre>
- * toPaimonDataStream add
z-order column range shuffle by z-order
local sort
remove z-index
- * DataStream[RowData] -------------------> DataStream[PaimonRowData]
-------------------> DataStream[PaimonRowData] ------------------------->
DataStream[PaimonRowData] -----------------------> DataStream[PaimonRowData
sorted] ----------------------------> DataStream[RowData sorted]
- *
back to
flink RowData
- * </pre>
- */
-public class ZorderSorterUtils {
-
- private static final RowType KEY_TYPE =
- new RowType(Collections.singletonList(new DataField(0, "Z_INDEX",
DataTypes.BYTES())));
-
- /**
- * Sort the input stream by z-order.
- *
- * @param inputStream the stream wait to be ordered
- * @param zIndexer generate z-index by the given row
- * @param table the FileStoreTable
- */
- public static DataStream<RowData> sortStreamByZorder(
- DataStream<RowData> inputStream, ZIndexer zIndexer, FileStoreTable
table) {
-
- final RowType valueRowType = table.rowType();
- final int fieldCount = valueRowType.getFieldCount();
- final int parallelism = inputStream.getParallelism();
- final int sampleSize = parallelism * 1000;
- final int rangeNum = parallelism * 10;
- final long maxSortMemory = table.coreOptions().writeBufferSize();
- final int pageSize = table.coreOptions().pageSize();
-
- // generate the z-index as the key of Pair.
- DataStream<Pair<byte[], RowData>> inputWithKey =
- inputStream
- .map(
- new RichMapFunction<RowData, Pair<byte[],
RowData>>() {
-
- @Override
- public void open(Configuration parameters)
throws Exception {
- super.open(parameters);
- zIndexer.open();
- }
-
- @Override
- public Pair<byte[], RowData> map(RowData
value) {
- byte[] zorder = zIndexer.index(new
FlinkRowWrapper(value));
- return Pair.of(Arrays.copyOf(zorder,
zorder.length), value);
- }
- })
- .setParallelism(parallelism);
-
- // range shuffle by z-index key
- return RangeShuffle.rangeShuffleByKey(
- inputWithKey,
- (Comparator<byte[]> & Serializable)
- (b1, b2) -> {
- assert b1.length == b2.length;
- for (int i = 0; i < b1.length; i++) {
- int ret = UnsignedBytes.compare(b1[i],
b2[i]);
- if (ret != 0) {
- return ret;
- }
- }
- return 0;
- },
- byte[].class,
- sampleSize,
- rangeNum)
- .map(
- a ->
- new JoinedRow(
- GenericRow.of(a.getLeft()),
- new FlinkRowWrapper(a.getRight())),
- TypeInformation.of(InternalRow.class))
- .setParallelism(parallelism)
- // sort the output locally by `SortOperator`
- .transform(
- "LOCAL SORT",
- TypeInformation.of(InternalRow.class),
- new SortOperator(KEY_TYPE, valueRowType,
maxSortMemory, pageSize))
- .setParallelism(parallelism)
- // remove the z-index column from every row
- .map(
- new RichMapFunction<InternalRow, InternalRow>() {
-
- private transient KeyProjectedRow keyProjectedRow;
-
- @Override
- public void open(Configuration parameters) {
- int[] map = new int[fieldCount];
- for (int i = 0; i < map.length; i++) {
- map[i] = i + 1;
- }
- keyProjectedRow = new KeyProjectedRow(map);
- }
-
- @Override
- public InternalRow map(InternalRow value) {
- return keyProjectedRow.replaceRow(value);
- }
- })
- .setParallelism(parallelism)
- .map(FlinkRowData::new, inputStream.getType())
- .setParallelism(parallelism);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
new file mode 100644
index 000000000..1cf197386
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataType;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** A TypeSerializer for {@link InternalRow}. */
+public class InternalRowTypeSerializer extends
InternalTypeSerializer<InternalRow> {
+
+ private final InternalRowSerializer internalRowSerializer;
+
+ public InternalRowTypeSerializer(DataType... types) {
+ internalRowSerializer = new InternalRowSerializer(types);
+ }
+
+ @Override
+ public TypeSerializer<InternalRow> duplicate() {
+ return new
InternalRowTypeSerializer(internalRowSerializer.fieldTypes());
+ }
+
+ @Override
+ public InternalRow createInstance() {
+ return new BinaryRow(internalRowSerializer.getArity());
+ }
+
+ @Override
+ public InternalRow copy(InternalRow from) {
+ return internalRowSerializer.copy(from);
+ }
+
+ @Override
+ public InternalRow copy(InternalRow from, InternalRow reuse) {
+ return internalRowSerializer.copyRowData(from, reuse);
+ }
+
+ @Override
+ public void serialize(InternalRow record, DataOutputView target) throws
IOException {
+ BinaryRow row = internalRowSerializer.toBinaryRow(record);
+ target.writeInt(row.getSizeInBytes());
+ target.write(row.toBytes());
+ }
+
+ @Override
+ public InternalRow deserialize(DataInputView source) throws IOException {
+ return deserialize(createInstance(), source);
+ }
+
+ @Override
+ public InternalRow deserialize(InternalRow reuse, DataInputView source)
throws IOException {
+ BinaryRow reuseRow = (BinaryRow) reuse;
+ int len = source.readInt();
+ byte[] bytes = new byte[len];
+ source.readFully(bytes);
+ reuseRow.pointTo(MemorySegment.wrap(bytes), 0, len);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ int length = source.readInt();
+ target.writeInt(length);
+ target.write(source, length);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InternalRowTypeSerializer that = (InternalRowTypeSerializer) o;
+ return Objects.equals(internalRowSerializer,
that.internalRowSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(internalRowSerializer);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
deleted file mode 100644
index 3439c6d6e..000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.data.BinaryRow;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.Random;
-
-import static org.apache.paimon.io.DataFileTestUtils.row;
-
-/** Test for {@link BinaryRowTypeSerializer}. */
-public class BinaryRowTypeSerializerTest extends SerializerTestBase<BinaryRow>
{
-
- @Override
- protected TypeSerializer<BinaryRow> createSerializer() {
- return new BinaryRowTypeSerializer(2);
- }
-
- @Override
- protected int getLength() {
- return -1;
- }
-
- @Override
- protected Class<BinaryRow> getTypeClass() {
- return BinaryRow.class;
- }
-
- @Override
- protected BinaryRow[] getTestData() {
- Random rnd = new Random();
- return new BinaryRow[] {
- row(1, 1),
- row(2, 2),
- row(rnd.nextInt(), rnd.nextInt()),
- row(rnd.nextInt(), rnd.nextInt())
- };
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
index 0005f5e45..a4e2424f0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
@@ -50,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
/** Order Rewrite Action tests for {@link SortCompactAction}. */
public class OrderRewriteActionITCase extends ActionITCaseBase {
@@ -68,6 +70,74 @@ public class OrderRewriteActionITCase extends
ActionITCaseBase {
commit(commitMessages);
}
+ @Test
+ public void testOrderBy() throws Exception {
+ prepareData(300, 1);
+ Assertions.assertThatCode(
+ () ->
+ order(
+ Arrays.asList(
+ "f0", "f1", "f2", "f3", "f4",
"f5", "f6", "f7",
+ "f8", "f9", "f10", "f11",
"f12", "f13", "f14",
+ "f15")))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testOrderResult() throws Exception {
+ prepareData(300, 2);
+ Assertions.assertThatCode(() -> order(Arrays.asList("f1", "f2")))
+ .doesNotThrowAnyException();
+
+ List<ManifestEntry> files =
+ ((AppendOnlyFileStoreTable)
getTable()).store().newScan().plan().files();
+
+ ManifestEntry entry = files.get(0);
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(entry.partition())
+ .withBucket(entry.bucket())
+ .withDataFiles(Collections.singletonList(entry.file()))
+ .build();
+
+ final AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
+ getTable()
+ .newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ a -> {
+ Integer current = a.getInt(1);
+
Assertions.assertThat(current).isGreaterThanOrEqualTo(i.get());
+ i.set(current);
+ });
+
+ Assertions.assertThatCode(() -> order(Arrays.asList("f2", "f1")))
+ .doesNotThrowAnyException();
+
+ files = ((AppendOnlyFileStoreTable)
getTable()).store().newScan().plan().files();
+
+ entry = files.get(0);
+ dataSplit =
+ DataSplit.builder()
+ .withPartition(entry.partition())
+ .withBucket(entry.bucket())
+ .withDataFiles(Collections.singletonList(entry.file()))
+ .build();
+
+ i.set(Integer.MIN_VALUE);
+ getTable()
+ .newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ a -> {
+ Integer current = a.getInt(2);
+
Assertions.assertThat(current).isGreaterThanOrEqualTo(i.get());
+ i.set(current);
+ });
+ }
+
@Test
public void testAllBasicTypeWorksWithZorder() throws Exception {
prepareData(300, 1);
@@ -84,7 +154,7 @@ public class OrderRewriteActionITCase extends
ActionITCaseBase {
@Test
public void testZorderActionWorks() throws Exception {
- prepareData(300, 30);
+ prepareData(300, 2);
PredicateBuilder predicateBuilder = new
PredicateBuilder(getTable().rowType());
Predicate predicate = predicateBuilder.between(1, 100, 200);
@@ -113,6 +183,39 @@ public class OrderRewriteActionITCase extends
ActionITCaseBase {
Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
}
+ @Test
+ public void testCompareZorderAndOrder() throws Exception {
+ prepareData(300, 10);
+ zorder(Arrays.asList("f2", "f1"));
+
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(getTable().rowType());
+ Predicate predicate = predicateBuilder.between(1, 10, 20);
+
+ List<ManifestEntry> filesZorder =
+ ((AppendOnlyFileStoreTable)
getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilterZorder =
+ ((AppendOnlyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withFilter(predicate)
+ .plan()
+ .files();
+
+ order(Arrays.asList("f2", "f1"));
+ List<ManifestEntry> filesOrder =
+ ((AppendOnlyFileStoreTable)
getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilterOrder =
+ ((AppendOnlyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withFilter(predicate)
+ .plan()
+ .files();
+
+ Assertions.assertThat(filesFilterZorder.size() / (double)
filesZorder.size())
+ .isLessThan(filesFilterOrder.size() / (double)
filesOrder.size());
+ }
+
private void zorder(List<String> columns) throws Exception {
SortCompactAction sortCompactAction =
new SortCompactAction(
@@ -125,6 +228,18 @@ public class OrderRewriteActionITCase extends
ActionITCaseBase {
sortCompactAction.run();
}
+ private void order(List<String> columns) throws Exception {
+ SortCompactAction sortCompactAction =
+ new SortCompactAction(
+ new Path(path.toUri()).toUri().toString(),
+ "my_db",
+ "Orders1",
+ Collections.emptyMap());
+ sortCompactAction.withOrderStrategy("order");
+ sortCompactAction.withOrderColumns(columns);
+ sortCompactAction.run();
+ }
+
public Catalog getCatalog() {
if (catalog == null) {
Options options = new Options();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
new file mode 100644
index 000000000..c9a8c3e99
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+/** Tests for {@link InternalRowTypeSerializer}. */
+public class InternalRowSerializerTest {
+
+ private static final Random RANDOM = new Random();
+ private static final RowType rowType =
+ RowType.builder()
+ .field("a", DataTypes.STRING())
+ .field("b", DataTypes.INT())
+ .field("c", DataTypes.BIGINT())
+ .build();
+
+ @Test
+ public void testSerializeAndDeserilize() throws Exception {
+ DataOutputSerializer dataOutputSerializer = new
DataOutputSerializer(100);
+ InternalRowTypeSerializer internalRowTypeSerializer =
+ new
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+ InternalRow row = GenericRow.of(randomString(), RANDOM.nextInt(),
RANDOM.nextLong());
+ internalRowTypeSerializer.serialize(row, dataOutputSerializer);
+ InternalRow row1 =
+ internalRowTypeSerializer.deserialize(
+ new
DataInputDeserializer(dataOutputSerializer.wrapAsByteBuffer()));
+
+ Assertions.assertThat(row.getString(0)).isEqualTo(row1.getString(0));
+ Assertions.assertThat(row.getInt(1)).isEqualTo(row1.getInt(1));
+ Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
+ }
+
+ @Test
+ public void testEqual() {
+ InternalRowTypeSerializer internalRowTypeSerializer =
+ new
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+ Assertions.assertThat(internalRowTypeSerializer)
+ .isEqualTo(internalRowTypeSerializer.duplicate());
+ }
+
+ @Test
+ public void testCopyFromView() {
+ InternalRowTypeSerializer internalRowTypeSerializer =
+ new
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+ InternalRow row = GenericRow.of(randomString(), RANDOM.nextInt(),
RANDOM.nextLong());
+ InternalRow row1 = internalRowTypeSerializer.copy(row);
+
+ Assertions.assertThat(row.getString(0)).isEqualTo(row1.getString(0));
+ Assertions.assertThat(row.getInt(1)).isEqualTo(row1.getInt(1));
+ Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
+ }
+
+ private BinaryString randomString() {
+ int length = RANDOM.nextInt(100);
+ byte[] buffer = new byte[length];
+ for (int i = 0; i < length; i += 1) {
+ buffer[i] = (byte) ('A' + RANDOM.nextInt(26));
+ }
+ return BinaryString.fromBytes(buffer);
+ }
+}