This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a021db68a8545b4183e935beccd9b0d62329ee67
Author: Wencong Liu <[email protected]>
AuthorDate: Tue Mar 12 14:46:04 2024 +0800

    [FLINK-34543][datastream] Introduce the SortPartition API on 
PartitionWindowedStream
---
 .../generated/execution_configuration.html         |  12 +
 .../flink/configuration/ExecutionOptions.java      |  18 ++
 .../datastream/KeyedPartitionWindowedStream.java   |  82 +++++
 .../NonKeyedPartitionWindowedStream.java           |  76 +++++
 .../api/datastream/PartitionWindowedStream.java    |  63 ++++
 .../FixedLengthByteKeyAndValueComparator.java      | 180 +++++++++++
 .../sortpartition/KeyAndValueSerializer.java       | 175 ++++++++++
 .../sortpartition/KeyedSortPartitionOperator.java  | 355 +++++++++++++++++++++
 .../sortpartition/SortPartitionOperator.java       | 286 +++++++++++++++++
 .../VariableLengthByteKeyAndValueComparator.java   | 189 +++++++++++
 .../OneInputTransformationTranslator.java          |  12 +
 .../FixedLengthByteKeyAndValueComparatorTest.java  |  63 ++++
 .../sortpartition/KeyAndValueSerializerTest.java   |  71 +++++
 .../KeyedSortPartitionOperatorTest.java            | 187 +++++++++++
 .../SerializerComparatorTestData.java              | 101 ++++++
 .../sortpartition/SortPartitionOperatorTest.java   | 168 ++++++++++
 ...ariableLengthByteKeyAndValueComparatorTest.java |  64 ++++
 .../TypeSerializerTestCoverageTest.java            |   5 +-
 .../KeyedPartitionWindowedStreamITCase.java        | 270 ++++++++++++++++
 .../NonKeyedPartitionWindowedStreamITCase.java     | 191 +++++++++++
 20 files changed, 2567 insertions(+), 1 deletion(-)

diff --git a/docs/layouts/shortcodes/generated/execution_configuration.html 
b/docs/layouts/shortcodes/generated/execution_configuration.html
index 5407666376d..6aa337d623a 100644
--- a/docs/layouts/shortcodes/generated/execution_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_configuration.html
@@ -38,5 +38,17 @@
             <td><p>Enum</p></td>
             <td>Runtime execution mode of DataStream programs. Among other 
things, this controls task scheduling, network shuffle behavior, and time 
semantics.<br /><br />Possible 
values:<ul><li>"STREAMING"</li><li>"BATCH"</li><li>"AUTOMATIC"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>execution.sort-keyed-partition.memory</h5></td>
+            <td style="word-wrap: break-word;">128 mb</td>
+            <td>MemorySize</td>
+            <td>Sets the managed memory size for sort partition operator on 
KeyedPartitionWindowedStream. The memory size is only a weight hint. Thus, it 
will affect the operator's memory weight within a task, but the actual memory 
used depends on the running environment.</td>
+        </tr>
+        <tr>
+            <td><h5>execution.sort-partition.memory</h5></td>
+            <td style="word-wrap: break-word;">128 mb</td>
+            <td>MemorySize</td>
+            <td>Sets the managed memory size for sort partition operator in 
NonKeyedPartitionWindowedStream. The memory size is only a weight hint. Thus, it 
will affect the operator's memory weight within a task, but the actual memory 
used depends on the running environment.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index 2a5f0b61736..2c32e2ad55b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -128,6 +128,24 @@ public class ExecutionOptions {
                                                             + "throughput"))
                                     .build());
 
+    public static final ConfigOption<MemorySize> SORT_PARTITION_MEMORY =
+            ConfigOptions.key("execution.sort-partition.memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription(
+                            "Sets the managed memory size for sort partition 
operator in NonKeyedPartitionWindowedStream."
+                                    + " The memory size is only a weight hint. 
Thus, it will affect the operator's memory weight within a "
+                                    + "task, but the actual memory used 
depends on the running environment.");
+
+    public static final ConfigOption<MemorySize> SORT_KEYED_PARTITION_MEMORY =
+            ConfigOptions.key("execution.sort-keyed-partition.memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription(
+                            "Sets the managed memory size for sort partition 
operator on KeyedPartitionWindowedStream."
+                                    + " The memory size is only a weight hint. 
Thus, it will affect the operator's memory weight within a "
+                                    + "task, but the actual memory used 
depends on the running environment.");
+
     @Documentation.ExcludeFromDocumentation(
             "This is an expert option, that we do not want to expose in the 
documentation")
     public static final ConfigOption<Boolean> SORT_INPUTS =
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
index 5ccc05826bf..6734fbec0db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
@@ -22,14 +22,20 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import 
org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.util.Collector;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -90,4 +96,80 @@ public class KeyedPartitionWindowedStream<T, KEY> implements 
PartitionWindowedSt
         return input.window(GlobalWindows.createWithEndOfStreamTrigger())
                 .aggregate(aggregateFunction);
     }
+
+    @Override
+    public SingleOutputStreamOperator<T> sortPartition(int field, Order order) 
{
+        checkNotNull(order, "The order must not be null.");
+        checkArgument(field >= 0, "The field mustn't be less than zero.");
+        TypeInformation<T> inputType = input.getType();
+        KeyedSortPartitionOperator<T, KEY> operator =
+                new KeyedSortPartitionOperator<>(inputType, field, order);
+        final String opName = "KeyedSortPartition";
+        SingleOutputStreamOperator<T> result =
+                this.input
+                        .transform(opName, inputType, operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                
.get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
+
+    @Override
+    public SingleOutputStreamOperator<T> sortPartition(String field, Order 
order) {
+        checkNotNull(field, "The field must not be null.");
+        checkNotNull(order, "The order must not be null.");
+        TypeInformation<T> inputType = input.getType();
+        KeyedSortPartitionOperator<T, KEY> operator =
+                new KeyedSortPartitionOperator<>(inputType, field, order);
+        final String opName = "KeyedSortPartition";
+        SingleOutputStreamOperator<T> result =
+                this.input
+                        .transform(opName, inputType, operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                
.get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
+
+    @Override
+    public <K> SingleOutputStreamOperator<T> sortPartition(
+            KeySelector<T, K> keySelector, Order order) {
+        checkNotNull(keySelector, "The key selector must not be null.");
+        checkNotNull(order, "The order must not be null.");
+        TypeInformation<T> inputType = input.getType();
+        KeyedSortPartitionOperator<T, KEY> operator =
+                new KeyedSortPartitionOperator<>(inputType, 
environment.clean(keySelector), order);
+        final String opName = "KeyedSortPartition";
+        SingleOutputStreamOperator<T> result =
+                this.input
+                        .transform(opName, inputType, operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                
.get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
index 0f415aed876..6efeacc7ebc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
@@ -22,13 +22,19 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.MapPartitionOperator;
 import org.apache.flink.streaming.api.operators.PartitionAggregateOperator;
 import org.apache.flink.streaming.api.operators.PartitionReduceOperator;
+import 
org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -88,4 +94,74 @@ public class NonKeyedPartitionWindowedStream<T> implements 
PartitionWindowedStre
                         opName, resultType, new 
PartitionAggregateOperator<>(aggregateFunction))
                 .setParallelism(input.getParallelism());
     }
+
+    @Override
+    public SingleOutputStreamOperator<T> sortPartition(int field, Order order) 
{
+        checkNotNull(order, "The order must not be null.");
+        checkArgument(field >= 0, "The field mustn't be less than zero.");
+        SortPartitionOperator<T> operator =
+                new SortPartitionOperator<>(input.getType(), field, order);
+        final String opName = "SortPartition";
+        SingleOutputStreamOperator<T> result =
+                input.transform(opName, input.getType(), operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                .get(ExecutionOptions.SORT_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
+
+    @Override
+    public SingleOutputStreamOperator<T> sortPartition(String field, Order 
order) {
+        checkNotNull(field, "The field must not be null.");
+        checkNotNull(order, "The order must not be null.");
+        SortPartitionOperator<T> operator =
+                new SortPartitionOperator<>(input.getType(), field, order);
+        final String opName = "SortPartition";
+        SingleOutputStreamOperator<T> result =
+                input.transform(opName, input.getType(), operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                .get(ExecutionOptions.SORT_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
+
+    @Override
+    public <K> SingleOutputStreamOperator<T> sortPartition(
+            KeySelector<T, K> keySelector, Order order) {
+        checkNotNull(keySelector, "The key selector must not be null.");
+        checkNotNull(order, "The order must not be null.");
+        SortPartitionOperator<T> operator =
+                new SortPartitionOperator<>(input.getType(), 
environment.clean(keySelector), order);
+        final String opName = "SortPartition";
+        SingleOutputStreamOperator<T> result =
+                input.transform(opName, input.getType(), operator)
+                        .setParallelism(input.getParallelism());
+        int managedMemoryWeight =
+                Math.max(
+                        1,
+                        environment
+                                .getConfiguration()
+                                .get(ExecutionOptions.SORT_PARTITION_MEMORY)
+                                .getMebiBytes());
+        result.getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
+        return result;
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
index 19ebb0a7530..7741f3c27b3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
@@ -22,6 +22,11 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.configuration.ExecutionOptions;
 
 /**
  * {@link PartitionWindowedStream} represents a data stream that collects all 
records of each
@@ -61,4 +66,62 @@ public interface PartitionWindowedStream<T> {
      */
     <ACC, R> SingleOutputStreamOperator<R> aggregate(
             AggregateFunction<T, ACC, R> aggregateFunction);
+
+    /**
+     * Sorts the records of the window on the specified field in the specified 
order. The type of
+     * records must be {@link Tuple}.
+     *
+     * <p>This operator will use managed memory for the sort. For {@link
+     * NonKeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-partition.memory" in {@link ExecutionOptions}. For 
{@link
+     * KeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-keyed-partition.memory" in {@link ExecutionOptions}.
+     *
+     * @param field The field index on which records are sorted.
+     * @param order The order in which records are sorted.
+     * @return The data stream with sorted records.
+     */
+    SingleOutputStreamOperator<T> sortPartition(int field, Order order);
+
+    /**
+     * Sorts the records of the window on the specified field in the specified 
order. The type of
+     * records must be Flink POJO {@link PojoTypeInfo}. A type is considered a 
Flink POJO type, if
+     * it fulfills the conditions below.
+     *
+     * <ul>
+     *   <li>It is a public class, and standalone (not a non-static inner 
class).
+     *   <li>It has a public no-argument constructor.
+     *   <li>All non-static, non-transient fields in the class (and all 
superclasses) are either
+     *       public (and non-final) or have a public getter and a setter 
method that follows the
+     *       Java beans naming conventions for getters and setters.
+     *   <li>It is a fixed-length, null-aware composite type with 
non-deterministic field order.
+     *       Every field can be null independent of the field's type.
+     * </ul>
+     *
+     * <p>This operator will use managed memory for the sort. For {@link
+     * NonKeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-partition.memory" in {@link ExecutionOptions}. For 
{@link
+     * KeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-keyed-partition.memory" in {@link ExecutionOptions}.
+     *
+     * @param field The field expression referring to the field on which 
records are sorted.
+     * @param order The order in which records are sorted.
+     * @return The data stream with sorted records.
+     */
+    SingleOutputStreamOperator<T> sortPartition(String field, Order order);
+
+    /**
+     * Sorts the records according to a {@link KeySelector} in the specified 
order.
+     *
+     * <p>This operator will use managed memory for the sort. For {@link
+     * NonKeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-partition.memory" in {@link ExecutionOptions}. For 
{@link
+     * KeyedPartitionWindowedStream}, the managed memory size can be set by
+     * "execution.sort-keyed-partition.memory" in {@link ExecutionOptions}.
+     *
+     * @param keySelector The key selector to extract key from the records for 
sorting.
+     * @param order The order in which records are sorted.
+     * @return The data stream with sorted records.
+     */
+    <K> SingleOutputStreamOperator<T> sortPartition(KeySelector<T, K> 
keySelector, Order order);
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java
new file mode 100644
index 00000000000..3768b621c98
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator} to
+ * compare records according to both the record key and record value. The 
length of record key must
+ * be fixed and will be initialized when the {@link 
FixedLengthByteKeyAndValueComparator} is
+ * created.
+ */
+@Internal
+public class FixedLengthByteKeyAndValueComparator<INPUT>
+        extends TypeComparator<Tuple2<byte[], INPUT>> {
+
+    private final int serializedKeyLength;
+
+    private final TypeComparator<INPUT> valueComparator;
+
+    private byte[] keyReference;
+
+    private INPUT valueReference;
+
+    FixedLengthByteKeyAndValueComparator(
+            int serializedKeyLength, TypeComparator<INPUT> valueComparator) {
+        this.serializedKeyLength = serializedKeyLength;
+        this.valueComparator = valueComparator;
+    }
+
+    @Override
+    public int hash(Tuple2<byte[], INPUT> record) {
+        return record.hashCode();
+    }
+
+    @Override
+    public void setReference(Tuple2<byte[], INPUT> toCompare) {
+        this.keyReference = toCompare.f0;
+        this.valueReference = toCompare.f1;
+    }
+
+    @Override
+    public boolean equalToReference(Tuple2<byte[], INPUT> candidate) {
+        return Arrays.equals(keyReference, candidate.f0) && 
valueReference.equals(candidate.f1);
+    }
+
+    @Override
+    public int compareToReference(TypeComparator<Tuple2<byte[], INPUT>> 
referencedComparator) {
+        byte[] otherKey =
+                ((FixedLengthByteKeyAndValueComparator<INPUT>) 
referencedComparator).keyReference;
+        INPUT otherValue =
+                ((FixedLengthByteKeyAndValueComparator<INPUT>) 
referencedComparator).valueReference;
+        int keyCmp = compareKey(otherKey, this.keyReference);
+        if (keyCmp != 0) {
+            return keyCmp;
+        }
+        return valueComparator.compare(this.valueReference, otherValue);
+    }
+
+    @Override
+    public int compare(Tuple2<byte[], INPUT> first, Tuple2<byte[], INPUT> 
second) {
+        int keyCmp = compareKey(first.f0, second.f0);
+        if (keyCmp != 0) {
+            return keyCmp;
+        }
+        return valueComparator.compare(first.f1, second.f1);
+    }
+
+    private int compareKey(byte[] first, byte[] second) {
+        for (int i = 0; i < serializedKeyLength; i++) {
+            int cmp = Byte.compare(first[i], second[i]);
+            if (cmp != 0) {
+                return cmp < 0 ? -1 : 1;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource)
+            throws IOException {
+        int minCount = serializedKeyLength;
+        while (minCount-- > 0) {
+            byte firstValue = firstSource.readByte();
+            byte secondValue = secondSource.readByte();
+            int cmp = Byte.compare(firstValue, secondValue);
+            if (cmp != 0) {
+                return cmp < 0 ? -1 : 1;
+            }
+        }
+        return valueComparator.compareSerialized(firstSource, secondSource);
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return false;
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return false;
+    }
+
+    @Override
+    public void putNormalizedKey(
+            Tuple2<byte[], INPUT> record, MemorySegment target, int offset, 
int numBytes) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return false;
+    }
+
+    @Override
+    public TypeComparator<Tuple2<byte[], INPUT>> duplicate() {
+        return new FixedLengthByteKeyAndValueComparator<>(
+                this.serializedKeyLength, this.valueComparator);
+    }
+
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    @Override
+    public TypeComparator<?>[] getFlatComparators() {
+        return new TypeComparator[] {this};
+    }
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public void writeWithKeyNormalization(Tuple2<byte[], INPUT> record, 
DataOutputView target) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> readWithKeyDenormalization(
+            Tuple2<byte[], INPUT> reuse, DataInputView source) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java
new file mode 100644
index 00000000000..2788880b7b3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * {@link KeyAndValueSerializer} is used in {@link KeyedSortPartitionOperator} 
for serializing
+ * elements including key and record. It serializes the record in a format 
known by the {@link
+ * FixedLengthByteKeyAndValueComparator} and {@link 
VariableLengthByteKeyAndValueComparator}.
+ *
+ * <p>If the key's length is fixed and known, the format of each serialized 
element is as follows:
+ *
+ * <pre>
+ *      [key] | [record]
+ * </pre>
+ *
+ * <p>If the key's length is variable, the format of each serialized element 
is as follows:
+ *
+ * <pre>
+ *      [key's length] | [key] | [record]
+ * </pre>
+ */
+final class KeyAndValueSerializer<INPUT> extends TypeSerializer<Tuple2<byte[], 
INPUT>> {
+    private final TypeSerializer<INPUT> valueSerializer;
+
+    /**
+     * The value of serializedKeyLength will be positive if the key's length 
is fixed otherwise -1.
+     */
+    private final int serializedKeyLength;
+
+    KeyAndValueSerializer(TypeSerializer<INPUT> valueSerializer, int 
serializedKeyLength) {
+        this.valueSerializer = valueSerializer;
+        this.serializedKeyLength = serializedKeyLength;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Tuple2<byte[], INPUT>> duplicate() {
+        return new KeyAndValueSerializer<>(valueSerializer.duplicate(), 
this.serializedKeyLength);
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> copy(Tuple2<byte[], INPUT> from) {
+        INPUT record = from.f1;
+        return Tuple2.of(Arrays.copyOf(from.f0, from.f0.length), 
valueSerializer.copy(record));
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> createInstance() {
+        return Tuple2.of(new byte[0], valueSerializer.createInstance());
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> copy(Tuple2<byte[], INPUT> from, 
Tuple2<byte[], INPUT> reuse) {
+        reuse.f0 = Arrays.copyOf(from.f0, from.f0.length);
+        INPUT fromRecord = from.f1;
+        reuse.f1 = valueSerializer.copy(fromRecord, reuse.f1);
+        return reuse;
+    }
+
+    @Override
+    public int getLength() {
+        if (valueSerializer.getLength() < 0 || serializedKeyLength < 0) {
+            return -1;
+        }
+        return valueSerializer.getLength() + serializedKeyLength;
+    }
+
+    @Override
+    public void serialize(Tuple2<byte[], INPUT> record, DataOutputView target) 
throws IOException {
+        if (serializedKeyLength < 0) {
+            target.writeInt(record.f0.length);
+        }
+        target.write(record.f0);
+        INPUT toSerialize = record.f1;
+        valueSerializer.serialize(toSerialize, target);
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> deserialize(DataInputView source) throws 
IOException {
+        final int length = getKeyLength(source);
+        byte[] bytes = new byte[length];
+        source.read(bytes);
+        INPUT value = valueSerializer.deserialize(source);
+        return Tuple2.of(bytes, value);
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> deserialize(Tuple2<byte[], INPUT> reuse, 
DataInputView source)
+            throws IOException {
+        final int length = getKeyLength(source);
+        byte[] bytes = new byte[length];
+        source.read(bytes);
+        INPUT value = valueSerializer.deserialize(source);
+        reuse.f0 = bytes;
+        reuse.f1 = value;
+        return reuse;
+    }
+
+    private int getKeyLength(DataInputView source) throws IOException {
+        final int length;
+        if (serializedKeyLength < 0) {
+            length = source.readInt();
+        } else {
+            length = serializedKeyLength;
+        }
+        return length;
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+        final int length;
+        if (serializedKeyLength < 0) {
+            length = source.readInt();
+            target.writeInt(length);
+        } else {
+            length = serializedKeyLength;
+        }
+        for (int i = 0; i < length; i++) {
+            target.writeByte(source.readByte());
+        }
+        valueSerializer.copy(source, target);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KeyAndValueSerializer<?> that = (KeyAndValueSerializer<?>) o;
+        return Objects.equals(valueSerializer, that.valueSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(valueSerializer);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<Tuple2<byte[], INPUT>> 
snapshotConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java
new file mode 100644
index 00000000000..8c45a54ed5a
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The {@link KeyedSortPartitionOperator} sorts records of a partition on 
{@link KeyedStream}. It
+ * ensures that all records with the same key are sorted in a user-defined 
order.
+ *
+ * <p>To sort the record key first and then record at the same time, both the 
record key and the
+ * record will be written to {@link ExternalSorter} directly. However, if the 
record is sorted
+ * according to the selected key by {@link KeySelector}, the selected sort key 
should also be
+ * written with the record key and the record to {@link ExternalSorter} to 
avoid repeated key
+ * selections.
+ *
+ * @param <INPUT> The type of input record.
+ * @param <KEY> The type of record key, which has already been defined in 
{@link KeyedStream}.
+ */
+@Internal
+public class KeyedSortPartitionOperator<INPUT, KEY> extends 
AbstractStreamOperator<INPUT>
+        implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput {
+
+    /** The type information of input records. */
+    protected final TypeInformation<INPUT> inputType;
+
+    /** The selector to create the sort key for records, which will be null if 
it's not used. */
+    protected final KeySelector<INPUT, ?> sortFieldSelector;
+
+    /** The order to sort records. */
+    private final Order sortOrder;
+
+    /**
+     * The string field to indicate the sort key for records with tuple or 
pojo type, which will be
+     * null if it's not used.
+     */
+    private final String stringSortField;
+
+    /**
+     * The int field to indicate the sort key for records with tuple type, 
which will be -1 if it's
+     * not used.
+     */
+    private final int positionSortField;
+
+    /**
+     * The sorter to sort both key and record if the record is not sorted by 
{@link KeySelector}.
+     */
+    private PushSorter<Tuple2<byte[], INPUT>> recordSorter = null;
+
+    /** The sorter to sort both key and record if the record is sorted by 
{@link KeySelector}. */
+    private PushSorter<Tuple2<byte[], Tuple2<?, INPUT>>> 
recordSorterForSelector = null;
+
+    private TypeSerializer<KEY> recordKeySerializer;
+
+    /** A buffer to save the serialized record key. */
+    private DataOutputSerializer dataOutputSerializer;
+
+    public KeyedSortPartitionOperator(
+            TypeInformation<INPUT> inputType, int positionSortField, Order 
sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(positionSortField);
+        this.positionSortField = positionSortField;
+        this.stringSortField = null;
+        this.sortFieldSelector = null;
+        this.sortOrder = sortOrder;
+    }
+
+    public KeyedSortPartitionOperator(
+            TypeInformation<INPUT> inputType, String stringSortField, Order 
sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(stringSortField);
+        this.positionSortField = -1;
+        this.stringSortField = stringSortField;
+        this.sortFieldSelector = null;
+        this.sortOrder = sortOrder;
+    }
+
+    public <K> KeyedSortPartitionOperator(
+            TypeInformation<INPUT> inputType,
+            KeySelector<INPUT, K> sortFieldSelector,
+            Order sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(sortFieldSelector);
+        this.positionSortField = -1;
+        this.stringSortField = null;
+        this.sortFieldSelector = sortFieldSelector;
+        this.sortOrder = sortOrder;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<INPUT>> output) {
+        super.setup(containingTask, config, output);
+        ClassLoader userCodeClassLoader = 
containingTask.getUserCodeClassLoader();
+        ExecutionConfig executionConfig = 
containingTask.getEnvironment().getExecutionConfig();
+        recordKeySerializer = 
config.getStateKeySerializer(userCodeClassLoader);
+        int keyLength = recordKeySerializer.getLength();
+        createDataOutputSerializer(keyLength);
+        if (sortFieldSelector != null) {
+            TypeInformation<Tuple2<?, INPUT>> valueType =
+                    Types.TUPLE(
+                            
TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType),
+                            inputType);
+            KeyAndValueSerializer<Tuple2<?, INPUT>> valueSerializer =
+                    new KeyAndValueSerializer<>(
+                            valueType.createSerializer(getExecutionConfig()), 
keyLength);
+            TypeComparator<Tuple2<byte[], Tuple2<?, INPUT>>> 
sortTypeComparator;
+            if (keyLength > 0) {
+                sortTypeComparator =
+                        new FixedLengthByteKeyAndValueComparator<>(
+                                keyLength,
+                                ((CompositeType<Tuple2<?, INPUT>>) valueType)
+                                        .createComparator(
+                                                getSortFieldIndex(),
+                                                getSortOrderIndicator(),
+                                                0,
+                                                executionConfig));
+            } else {
+                sortTypeComparator =
+                        new VariableLengthByteKeyAndValueComparator<>(
+                                ((CompositeType<Tuple2<?, INPUT>>) valueType)
+                                        .createComparator(
+                                                getSortFieldIndex(),
+                                                getSortOrderIndicator(),
+                                                0,
+                                                executionConfig));
+            }
+            recordSorterForSelector =
+                    getSorter(valueSerializer, sortTypeComparator, 
containingTask);
+        } else {
+            KeyAndValueSerializer<INPUT> valueSerializer =
+                    new KeyAndValueSerializer<>(
+                            inputType.createSerializer(getExecutionConfig()), 
keyLength);
+            TypeComparator<Tuple2<byte[], INPUT>> sortTypeComparator;
+            if (keyLength > 0) {
+                sortTypeComparator =
+                        new FixedLengthByteKeyAndValueComparator<>(
+                                keyLength,
+                                ((CompositeType<INPUT>) inputType)
+                                        .createComparator(
+                                                getSortFieldIndex(),
+                                                getSortOrderIndicator(),
+                                                0,
+                                                executionConfig));
+            } else {
+                sortTypeComparator =
+                        new VariableLengthByteKeyAndValueComparator<>(
+                                ((CompositeType<INPUT>) inputType)
+                                        .createComparator(
+                                                getSortFieldIndex(),
+                                                getSortOrderIndicator(),
+                                                0,
+                                                executionConfig));
+            }
+            recordSorter = getSorter(valueSerializer, sortTypeComparator, 
containingTask);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<INPUT> element) throws Exception {
+        KEY currentKey = (KEY) getCurrentKey();
+        recordKeySerializer.serialize(currentKey, dataOutputSerializer);
+        byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
+        dataOutputSerializer.clear();
+        if (sortFieldSelector != null) {
+            recordSorterForSelector.writeRecord(
+                    Tuple2.of(
+                            serializedKey,
+                            Tuple2.of(
+                                    
sortFieldSelector.getKey(element.getValue()),
+                                    element.getValue())));
+        } else {
+            recordSorter.writeRecord(Tuple2.of(serializedKey, 
element.getValue()));
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        TimestampedCollector<INPUT> outputCollector = new 
TimestampedCollector<>(output);
+        if (sortFieldSelector != null) {
+            recordSorterForSelector.finishReading();
+            MutableObjectIterator<Tuple2<byte[], Tuple2<?, INPUT>>> iterator =
+                    recordSorterForSelector.getIterator();
+            Tuple2<byte[], Tuple2<?, INPUT>> record = iterator.next();
+            while (record != null) {
+                outputCollector.collect(record.f1.f1);
+                record = iterator.next();
+            }
+            recordSorterForSelector.close();
+        } else {
+            recordSorter.finishReading();
+            MutableObjectIterator<Tuple2<byte[], INPUT>> iterator = 
recordSorter.getIterator();
+            Tuple2<byte[], INPUT> record = iterator.next();
+            while (record != null) {
+                outputCollector.collect(record.f1);
+                record = iterator.next();
+            }
+            recordSorter.close();
+        }
+    }
+
+    @Override
+    public OperatorAttributes getOperatorAttributes() {
+        return new 
OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
+    }
+
+    /**
+     * Get the sort field index for the sorted data.
+     *
+     * @return the sort field index.
+     */
+    private int[] getSortFieldIndex() {
+        int[] sortFieldIndex = new int[1];
+        if (positionSortField != -1) {
+            sortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(positionSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        } else if (stringSortField != null) {
+            sortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(stringSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        }
+        return sortFieldIndex;
+    }
+
+    /**
+     * Get the indicator for the sort order.
+     *
+     * @return sort order indicator.
+     */
+    private boolean[] getSortOrderIndicator() {
+        boolean[] sortOrderIndicator = new boolean[1];
+        sortOrderIndicator[0] = this.sortOrder == Order.ASCENDING;
+        return sortOrderIndicator;
+    }
+
+    private void ensureFieldSortable(int field) throws InvalidProgramException 
{
+        if (!Keys.ExpressionKeys.isSortKey(field, inputType)) {
+            throw new InvalidProgramException(
+                    "The field " + field + " of input type " + inputType + " 
is not sortable.");
+        }
+    }
+
+    private void ensureFieldSortable(String field) throws 
InvalidProgramException {
+        if (!Keys.ExpressionKeys.isSortKey(field, inputType)) {
+            throw new InvalidProgramException(
+                    "The field " + field + " of input type " + inputType + " 
is not sortable.");
+        }
+    }
+
+    private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) {
+        TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, inputType);
+        Keys.SelectorFunctionKeys<INPUT, K> sortKey =
+                new Keys.SelectorFunctionKeys<>(keySelector, inputType, 
keyType);
+        if (!sortKey.getKeyType().isSortKeyType()) {
+            throw new InvalidProgramException("The key type " + keyType + " is 
not sortable.");
+        }
+    }
+
+    /**
+     * Create the dataOutputSerializer to save the serialized record key as a 
buffer.
+     *
+     * @param keyLength the length of record key. The key length will be 
variable if the value is
+     *     -1.
+     */
+    private void createDataOutputSerializer(int keyLength) {
+        if (keyLength > 0) {
+            dataOutputSerializer = new DataOutputSerializer(keyLength);
+        } else {
+            // The initial buffer size is set to 64. The buffer will expand 
size if it's needed.
+            dataOutputSerializer = new DataOutputSerializer(64);
+        }
+    }
+
+    private <TYPE> PushSorter<TYPE> getSorter(
+            TypeSerializer<TYPE> typeSerializer,
+            TypeComparator<TYPE> typeComparator,
+            StreamTask<?, ?> streamTask) {
+        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
+        Configuration jobConfiguration = 
streamTask.getEnvironment().getJobConfiguration();
+        double managedMemoryFraction =
+                config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+                        ManagedMemoryUseCase.OPERATOR,
+                        streamTask.getEnvironment().getJobConfiguration(),
+                        streamTask.getEnvironment().getTaskConfiguration(),
+                        userCodeClassLoader);
+        try {
+            return ExternalSorter.newBuilder(
+                            streamTask.getEnvironment().getMemoryManager(),
+                            streamTask,
+                            typeSerializer,
+                            typeComparator,
+                            streamTask.getExecutionConfig())
+                    .memoryFraction(managedMemoryFraction)
+                    .enableSpilling(
+                            streamTask.getEnvironment().getIOManager(),
+                            
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                    
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+                    
.objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled())
+                    
.largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
+                    .build();
+        } catch (MemoryAllocationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
new file mode 100644
index 00000000000..11b41cc48ba
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The {@link SortPartitionOperator} sorts records of a partition on non-keyed 
data stream. It
+ * ensures that all records within the same task are sorted in a user-defined 
order.
+ *
+ * <p>To sort the records, the record will be written to {@link 
ExternalSorter} directly. However,
+ * if the record is sorted according to the selected key by {@link 
KeySelector}, the selected key
+ * should also be written with the record to {@link ExternalSorter} to avoid 
repeated key
+ * selections.
+ *
+ * @param <INPUT> The type of input record.
+ */
+@Internal
+public class SortPartitionOperator<INPUT> extends AbstractStreamOperator<INPUT>
+        implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput {
+
+    /** The type information of input records. */
+    protected final TypeInformation<INPUT> inputType;
+
+    /** The selector to create the sort key for records, which will be null if 
it's not used. */
+    protected final KeySelector<INPUT, ?> sortFieldSelector;
+
+    /** The order to sort records. */
+    private final Order sortOrder;
+
+    /**
+     * The string field to indicate the sort key for records with tuple or 
pojo type, which will be
+     * null if it's not used.
+     */
+    private final String stringSortField;
+
+    /**
+     * The int field to indicate the sort key for records with tuple type, 
which will be -1 if it's
+     * not used.
+     */
+    private final int positionSortField;
+
+    /** The sorter to sort record if the record is not sorted by {@link 
KeySelector}. */
+    private PushSorter<INPUT> recordSorter = null;
+
+    /** The sorter to sort record if the record is sorted by {@link 
KeySelector}. */
+    private PushSorter<Tuple2<?, INPUT>> recordSorterForKeySelector = null;
+
+    public SortPartitionOperator(
+            TypeInformation<INPUT> inputType, int positionSortField, Order 
sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(positionSortField);
+        this.positionSortField = positionSortField;
+        this.stringSortField = null;
+        this.sortFieldSelector = null;
+        this.sortOrder = sortOrder;
+    }
+
+    public SortPartitionOperator(
+            TypeInformation<INPUT> inputType, String stringSortField, Order 
sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(stringSortField);
+        this.positionSortField = -1;
+        this.stringSortField = stringSortField;
+        this.sortFieldSelector = null;
+        this.sortOrder = sortOrder;
+    }
+
+    public <K> SortPartitionOperator(
+            TypeInformation<INPUT> inputType,
+            KeySelector<INPUT, K> sortFieldSelector,
+            Order sortOrder) {
+        this.inputType = inputType;
+        ensureFieldSortable(sortFieldSelector);
+        this.positionSortField = -1;
+        this.stringSortField = null;
+        this.sortFieldSelector = sortFieldSelector;
+        this.sortOrder = sortOrder;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<INPUT>> output) {
+        super.setup(containingTask, config, output);
+        ExecutionConfig executionConfig = 
containingTask.getEnvironment().getExecutionConfig();
+        if (sortFieldSelector != null) {
+            TypeInformation<Tuple2<?, INPUT>> sortTypeInfo =
+                    Types.TUPLE(
+                            
TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType),
+                            inputType);
+            recordSorterForKeySelector =
+                    getSorter(
+                            sortTypeInfo.createSerializer(executionConfig),
+                            ((CompositeType<Tuple2<?, INPUT>>) sortTypeInfo)
+                                    .createComparator(
+                                            getSortFieldIndex(),
+                                            getSortOrderIndicator(),
+                                            0,
+                                            executionConfig),
+                            containingTask);
+        } else {
+            recordSorter =
+                    getSorter(
+                            inputType.createSerializer(executionConfig),
+                            ((CompositeType<INPUT>) inputType)
+                                    .createComparator(
+                                            getSortFieldIndex(),
+                                            getSortOrderIndicator(),
+                                            0,
+                                            executionConfig),
+                            containingTask);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<INPUT> element) throws Exception {
+        if (sortFieldSelector != null) {
+            recordSorterForKeySelector.writeRecord(
+                    Tuple2.of(sortFieldSelector.getKey(element.getValue()), 
element.getValue()));
+        } else {
+            recordSorter.writeRecord(element.getValue());
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        TimestampedCollector<INPUT> outputCollector = new 
TimestampedCollector<>(output);
+        if (sortFieldSelector != null) {
+            recordSorterForKeySelector.finishReading();
+            MutableObjectIterator<Tuple2<?, INPUT>> dataIterator =
+                    recordSorterForKeySelector.getIterator();
+            Tuple2<?, INPUT> record = dataIterator.next();
+            while (record != null) {
+                outputCollector.collect(record.f1);
+                record = dataIterator.next();
+            }
+            recordSorterForKeySelector.close();
+        } else {
+            recordSorter.finishReading();
+            MutableObjectIterator<INPUT> dataIterator = 
recordSorter.getIterator();
+            INPUT record = dataIterator.next();
+            while (record != null) {
+                outputCollector.collect(record);
+                record = dataIterator.next();
+            }
+            recordSorter.close();
+        }
+    }
+
+    @Override
+    public OperatorAttributes getOperatorAttributes() {
+        return new 
OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
+    }
+
+    /**
+     * Get the sort field index for the sorted data.
+     *
+     * @return the sort field index.
+     */
+    private int[] getSortFieldIndex() {
+        int[] sortFieldIndex = new int[1];
+        if (positionSortField != -1) {
+            sortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(positionSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        } else if (stringSortField != null) {
+            sortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(stringSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        }
+        return sortFieldIndex;
+    }
+
+    /**
+     * Get the indicator for the sort order.
+     *
+     * @return sort order indicator.
+     */
+    private boolean[] getSortOrderIndicator() {
+        boolean[] sortOrderIndicator = new boolean[1];
+        sortOrderIndicator[0] = this.sortOrder == Order.ASCENDING;
+        return sortOrderIndicator;
+    }
+
+    private void ensureFieldSortable(int field) throws InvalidProgramException 
{
+        if (!Keys.ExpressionKeys.isSortKey(field, inputType)) {
+            throw new InvalidProgramException(
+                    "The field " + field + " of input type " + inputType + " 
is not sortable.");
+        }
+    }
+
+    private void ensureFieldSortable(String field) throws 
InvalidProgramException {
+        if (!Keys.ExpressionKeys.isSortKey(field, inputType)) {
+            throw new InvalidProgramException(
+                    "The field " + field + " of input type " + inputType + " 
is not sortable.");
+        }
+    }
+
+    private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) {
+        TypeInformation<K> keyType = 
TypeExtractor.getKeySelectorTypes(keySelector, inputType);
+        Keys.SelectorFunctionKeys<INPUT, K> sortKey =
+                new Keys.SelectorFunctionKeys<>(keySelector, inputType, 
keyType);
+        if (!sortKey.getKeyType().isSortKeyType()) {
+            throw new InvalidProgramException("The key type " + keyType + " is 
not sortable.");
+        }
+    }
+
+    private <TYPE> PushSorter<TYPE> getSorter(
+            TypeSerializer<TYPE> typeSerializer,
+            TypeComparator<TYPE> typeComparator,
+            StreamTask<?, ?> streamTask) {
+        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
+        Configuration jobConfiguration = 
streamTask.getEnvironment().getJobConfiguration();
+        double managedMemoryFraction =
+                config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+                        ManagedMemoryUseCase.OPERATOR,
+                        streamTask.getEnvironment().getJobConfiguration(),
+                        streamTask.getEnvironment().getTaskConfiguration(),
+                        userCodeClassLoader);
+        try {
+            return ExternalSorter.newBuilder(
+                            streamTask.getEnvironment().getMemoryManager(),
+                            streamTask,
+                            typeSerializer,
+                            typeComparator,
+                            streamTask.getExecutionConfig())
+                    .memoryFraction(managedMemoryFraction)
+                    .enableSpilling(
+                            streamTask.getEnvironment().getIOManager(),
+                            
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                    
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+                    
.objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled())
+                    
.largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
+                    .build();
+        } catch (MemoryAllocationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java
new file mode 100644
index 00000000000..98c648f6f40
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The {@link VariableLengthByteKeyAndValueComparator} is used by {@link 
KeyedSortPartitionOperator}
+ * to compare records according to both the record key and record value. The 
length of record key
+ * must be variable and will be read from {@link DataInputView}.
+ */
+@Internal
+public class VariableLengthByteKeyAndValueComparator<INPUT>
+        extends TypeComparator<Tuple2<byte[], INPUT>> {
+
+    private final TypeComparator<INPUT> valueComparator;
+
+    private byte[] keyReference;
+
+    private INPUT valueReference;
+
+    public VariableLengthByteKeyAndValueComparator(TypeComparator<INPUT> 
valueComparator) {
+        this.valueComparator = valueComparator;
+    }
+
+    @Override
+    public int hash(Tuple2<byte[], INPUT> record) {
+        return record.hashCode();
+    }
+
+    @Override
+    public void setReference(Tuple2<byte[], INPUT> toCompare) {
+        this.keyReference = toCompare.f0;
+        this.valueReference = toCompare.f1;
+    }
+
+    @Override
+    public boolean equalToReference(Tuple2<byte[], INPUT> candidate) {
+        return Arrays.equals(keyReference, candidate.f0) && 
valueReference.equals(candidate.f1);
+    }
+
+    @Override
+    public int compareToReference(TypeComparator<Tuple2<byte[], INPUT>> 
referencedComparator) {
+        byte[] otherKey =
+                ((VariableLengthByteKeyAndValueComparator<INPUT>) 
referencedComparator)
+                        .keyReference;
+        INPUT otherValue =
+                ((VariableLengthByteKeyAndValueComparator<INPUT>) 
referencedComparator)
+                        .valueReference;
+        int keyCmp = compareKey(otherKey, this.keyReference);
+        if (keyCmp != 0) {
+            return keyCmp;
+        }
+        return valueComparator.compare(this.valueReference, otherValue);
+    }
+
+    @Override
+    public int compare(Tuple2<byte[], INPUT> first, Tuple2<byte[], INPUT> 
second) {
+        int keyCmp = compareKey(first.f0, second.f0);
+        if (keyCmp != 0) {
+            return keyCmp;
+        }
+        return valueComparator.compare(first.f1, second.f1);
+    }
+
+    private int compareKey(byte[] first, byte[] second) {
+        int firstLength = first.length;
+        int secondLength = second.length;
+        int minLength = Math.min(firstLength, secondLength);
+        for (int i = 0; i < minLength; i++) {
+            int cmp = Byte.compare(first[i], second[i]);
+
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+
+        return Integer.compare(firstLength, secondLength);
+    }
+
+    @Override
+    public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource)
+            throws IOException {
+        int firstLength = firstSource.readInt();
+        int secondLength = secondSource.readInt();
+        int minLength = Math.min(firstLength, secondLength);
+        while (minLength-- > 0) {
+            byte firstValue = firstSource.readByte();
+            byte secondValue = secondSource.readByte();
+
+            int cmp = Byte.compare(firstValue, secondValue);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        int lengthCompare = Integer.compare(firstLength, secondLength);
+        if (lengthCompare != 0) {
+            return lengthCompare;
+        } else {
+            return valueComparator.compareSerialized(firstSource, 
secondSource);
+        }
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return false;
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return false;
+    }
+
+    @Override
+    public void putNormalizedKey(
+            Tuple2<byte[], INPUT> record, MemorySegment target, int offset, 
int numBytes) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return false;
+    }
+
+    @Override
+    public TypeComparator<Tuple2<byte[], INPUT>> duplicate() {
+        return new 
VariableLengthByteKeyAndValueComparator<>(this.valueComparator);
+    }
+
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        target[index] = record;
+        return 1;
+    }
+
+    @Override
+    public TypeComparator<?>[] getFlatComparators() {
+        return new TypeComparator[] {this};
+    }
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public void writeWithKeyNormalization(Tuple2<byte[], INPUT> record, 
DataOutputView target) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+
+    @Override
+    public Tuple2<byte[], INPUT> readWithKeyDenormalization(
+            Tuple2<byte[], INPUT> reuse, DataInputView source) {
+        throw new UnsupportedOperationException(
+                "Not supported as the data containing both key and value 
cannot generate normalized key.");
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
index beecf931024..804d1f7c09a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import 
org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 
 import java.util.Collection;
@@ -78,6 +81,15 @@ public final class OneInputTransformationTranslator<IN, OUT>
             final OneInputTransformation<IN, OUT> transformation, final 
Context context) {
         KeySelector<IN, ?> keySelector = transformation.getStateKeySelector();
         if (keySelector != null) {
+            // KeyedSortPartitionOperator doesn't need sorted input because it 
sorts data
+            // internally.
+            StreamOperatorFactory<OUT> operatorFactory = 
transformation.getOperatorFactory();
+            if (operatorFactory instanceof SimpleOperatorFactory
+                    && operatorFactory.getStreamOperatorClass(
+                                    
Thread.currentThread().getContextClassLoader())
+                            == KeyedSortPartitionOperator.class) {
+                return;
+            }
             BatchExecutionUtils.applyBatchExecutionSettings(
                     transformation.getId(), context, 
StreamConfig.InputRequirement.SORTED);
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java
new file mode 100644
index 00000000000..a7065a1e2ad
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FixedLengthByteKeyAndValueComparator} in {@link 
KeyedSortPartitionOperator}. */
+class FixedLengthByteKeyAndValueComparatorTest extends 
ComparatorTestBase<Tuple2<byte[], Integer>> {
+
+    @Override
+    protected Order[] getTestedOrder() {
+        return new Order[] {Order.ASCENDING};
+    }
+
+    @Override
+    protected TypeComparator<Tuple2<byte[], Integer>> createComparator(boolean 
ascending) {
+        return new FixedLengthByteKeyAndValueComparator<>(
+                new IntSerializer().getLength(),
+                BasicTypeInfo.INT_TYPE_INFO.createComparator(true, null));
+    }
+
+    @Override
+    protected TypeSerializer<Tuple2<byte[], Integer>> createSerializer() {
+        IntSerializer intSerializer = new IntSerializer();
+        return new KeyAndValueSerializer<>(intSerializer, 
intSerializer.getLength());
+    }
+
+    @Override
+    protected void deepEquals(
+            String message, Tuple2<byte[], Integer> should, Tuple2<byte[], 
Integer> is) {
+        assertThat(is.f0).as(message).isEqualTo(should.f0);
+        assertThat(is.f1).as(message).isEqualTo(should.f1);
+    }
+
+    @Override
+    protected Tuple2<byte[], Integer>[] getSortedTestData() {
+        return SerializerComparatorTestData.getOrderedIntTestData();
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java
new file mode 100644
index 00000000000..879468e9f90
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link KeyAndValueSerializer} in {@link 
KeyedSortPartitionOperator}. */
+class KeyAndValueSerializerTest extends SerializerTestBase<Tuple2<byte[], 
Integer>> {
+
+    private static final int DEFAULT_KEY_LENGTH = 4;
+
+    private final IntSerializer valueSerializer = new IntSerializer();
+
+    @Override
+    protected TypeSerializer<Tuple2<byte[], Integer>> createSerializer() {
+        return new KeyAndValueSerializer<>(valueSerializer, 
DEFAULT_KEY_LENGTH);
+    }
+
+    @Override
+    protected int getLength() {
+        return DEFAULT_KEY_LENGTH + valueSerializer.getLength();
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected Class<Tuple2<byte[], Integer>> getTypeClass() {
+        return (Class<Tuple2<byte[], Integer>>) (Class) Tuple2.class;
+    }
+
+    @Override
+    protected Tuple2<byte[], Integer>[] getTestData() {
+        return SerializerComparatorTestData.getOrderedIntTestData();
+    }
+
+    @Override
+    @Test
+    public void testConfigSnapshotInstantiation() {
+        assertThatThrownBy(() -> super.testConfigSnapshotInstantiation())
+                .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Override
+    @Test
+    public void testSnapshotConfigurationAndReconfigure() {
+        assertThatThrownBy(() -> 
super.testSnapshotConfigurationAndReconfigure())
+                .isInstanceOf(UnsupportedOperationException.class);
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java
new file mode 100644
index 00000000000..470381ecb46
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link KeyedSortPartitionOperator}. */
+class KeyedSortPartitionOperatorTest {
+
+    @Test
+    void testSortPartition() throws Exception {
+        // 1.Test KeyedSortPartitionOperator sorting records by position field.
+        KeyedSortPartitionOperator<Tuple2<Integer, String>, String> operator1 =
+                createSortPartitionOperatorWithPositionField();
+        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+                testHarness1 = new 
OneInputStreamOperatorTestHarness<>(operator1);
+        StreamConfig streamConfig1 = testHarness1.getStreamConfig();
+        
streamConfig1.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig)
 null));
+        streamConfig1.serializeAllConfigs();
+        Queue<Object> expectedOutput1 = new LinkedList<>();
+        testHarness1.open();
+        testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3")));
+        testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
+        testHarness1.endInput();
+        testHarness1.close();
+        expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1")));
+        expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3")));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput1,
+                testHarness1.getOutput());
+        // 2.Test KeyedSortPartitionOperator sorting records by string field.
+        KeyedSortPartitionOperator<TestPojo, String> operator2 =
+                createSortPartitionOperatorWithStringField();
+        OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness2 =
+                new OneInputStreamOperatorTestHarness<>(operator2);
+        StreamConfig streamConfig2 = testHarness2.getStreamConfig();
+        
streamConfig2.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig)
 null));
+        streamConfig2.serializeAllConfigs();
+        Queue<Object> expectedOutput2 = new LinkedList<>();
+        testHarness2.open();
+        testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3)));
+        testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1)));
+        testHarness2.endInput();
+        testHarness2.close();
+        expectedOutput2.add(new StreamRecord<>(new 
SortPartitionOperatorTest.TestPojo("1", 1)));
+        expectedOutput2.add(new StreamRecord<>(new 
SortPartitionOperatorTest.TestPojo("3", 3)));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput2,
+                testHarness2.getOutput());
+        // 3.Test KeyedSortPartitionOperator sorting records by key selector.
+        KeyedSortPartitionOperator<TestPojo, String> operator3 =
+                createSortPartitionOperatorWithKeySelector();
+        OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness3 =
+                new OneInputStreamOperatorTestHarness<>(operator3);
+        StreamConfig streamConfig3 = testHarness3.getStreamConfig();
+        
streamConfig3.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig)
 null));
+        streamConfig3.serializeAllConfigs();
+        Queue<Object> expectedOutput3 = new LinkedList<>();
+        testHarness3.open();
+        testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3)));
+        testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1)));
+        testHarness3.endInput();
+        testHarness3.close();
+        expectedOutput3.add(new StreamRecord<>(new 
SortPartitionOperatorTest.TestPojo("1", 1)));
+        expectedOutput3.add(new StreamRecord<>(new 
SortPartitionOperatorTest.TestPojo("3", 3)));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput3,
+                testHarness3.getOutput());
+    }
+
+    @Test
+    void testOpenClose() throws Exception {
+        KeyedSortPartitionOperator<Tuple2<Integer, String>, String> 
sortPartitionOperator =
+                createSortPartitionOperatorWithPositionField();
+        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+                testHarness = new 
OneInputStreamOperatorTestHarness<>(sortPartitionOperator);
+        StreamConfig streamConfig = testHarness.getStreamConfig();
+        
streamConfig.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig)
 null));
+        streamConfig.serializeAllConfigs();
+        testHarness.open();
+        testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
+        testHarness.endInput();
+        testHarness.close();
+        assertThat(testHarness.getOutput()).isNotEmpty();
+    }
+
+    private KeyedSortPartitionOperator<Tuple2<Integer, String>, String>
+            createSortPartitionOperatorWithPositionField() {
+        TypeInformation<Tuple2<Integer, String>> inputType =
+                Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+        int positionSortField = 0;
+        Order sortOrder = Order.ASCENDING;
+        return new KeyedSortPartitionOperator<>(inputType, positionSortField, 
sortOrder);
+    }
+
+    private KeyedSortPartitionOperator<TestPojo, String>
+            createSortPartitionOperatorWithStringField() {
+        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
+        String positionSortField = "value";
+        Order sortOrder = Order.ASCENDING;
+        return new KeyedSortPartitionOperator<>(inputType, positionSortField, 
sortOrder);
+    }
+
+    private KeyedSortPartitionOperator<TestPojo, String>
+            createSortPartitionOperatorWithKeySelector() {
+        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
+        Order sortOrder = Order.ASCENDING;
+        return new KeyedSortPartitionOperator<>(inputType, TestPojo::getValue, 
sortOrder);
+    }
+
+    /** The test pojo. */
+    public static class TestPojo implements Serializable {
+
+        public String key;
+
+        public Integer value;
+
+        public TestPojo() {}
+
+        public TestPojo(String key, Integer value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public Integer getValue() {
+            return value;
+        }
+
+        public void setValue(Integer value) {
+            this.value = value;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public void setKey(String key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (object instanceof SortPartitionOperatorTest.TestPojo) {
+                SortPartitionOperatorTest.TestPojo testPojo =
+                        (SortPartitionOperatorTest.TestPojo) object;
+                return testPojo.getKey().equals(getKey()) && 
testPojo.getValue().equals(getValue());
+            }
+            return false;
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java
new file mode 100644
index 00000000000..36dd1d45949
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** Test data for serializers and comparators in {@link 
KeyedSortPartitionOperator}. */
+class SerializerComparatorTestData {
+
+    @SuppressWarnings("unchecked")
+    static Tuple2<byte[], Integer>[] getOrderedIntTestData() {
+        IntSerializer intSerializer = new IntSerializer();
+        DataOutputSerializer outputSerializer = new 
DataOutputSerializer(intSerializer.getLength());
+
+        return IntStream.range(-10, 10)
+                .mapToObj(
+                        idx -> {
+                            try {
+                                intSerializer.serialize(idx, outputSerializer);
+                                byte[] copyOfBuffer = 
outputSerializer.getCopyOfBuffer();
+                                outputSerializer.clear();
+                                return Tuple2.of(copyOfBuffer, idx);
+                            } catch (IOException e) {
+                                throw new AssertionError(e);
+                            }
+                        })
+                .toArray(Tuple2[]::new);
+    }
+
+    @SuppressWarnings("unchecked")
+    static Tuple2<byte[], String>[] getOrderedStringTestData() {
+        StringSerializer stringSerializer = new StringSerializer();
+        DataOutputSerializer outputSerializer = new DataOutputSerializer(64);
+        return Stream.of(
+                        new String(new byte[] {-1, 0}),
+                        new String(new byte[] {0, 1}),
+                        "A",
+                        "AB",
+                        "ABC",
+                        "ABCD",
+                        "ABCDE",
+                        "ABCDEF",
+                        "ABCDEFG",
+                        "ABCDEFGH")
+                .map(
+                        str -> {
+                            try {
+                                stringSerializer.serialize(str, 
outputSerializer);
+                                byte[] copyOfBuffer = 
outputSerializer.getCopyOfBuffer();
+                                outputSerializer.clear();
+                                return Tuple2.of(copyOfBuffer, str);
+                            } catch (IOException e) {
+                                throw new AssertionError(e);
+                            }
+                        })
+                .sorted(
+                        (o1, o2) -> {
+                            byte[] key0 = o1.f0;
+                            byte[] key1 = o2.f0;
+
+                            int firstLength = key0.length;
+                            int secondLength = key1.length;
+                            int minLength = Math.min(firstLength, 
secondLength);
+                            for (int i = 0; i < minLength; i++) {
+                                int cmp = Byte.compare(key0[i], key1[i]);
+                                if (cmp != 0) {
+                                    return cmp;
+                                }
+                            }
+                            int lengthCmp = Integer.compare(firstLength, 
secondLength);
+                            if (lengthCmp != 0) {
+                                return lengthCmp;
+                            }
+                            return o1.f1.compareTo(o2.f1);
+                        })
+                .toArray(Tuple2[]::new);
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
new file mode 100644
index 00000000000..e0d35273872
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link SortPartitionOperator}. */
+class SortPartitionOperatorTest {
+
+    @Test
+    void testSortPartition() throws Exception {
+        // 1.Test SortPartitionOperator sorting records by position field.
+        SortPartitionOperator<Tuple2<Integer, String>> operator1 =
+                createSortPartitionOperatorWithPositionField();
+        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+                testHarness1 = new 
OneInputStreamOperatorTestHarness<>(operator1);
+        Queue<Object> expectedOutput1 = new LinkedList<>();
+        testHarness1.setup();
+        testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3")));
+        testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
+        testHarness1.endInput();
+        testHarness1.close();
+        expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1")));
+        expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3")));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput1,
+                testHarness1.getOutput());
+        // 2.Test SortPartitionOperator sorting records by string field.
+        SortPartitionOperator<TestPojo> operator2 = 
createSortPartitionOperatorWithStringField();
+        OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness2 =
+                new OneInputStreamOperatorTestHarness<>(operator2);
+        Queue<Object> expectedOutput2 = new LinkedList<>();
+        testHarness2.setup();
+        testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3)));
+        testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1)));
+        testHarness2.endInput();
+        testHarness2.close();
+        expectedOutput2.add(new StreamRecord<>(new TestPojo("1", 1)));
+        expectedOutput2.add(new StreamRecord<>(new TestPojo("3", 3)));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput2,
+                testHarness2.getOutput());
+        // 3.Test SortPartitionOperator sorting records by key selector.
+        SortPartitionOperator<TestPojo> operator3 = 
createSortPartitionOperatorWithKeySelector();
+        OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness3 =
+                new OneInputStreamOperatorTestHarness<>(operator3);
+        Queue<Object> expectedOutput3 = new LinkedList<>();
+        testHarness3.setup();
+        testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3)));
+        testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1)));
+        testHarness3.endInput();
+        testHarness3.close();
+        expectedOutput3.add(new StreamRecord<>(new TestPojo("1", 1)));
+        expectedOutput3.add(new StreamRecord<>(new TestPojo("3", 3)));
+        TestHarnessUtil.assertOutputEquals(
+                "The sort partition result is not correct.",
+                expectedOutput3,
+                testHarness3.getOutput());
+    }
+
+    @Test
+    void testOpenClose() throws Exception {
+        SortPartitionOperator<Tuple2<Integer, String>> sortPartitionOperator =
+                createSortPartitionOperatorWithPositionField();
+        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+                testHarness = new 
OneInputStreamOperatorTestHarness<>(sortPartitionOperator);
+        testHarness.open();
+        testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
+        testHarness.endInput();
+        testHarness.close();
+        assertThat(testHarness.getOutput()).isNotEmpty();
+    }
+
+    private SortPartitionOperator<Tuple2<Integer, String>>
+            createSortPartitionOperatorWithPositionField() {
+        TypeInformation<Tuple2<Integer, String>> inputType =
+                Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+        int positionSortField = 0;
+        Order sortOrder = Order.ASCENDING;
+        return new SortPartitionOperator<>(inputType, positionSortField, 
sortOrder);
+    }
+
+    private SortPartitionOperator<TestPojo> 
createSortPartitionOperatorWithStringField() {
+        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
+        String positionSortField = "value";
+        Order sortOrder = Order.ASCENDING;
+        return new SortPartitionOperator<>(inputType, positionSortField, 
sortOrder);
+    }
+
+    private SortPartitionOperator<TestPojo> 
createSortPartitionOperatorWithKeySelector() {
+        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
+        Order sortOrder = Order.ASCENDING;
+        return new SortPartitionOperator<>(inputType, TestPojo::getValue, 
sortOrder);
+    }
+
+    /** The test pojo. */
+    public static class TestPojo implements Serializable {
+
+        public String key;
+
+        public Integer value;
+
+        public TestPojo() {}
+
+        public TestPojo(String key, Integer value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public Integer getValue() {
+            return value;
+        }
+
+        public void setValue(Integer value) {
+            this.value = value;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public void setKey(String key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (object instanceof TestPojo) {
+                TestPojo testPojo = (TestPojo) object;
+                return testPojo.getKey().equals(getKey()) && 
testPojo.getValue().equals(getValue());
+            }
+            return false;
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java
new file mode 100644
index 00000000000..ecd82815b53
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sortpartition;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link VariableLengthByteKeyAndValueComparator} in {@link 
KeyedSortPartitionOperator}.
+ */
+class VariableLengthByteKeyAndValueComparatorTest
+        extends ComparatorTestBase<Tuple2<byte[], String>> {
+    @Override
+    protected Order[] getTestedOrder() {
+        return new Order[] {Order.ASCENDING};
+    }
+
+    @Override
+    protected TypeComparator<Tuple2<byte[], String>> createComparator(boolean 
ascending) {
+        return new VariableLengthByteKeyAndValueComparator<>(
+                BasicTypeInfo.STRING_TYPE_INFO.createComparator(ascending, 
null));
+    }
+
+    @Override
+    protected TypeSerializer<Tuple2<byte[], String>> createSerializer() {
+        StringSerializer stringSerializer = new StringSerializer();
+        return new KeyAndValueSerializer<>(stringSerializer, 
stringSerializer.getLength());
+    }
+
+    @Override
+    protected void deepEquals(
+            String message, Tuple2<byte[], String> should, Tuple2<byte[], 
String> is) {
+        assertThat(is.f0).as(message).isEqualTo(should.f0);
+        assertThat(is.f1).as(message).isEqualTo(should.f1);
+    }
+
+    @Override
+    protected Tuple2<byte[], String>[] getSortedTestData() {
+        return SerializerComparatorTestData.getOrderedStringTestData();
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index f71b60a08a8..2da74701adb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -261,7 +261,10 @@ public class TypeSerializerTestCoverageTest extends 
TestLogger {
                         DecimalDataSerializer.class.getName(),
                         
SharedBufferNode.SharedBufferNodeSerializer.class.getName(),
                         NFA.NFASerializer.class.getName(),
-                        AvroSerializer.class.getName());
+                        AvroSerializer.class.getName(),
+                        // KeyAndValueSerializer shouldn't be used to 
serialize data to state and
+                        // doesn't need to ensure upgrade compatibility.
+                        
"org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer");
 
         // check if a test exists for each type serializer
         for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) 
{
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
index c51de02d9fb..ac2425594a7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -210,6 +211,240 @@ class KeyedPartitionWindowedStreamITCase {
         expectInAnyOrder(resultIterator, "97", "97", "97");
     }
 
+    @Test
+    void testSortPartitionOfTupleElementsAscending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionOfTupleElementsInOrder(Order.ASCENDING),
+                "0 1 3 7 ",
+                "0 1 79 100 ",
+                "8 55 66 77 ");
+    }
+
+    @Test
+    void testSortPartitionOfTupleElementsDescending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionOfTupleElementsInOrder(Order.DESCENDING),
+                "7 3 1 0 ",
+                "100 79 1 0 ",
+                "77 66 55 8 ");
+    }
+
+    @Test
+    void testSortPartitionOfPojoElementsAscending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionOfPojoElementsInOrder(Order.ASCENDING),
+                "0 1 3 7 ",
+                "0 1 79 100 ",
+                "8 55 66 77 ");
+    }
+
+    @Test
+    void testSortPartitionOfPojoElementsDescending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionOfPojoElementsInOrder(Order.DESCENDING),
+                "7 3 1 0 ",
+                "100 79 1 0 ",
+                "77 66 55 8 ");
+    }
+
+    @Test
+    public void testSortPartitionByKeySelectorAscending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionByKeySelectorInOrder(Order.ASCENDING),
+                "0 1 3 7 ",
+                "0 1 79 100 ",
+                "8 55 66 77 ");
+    }
+
+    @Test
+    void testSortPartitionByKeySelectorDescending() throws Exception {
+        expectInAnyOrder(
+                sortPartitionByKeySelectorInOrder(Order.DESCENDING),
+                "7 3 1 0 ",
+                "100 79 1 0 ",
+                "77 66 55 8 ");
+    }
+
    /**
     * Builds and executes a keyed pipeline that sorts tuple elements on position field 1
     * in the given {@code order}, then concatenates each key group's values into one
     * space-separated string per group.
     *
     * @param order the sort order applied within each key group
     * @return an iterator over one string per key group
     */
    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Three keys, four unsorted values each.
        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromData(
                        Tuple2.of("key1", 0),
                        Tuple2.of("key1", 7),
                        Tuple2.of("key1", 3),
                        Tuple2.of("key1", 1),
                        Tuple2.of("key2", 1),
                        Tuple2.of("key2", 100),
                        Tuple2.of("key2", 0),
                        Tuple2.of("key2", 79),
                        Tuple2.of("key3", 77),
                        Tuple2.of("key3", 66),
                        Tuple2.of("key3", 55),
                        Tuple2.of("key3", 8));
        // Identity map with parallelism 2 — presumably to spread records over
        // multiple subtasks before keyBy; TODO confirm intent.
        return source.map(
                        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> map(Tuple2<String, Integer> value)
                                    throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .keyBy(
                        new KeySelector<Tuple2<String, Integer>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Integer> value) throws Exception {
                                return value.f0;
                            }
                        })
                .fullWindowPartition()
                // Sort each key group on tuple field 1 (the integer value).
                .sortPartition(1, order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<Tuple2<String, Integer>, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Tuple2<String, Integer>> values,
                                    Collector<String> out) {
                                // Emit one concatenated string per run of equal keys;
                                // relies on the sorted input arriving grouped by key.
                                StringBuilder sb = new StringBuilder();
                                String preKey = null;
                                for (Tuple2<String, Integer> value : values) {
                                    if (preKey != null && !preKey.equals(value.f0)) {
                                        out.collect(sb.toString());
                                        sb = new StringBuilder();
                                    }
                                    sb.append(value.f1);
                                    sb.append(" ");
                                    preKey = value.f0;
                                }
                                // Flush the last (or only) group.
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
    /**
     * Builds and executes a keyed pipeline that sorts {@link TestPojo} elements on the
     * "value" field name in the given {@code order}, concatenating each key group's
     * values into one space-separated string per group.
     *
     * @param order the sort order applied within each key group
     * @return an iterator over one string per key group
     */
    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Three keys, four unsorted values each — same data as the tuple variant.
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo("key1", 0),
                        new TestPojo("key1", 7),
                        new TestPojo("key1", 3),
                        new TestPojo("key1", 1),
                        new TestPojo("key2", 1),
                        new TestPojo("key2", 100),
                        new TestPojo("key2", 0),
                        new TestPojo("key2", 79),
                        new TestPojo("key3", 77),
                        new TestPojo("key3", 66),
                        new TestPojo("key3", 55),
                        new TestPojo("key3", 8));
        // Identity map with parallelism 2 — presumably to spread records over
        // multiple subtasks before keyBy; TODO confirm intent.
        return source.map(
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .keyBy(
                        new KeySelector<TestPojo, String>() {
                            @Override
                            public String getKey(TestPojo value) throws Exception {
                                return value.getKey();
                            }
                        })
                .fullWindowPartition()
                // Sort each key group on the pojo field named "value".
                .sortPartition("value", order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                // Emit one concatenated string per run of equal keys;
                                // relies on the sorted input arriving grouped by key.
                                StringBuilder sb = new StringBuilder();
                                String preKey = null;
                                for (TestPojo value : values) {
                                    if (preKey != null && !preKey.equals(value.getKey())) {
                                        out.collect(sb.toString());
                                        sb = new StringBuilder();
                                    }
                                    sb.append(value.getValue());
                                    sb.append(" ");
                                    preKey = value.getKey();
                                }
                                // Flush the last (or only) group.
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
    /**
     * Builds and executes a keyed pipeline that sorts {@link TestPojo} elements using a
     * {@link KeySelector} on {@code getValue()} in the given {@code order}, concatenating
     * each key group's values into one space-separated string per group.
     *
     * @param order the sort order applied within each key group
     * @return an iterator over one string per key group
     */
    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo("key1", 0),
                        new TestPojo("key1", 7),
                        new TestPojo("key1", 3),
                        new TestPojo("key1", 1),
                        new TestPojo("key2", 1),
                        new TestPojo("key2", 100),
                        new TestPojo("key2", 0),
                        new TestPojo("key2", 79),
                        new TestPojo("key3", 77),
                        new TestPojo("key3", 66),
                        new TestPojo("key3", 55),
                        new TestPojo("key3", 8));
        // Identity map with parallelism 2 — presumably to spread records over
        // multiple subtasks before keyBy; TODO confirm intent.
        return source.map(
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .keyBy(
                        new KeySelector<TestPojo, String>() {
                            @Override
                            public String getKey(TestPojo value) throws Exception {
                                return value.getKey();
                            }
                        })
                .fullWindowPartition()
                // Sort each key group by the extracted integer value.
                .sortPartition(
                        new KeySelector<TestPojo, Integer>() {
                            @Override
                            public Integer getKey(TestPojo value) throws Exception {
                                return value.getValue();
                            }
                        },
                        order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                // Emit one concatenated string per run of equal keys;
                                // relies on the sorted input arriving grouped by key.
                                StringBuilder sb = new StringBuilder();
                                String preKey = null;
                                for (TestPojo value : values) {
                                    if (preKey != null && !preKey.equals(value.getKey())) {
                                        out.collect(sb.toString());
                                        sb = new StringBuilder();
                                    }
                                    sb.append(value.getValue());
                                    sb.append(" ");
                                    preKey = value.getKey();
                                }
                                // Flush the last (or only) group.
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
     private Collection<Tuple2<String, String>> createSource() {
         List<Tuple2<String, String>> source = new ArrayList<>();
         for (int index = 0; index < EVENT_NUMBER; ++index) {
@@ -248,4 +483,39 @@ class KeyedPartitionWindowedStreamITCase {
             return String.valueOf(testField);
         }
     }
+
    /**
     * The test pojo. Public fields plus a public no-arg constructor and matching
     * getters/setters — presumably shaped this way so Flink recognizes it as a POJO
     * type; confirm against Flink's POJO type-extraction rules.
     */
    public static class TestPojo {

        public String key;

        public Integer value;

        public TestPojo() {}

        // NOTE(review): leaves {@code key} null; no caller of this constructor is
        // visible in the diffed hunks of this file — verify it is actually needed here.
        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
index b522664c9d3..6ec18d01bfe 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
@@ -22,6 +22,9 @@ import 
org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import 
org.apache.flink.streaming.api.datastream.NonKeyedPartitionWindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -136,6 +139,159 @@ class NonKeyedPartitionWindowedStreamITCase {
         expectInAnyOrder(resultIterator, "94", "94");
     }
 
+    @Test
+    void testSortPartitionOfTupleElementsAscending() throws Exception {
+        expectInAnyOrder(sortPartitionOfTupleElementsInOrder(Order.ASCENDING), 
"013", "013");
+    }
+
+    @Test
+    void testSortPartitionOfTupleElementsDescending() throws Exception {
+        
expectInAnyOrder(sortPartitionOfTupleElementsInOrder(Order.DESCENDING), "310", 
"310");
+    }
+
+    @Test
+    void testSortPartitionOfPojoElementsAscending() throws Exception {
+        expectInAnyOrder(sortPartitionOfPojoElementsInOrder(Order.ASCENDING), 
"013", "013");
+    }
+
+    @Test
+    void testSortPartitionOfPojoElementsDescending() throws Exception {
+        expectInAnyOrder(sortPartitionOfPojoElementsInOrder(Order.DESCENDING), 
"310", "310");
+    }
+
+    @Test
+    void testSortPartitionByKeySelectorAscending() throws Exception {
+        expectInAnyOrder(sortPartitionByKeySelectorInOrder(Order.ASCENDING), 
"013", "013");
+    }
+
+    @Test
+    void testSortPartitionByKeySelectorDescending() throws Exception {
+        expectInAnyOrder(sortPartitionByKeySelectorInOrder(Order.DESCENDING), 
"310", "310");
+    }
+
    /**
     * Builds and executes a non-keyed pipeline that sorts tuple elements on position
     * field 1 in the given {@code order}, concatenating each partition's values into a
     * single string (one output string per partition).
     *
     * @param order the sort order applied within each partition
     * @return an iterator over one string per partition
     */
    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Duplicated values so each of the two partitions receives the set {0, 3, 1}
        // after rebalance — TODO confirm the round-robin distribution assumption.
        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromData(
                        Tuple2.of("Test", 0),
                        Tuple2.of("Test", 0),
                        Tuple2.of("Test", 3),
                        Tuple2.of("Test", 3),
                        Tuple2.of("Test", 1),
                        Tuple2.of("Test", 1));
        // rebalance + identity map at parallelism 2 spreads records across two subtasks.
        return source.rebalance()
                .map(
                        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> map(Tuple2<String, Integer> value)
                                    throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                // Sort each partition on tuple field 1 (the integer value).
                .sortPartition(1, order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<Tuple2<String, Integer>, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Tuple2<String, Integer>> values,
                                    Collector<String> out) {
                                // Concatenate the partition's values in arrival
                                // (i.e. sorted) order into one string.
                                StringBuilder sb = new StringBuilder();
                                for (Tuple2<String, Integer> value : values) {
                                    sb.append(value.f1);
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
    /**
     * Builds and executes a non-keyed pipeline that sorts {@link TestPojo} elements on
     * the "value" field name in the given {@code order}, concatenating each partition's
     * values into a single string (one output string per partition).
     *
     * @param order the sort order applied within each partition
     * @return an iterator over one string per partition
     */
    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Uses the single-arg constructor, so TestPojo.key stays null here.
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo(0),
                        new TestPojo(0),
                        new TestPojo(3),
                        new TestPojo(3),
                        new TestPojo(1),
                        new TestPojo(1));
        // rebalance + identity map at parallelism 2 spreads records across two subtasks.
        return source.rebalance()
                .map(
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                // Sort each partition on the pojo field named "value".
                .sortPartition("value", order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                // Concatenate the partition's values in arrival
                                // (i.e. sorted) order into one string.
                                StringBuilder sb = new StringBuilder();
                                for (TestPojo value : values) {
                                    sb.append(value.getValue());
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
    /**
     * Builds and executes a non-keyed pipeline that sorts {@link TestPojo} elements via
     * a {@link KeySelector} on {@code getValue()} in the given {@code order},
     * concatenating each partition's values into a single string (one output string per
     * partition).
     *
     * @param order the sort order applied within each partition
     * @return an iterator over one string per partition
     */
    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo("KEY", 0),
                        new TestPojo("KEY", 0),
                        new TestPojo("KEY", 3),
                        new TestPojo("KEY", 3),
                        new TestPojo("KEY", 1),
                        new TestPojo("KEY", 1));
        // rebalance + identity map at parallelism 2 spreads records across two subtasks.
        return source.rebalance()
                .map(
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                // Sort each partition by the extracted integer value.
                .sortPartition(
                        new KeySelector<TestPojo, Integer>() {
                            @Override
                            public Integer getKey(TestPojo value) throws Exception {
                                return value.getValue();
                            }
                        },
                        order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                // Concatenate the partition's values in arrival
                                // (i.e. sorted) order into one string.
                                StringBuilder sb = new StringBuilder();
                                for (TestPojo value : values) {
                                    sb.append(value.getValue());
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }
+
     private void expectInAnyOrder(CloseableIterator<String> resultIterator, 
String... expected) {
         List<String> listExpected = Lists.newArrayList(expected);
         List<String> testResults = Lists.newArrayList(resultIterator);
@@ -172,4 +328,39 @@ class NonKeyedPartitionWindowedStreamITCase {
             return String.valueOf(testField);
         }
     }
+
    /**
     * The test pojo. Public fields plus a public no-arg constructor and matching
     * getters/setters — presumably shaped this way so Flink recognizes it as a POJO
     * type; confirm against Flink's POJO type-extraction rules.
     */
    public static class TestPojo {

        public String key;

        public Integer value;

        public TestPojo() {}

        // Leaves {@code key} null; used by sortPartitionOfPojoElementsInOrder,
        // where only the value field is read.
        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
 }

Reply via email to