This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a021db68a8545b4183e935beccd9b0d62329ee67 Author: Wencong Liu <[email protected]> AuthorDate: Tue Mar 12 14:46:04 2024 +0800 [FLINK-34543][datastream] Introduce the SortPartition API on PartitionWindowedStream --- .../generated/execution_configuration.html | 12 + .../flink/configuration/ExecutionOptions.java | 18 ++ .../datastream/KeyedPartitionWindowedStream.java | 82 +++++ .../NonKeyedPartitionWindowedStream.java | 76 +++++ .../api/datastream/PartitionWindowedStream.java | 63 ++++ .../FixedLengthByteKeyAndValueComparator.java | 180 +++++++++++ .../sortpartition/KeyAndValueSerializer.java | 175 ++++++++++ .../sortpartition/KeyedSortPartitionOperator.java | 355 +++++++++++++++++++++ .../sortpartition/SortPartitionOperator.java | 286 +++++++++++++++++ .../VariableLengthByteKeyAndValueComparator.java | 189 +++++++++++ .../OneInputTransformationTranslator.java | 12 + .../FixedLengthByteKeyAndValueComparatorTest.java | 63 ++++ .../sortpartition/KeyAndValueSerializerTest.java | 71 +++++ .../KeyedSortPartitionOperatorTest.java | 187 +++++++++++ .../SerializerComparatorTestData.java | 101 ++++++ .../sortpartition/SortPartitionOperatorTest.java | 168 ++++++++++ ...ariableLengthByteKeyAndValueComparatorTest.java | 64 ++++ .../TypeSerializerTestCoverageTest.java | 5 +- .../KeyedPartitionWindowedStreamITCase.java | 270 ++++++++++++++++ .../NonKeyedPartitionWindowedStreamITCase.java | 191 +++++++++++ 20 files changed, 2567 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/execution_configuration.html b/docs/layouts/shortcodes/generated/execution_configuration.html index 5407666376d..6aa337d623a 100644 --- a/docs/layouts/shortcodes/generated/execution_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_configuration.html @@ -38,5 +38,17 @@ <td><p>Enum</p></td> <td>Runtime execution mode of DataStream programs. 
Among other things, this controls task scheduling, network shuffle behavior, and time semantics.<br /><br />Possible values:<ul><li>"STREAMING"</li><li>"BATCH"</li><li>"AUTOMATIC"</li></ul></td> </tr> + <tr> + <td><h5>execution.sort-keyed-partition.memory</h5></td> + <td style="word-wrap: break-word;">128 mb</td> + <td>MemorySize</td> + <td>Sets the managed memory size for sort partition operator on KeyedPartitionWindowedStream.The memory size is only a weight hint. Thus, it will affect the operator's memory weight within a task, but the actual memory used depends on the running environment.</td> + </tr> + <tr> + <td><h5>execution.sort-partition.memory</h5></td> + <td style="word-wrap: break-word;">128 mb</td> + <td>MemorySize</td> + <td>Sets the managed memory size for sort partition operator in NonKeyedPartitionWindowedStream.The memory size is only a weight hint. Thus, it will affect the operator's memory weight within a task, but the actual memory used depends on the running environment.</td> + </tr> </tbody> </table> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java index 2a5f0b61736..2c32e2ad55b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java @@ -128,6 +128,24 @@ public class ExecutionOptions { + "throughput")) .build()); + public static final ConfigOption<MemorySize> SORT_PARTITION_MEMORY = + ConfigOptions.key("execution.sort-partition.memory") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(128)) + .withDescription( + "Sets the managed memory size for sort partition operator in NonKeyedPartitionWindowedStream." + + "The memory size is only a weight hint. 
Thus, it will affect the operator's memory weight within a " + + "task, but the actual memory used depends on the running environment."); + + public static final ConfigOption<MemorySize> SORT_KEYED_PARTITION_MEMORY = + ConfigOptions.key("execution.sort-keyed-partition.memory") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(128)) + .withDescription( + "Sets the managed memory size for sort partition operator on KeyedPartitionWindowedStream." + + "The memory size is only a weight hint. Thus, it will affect the operator's memory weight within a " + + "task, but the actual memory used depends on the running environment."); + @Documentation.ExcludeFromDocumentation( "This is an expert option, that we do not want to expose in the documentation") public static final ConfigOption<Boolean> SORT_INPUTS = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java index 5ccc05826bf..6734fbec0db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java @@ -22,14 +22,20 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.memory.ManagedMemoryUseCase; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -90,4 +96,80 @@ public class KeyedPartitionWindowedStream<T, KEY> implements PartitionWindowedSt return input.window(GlobalWindows.createWithEndOfStreamTrigger()) .aggregate(aggregateFunction); } + + @Override + public SingleOutputStreamOperator<T> sortPartition(int field, Order order) { + checkNotNull(order, "The order must not be null."); + checkArgument(field > 0, "The field mustn't be less than zero."); + TypeInformation<T> inputType = input.getType(); + KeyedSortPartitionOperator<T, KEY> operator = + new KeyedSortPartitionOperator<>(inputType, field, order); + final String opName = "KeyedSortPartition"; + SingleOutputStreamOperator<T> result = + this.input + .transform(opName, inputType, operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } + + @Override + public SingleOutputStreamOperator<T> sortPartition(String field, Order order) { + checkNotNull(field, "The field must not be null."); + checkNotNull(order, "The order must not be null."); + TypeInformation<T> inputType = input.getType(); + KeyedSortPartitionOperator<T, KEY> operator = + new KeyedSortPartitionOperator<>(inputType, field, order); + final String 
opName = "KeyedSortPartition"; + SingleOutputStreamOperator<T> result = + this.input + .transform(opName, inputType, operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } + + @Override + public <K> SingleOutputStreamOperator<T> sortPartition( + KeySelector<T, K> keySelector, Order order) { + checkNotNull(keySelector, "The field must not be null."); + checkNotNull(order, "The order must not be null."); + TypeInformation<T> inputType = input.getType(); + KeyedSortPartitionOperator<T, KEY> operator = + new KeyedSortPartitionOperator<>(inputType, environment.clean(keySelector), order); + final String opName = "KeyedSortPartition"; + SingleOutputStreamOperator<T> result = + this.input + .transform(opName, inputType, operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java index 0f415aed876..6efeacc7ebc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java @@ -22,13 +22,19 @@ import org.apache.flink.annotation.Internal; import 
org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.MapPartitionOperator; import org.apache.flink.streaming.api.operators.PartitionAggregateOperator; import org.apache.flink.streaming.api.operators.PartitionReduceOperator; +import org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -88,4 +94,74 @@ public class NonKeyedPartitionWindowedStream<T> implements PartitionWindowedStre opName, resultType, new PartitionAggregateOperator<>(aggregateFunction)) .setParallelism(input.getParallelism()); } + + @Override + public SingleOutputStreamOperator<T> sortPartition(int field, Order order) { + checkNotNull(order, "The order must not be null."); + checkArgument(field > 0, "The field mustn't be less than zero."); + SortPartitionOperator<T> operator = + new SortPartitionOperator<>(input.getType(), field, order); + final String opName = "SortPartition"; + SingleOutputStreamOperator<T> result = + input.transform(opName, input.getType(), operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + 
ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } + + @Override + public SingleOutputStreamOperator<T> sortPartition(String field, Order order) { + checkNotNull(field, "The field must not be null."); + checkNotNull(order, "The order must not be null."); + SortPartitionOperator<T> operator = + new SortPartitionOperator<>(input.getType(), field, order); + final String opName = "SortPartition"; + SingleOutputStreamOperator<T> result = + input.transform(opName, input.getType(), operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } + + @Override + public <K> SingleOutputStreamOperator<T> sortPartition( + KeySelector<T, K> keySelector, Order order) { + checkNotNull(keySelector, "The field must not be null."); + checkNotNull(order, "The order must not be null."); + SortPartitionOperator<T> operator = + new SortPartitionOperator<>(input.getType(), environment.clean(keySelector), order); + final String opName = "SortPartition"; + SingleOutputStreamOperator<T> result = + input.transform(opName, input.getType(), operator) + .setParallelism(input.getParallelism()); + int managedMemoryWeight = + Math.max( + 1, + environment + .getConfiguration() + .get(ExecutionOptions.SORT_PARTITION_MEMORY) + .getMebiBytes()); + result.getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, managedMemoryWeight); + return result; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java index 19ebb0a7530..7741f3c27b3 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java @@ -22,6 +22,11 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.configuration.ExecutionOptions; /** * {@link PartitionWindowedStream} represents a data stream that collects all records of each @@ -61,4 +66,62 @@ public interface PartitionWindowedStream<T> { */ <ACC, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, R> aggregateFunction); + + /** + * Sorts the records of the window on the specified field in the specified order. The type of + * records must be {@link Tuple}. + * + * <p>This operator will use managed memory for the sort.For {@link + * NonKeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.partition.memory" in {@link ExecutionOptions}. For {@link + * KeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.keyed.partition.memory" in {@link ExecutionOptions}. + * + * @param field The field index on which records is sorted. + * @param order The order in which records is sorted. + * @return The data stream with sorted records. + */ + SingleOutputStreamOperator<T> sortPartition(int field, Order order); + + /** + * Sorts the records of the window on the specified field in the specified order. The type of + * records must be Flink POJO {@link PojoTypeInfo}. 
A type is considered a Flink POJO type, if + * it fulfills the conditions below. + * + * <ul> + * <li>It is a public class, and standalone (not a non-static inner class). + * <li>It has a public no-argument constructor. + * <li>All non-static, non-transient fields in the class (and all superclasses) are either + * public (and non-final) or have a public getter and a setter method that follows the + * Java beans naming conventions for getters and setters. + * <li>It is a fixed-length, null-aware composite type with non-deterministic field order. + * Every field can be null independent of the field's type. + * </ul> + * + * <p>This operator will use managed memory for the sort.For {@link + * NonKeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.partition.memory" in {@link ExecutionOptions}. For {@link + * KeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.keyed.partition.memory" in {@link ExecutionOptions}. + * + * @param field The field expression referring to the field on which records is sorted. + * @param order The order in which records is sorted. + * @return The data stream with sorted records. + */ + SingleOutputStreamOperator<T> sortPartition(String field, Order order); + + /** + * Sorts the records according to a {@link KeySelector} in the specified order. + * + * <p>This operator will use managed memory for the sort.For {@link + * NonKeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.partition.memory" in {@link ExecutionOptions}. For {@link + * KeyedPartitionWindowedStream}, the managed memory size can be set by + * "execution.sort.keyed.partition.memory" in {@link ExecutionOptions}. + * + * @param keySelector The key selector to extract key from the records for sorting. + * @param order The order in which records is sorted. + * @return The data stream with sorted records. 
+ */ + <K> SingleOutputStreamOperator<T> sortPartition(KeySelector<T, K> keySelector, Order order); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java new file mode 100644 index 00000000000..3768b621c98 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.util.Arrays; + +/** + * The {@link FixedLengthByteKeyAndValueComparator} is used by {@link KeyedSortPartitionOperator} to + * compare records according to both the record key and record value. The length of record key must + * be fixed and will be initialized when the {@link FixedLengthByteKeyAndValueComparator} is + * created. + */ +@Internal +public class FixedLengthByteKeyAndValueComparator<INPUT> + extends TypeComparator<Tuple2<byte[], INPUT>> { + + private final int serializedKeyLength; + + private final TypeComparator<INPUT> valueComparator; + + private byte[] keyReference; + + private INPUT valueReference; + + FixedLengthByteKeyAndValueComparator( + int serializedKeyLength, TypeComparator<INPUT> valueComparator) { + this.serializedKeyLength = serializedKeyLength; + this.valueComparator = valueComparator; + } + + @Override + public int hash(Tuple2<byte[], INPUT> record) { + return record.hashCode(); + } + + @Override + public void setReference(Tuple2<byte[], INPUT> toCompare) { + this.keyReference = toCompare.f0; + this.valueReference = toCompare.f1; + } + + @Override + public boolean equalToReference(Tuple2<byte[], INPUT> candidate) { + return Arrays.equals(keyReference, candidate.f0) && valueReference.equals(candidate.f1); + } + + @Override + public int compareToReference(TypeComparator<Tuple2<byte[], INPUT>> referencedComparator) { + byte[] otherKey = + ((FixedLengthByteKeyAndValueComparator<INPUT>) referencedComparator).keyReference; + INPUT otherValue = + ((FixedLengthByteKeyAndValueComparator<INPUT>) 
referencedComparator).valueReference; + int keyCmp = compareKey(otherKey, this.keyReference); + if (keyCmp != 0) { + return keyCmp; + } + return valueComparator.compare(this.valueReference, otherValue); + } + + @Override + public int compare(Tuple2<byte[], INPUT> first, Tuple2<byte[], INPUT> second) { + int keyCmp = compareKey(first.f0, second.f0); + if (keyCmp != 0) { + return keyCmp; + } + return valueComparator.compare(first.f1, second.f1); + } + + private int compareKey(byte[] first, byte[] second) { + for (int i = 0; i < serializedKeyLength; i++) { + int cmp = Byte.compare(first[i], second[i]); + if (cmp != 0) { + return cmp < 0 ? -1 : 1; + } + } + return 0; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) + throws IOException { + int minCount = serializedKeyLength; + while (minCount-- > 0) { + byte firstValue = firstSource.readByte(); + byte secondValue = secondSource.readByte(); + int cmp = Byte.compare(firstValue, secondValue); + if (cmp != 0) { + return cmp < 0 ? 
-1 : 1; + } + } + return valueComparator.compareSerialized(firstSource, secondSource); + } + + @Override + public boolean supportsNormalizedKey() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return false; + } + + @Override + public void putNormalizedKey( + Tuple2<byte[], INPUT> record, MemorySegment target, int offset, int numBytes) { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator<Tuple2<byte[], INPUT>> duplicate() { + return new FixedLengthByteKeyAndValueComparator<>( + this.serializedKeyLength, this.valueComparator); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator<?>[] getFlatComparators() { + return new TypeComparator[] {this}; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(Tuple2<byte[], INPUT> record, DataOutputView target) { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } + + @Override + public Tuple2<byte[], INPUT> readWithKeyDenormalization( + Tuple2<byte[], INPUT> reuse, DataInputView source) { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java new file mode 100644 index 00000000000..2788880b7b3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializer.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +/** + * {@link KeyAndValueSerializer} is used in {@link KeyedSortPartitionOperator} for serializing + * elements including key and record. It serializes the record in a format known by the {@link + * FixedLengthByteKeyAndValueComparator} and {@link VariableLengthByteKeyAndValueComparator}. 
+ * + * <p>If the key's length is fixed and known, the format of each serialized element is as follows: + * + * <pre> + * [key] | [record] + * </pre> + * + * <p>If the key's length is variable, the format of each serialized element is as follows: + * + * <pre> + * [key's length] | [key] | [record] + * </pre> + */ +final class KeyAndValueSerializer<INPUT> extends TypeSerializer<Tuple2<byte[], INPUT>> { + private final TypeSerializer<INPUT> valueSerializer; + + /** + * The value of serializedKeyLength will be positive if the key's length is fixed otherwise -1. + */ + private final int serializedKeyLength; + + KeyAndValueSerializer(TypeSerializer<INPUT> valueSerializer, int serializedKeyLength) { + this.valueSerializer = valueSerializer; + this.serializedKeyLength = serializedKeyLength; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<Tuple2<byte[], INPUT>> duplicate() { + return new KeyAndValueSerializer<>(valueSerializer.duplicate(), this.serializedKeyLength); + } + + @Override + public Tuple2<byte[], INPUT> copy(Tuple2<byte[], INPUT> from) { + INPUT record = from.f1; + return Tuple2.of(Arrays.copyOf(from.f0, from.f0.length), valueSerializer.copy(record)); + } + + @Override + public Tuple2<byte[], INPUT> createInstance() { + return Tuple2.of(new byte[0], valueSerializer.createInstance()); + } + + @Override + public Tuple2<byte[], INPUT> copy(Tuple2<byte[], INPUT> from, Tuple2<byte[], INPUT> reuse) { + reuse.f0 = Arrays.copyOf(from.f0, from.f0.length); + INPUT fromRecord = from.f1; + reuse.f1 = valueSerializer.copy(fromRecord, reuse.f1); + return reuse; + } + + @Override + public int getLength() { + if (valueSerializer.getLength() < 0 || serializedKeyLength < 0) { + return -1; + } + return valueSerializer.getLength() + serializedKeyLength; + } + + @Override + public void serialize(Tuple2<byte[], INPUT> record, DataOutputView target) throws IOException { + if (serializedKeyLength < 0) { + 
target.writeInt(record.f0.length); + } + target.write(record.f0); + INPUT toSerialize = record.f1; + valueSerializer.serialize(toSerialize, target); + } + + @Override + public Tuple2<byte[], INPUT> deserialize(DataInputView source) throws IOException { + final int length = getKeyLength(source); + byte[] bytes = new byte[length]; + source.read(bytes); + INPUT value = valueSerializer.deserialize(source); + return Tuple2.of(bytes, value); + } + + @Override + public Tuple2<byte[], INPUT> deserialize(Tuple2<byte[], INPUT> reuse, DataInputView source) + throws IOException { + final int length = getKeyLength(source); + byte[] bytes = new byte[length]; + source.read(bytes); + INPUT value = valueSerializer.deserialize(source); + reuse.f0 = bytes; + reuse.f1 = value; + return reuse; + } + + private int getKeyLength(DataInputView source) throws IOException { + final int length; + if (serializedKeyLength < 0) { + length = source.readInt(); + } else { + length = serializedKeyLength; + } + return length; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final int length; + if (serializedKeyLength < 0) { + length = source.readInt(); + target.writeInt(length); + } else { + length = serializedKeyLength; + } + for (int i = 0; i < length; i++) { + target.writeByte(source.readByte()); + } + valueSerializer.copy(source, target); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeyAndValueSerializer<?> that = (KeyAndValueSerializer<?>) o; + return Objects.equals(valueSerializer, that.valueSerializer); + } + + @Override + public int hashCode() { + return Objects.hash(valueSerializer); + } + + @Override + public TypeSerializerSnapshot<Tuple2<byte[], INPUT>> snapshotConfiguration() { + throw new UnsupportedOperationException(); + } +} diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java new file mode 100644 index 00000000000..8c45a54ed5a --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperator.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import 
org.apache.flink.util.MutableObjectIterator; + +/** + * The {@link KeyedSortPartitionOperator} sorts records of a partition on {@link KeyedStream}. It + * ensures that all records with the same key are sorted in a user-defined order. + * + * <p>To sort the record key first and then record at the same time, both the record key and the + * record will be written to {@link ExternalSorter} directly. However, if the record is sorted + * according to the selected key by {@link KeySelector}, the selected sort key should also be + * written with the record key and the record to {@link ExternalSorter} to avoid repeated key + * selections. + * + * @param <INPUT> The type of input record. + * @param <KEY> The type of record key, which has already been defined in {@link KeyedStream}. + */ +@Internal +public class KeyedSortPartitionOperator<INPUT, KEY> extends AbstractStreamOperator<INPUT> + implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput { + + /** The type information of input records. */ + protected final TypeInformation<INPUT> inputType; + + /** The selector to create the sort key for records, which will be null if it's not used. */ + protected final KeySelector<INPUT, ?> sortFieldSelector; + + /** The order to sort records. */ + private final Order sortOrder; + + /** + * The string field to indicate the sort key for records with tuple or pojo type, which will be + * null if it's not used. + */ + private final String stringSortField; + + /** + * The int field to indicate the sort key for records with tuple type, which will be -1 if it's + * not used. + */ + private final int positionSortField; + + /** + * The sorter to sort both key and record if the record is not sorted by {@link KeySelector}. + */ + private PushSorter<Tuple2<byte[], INPUT>> recordSorter = null; + + /** The sorter to sort both key and record if the record is sorted by {@link KeySelector}. 
*/ + private PushSorter<Tuple2<byte[], Tuple2<?, INPUT>>> recordSorterForSelector = null; + + private TypeSerializer<KEY> recordKeySerializer; + + /** A buffer to save the serialized record key. */ + private DataOutputSerializer dataOutputSerializer; + + public KeyedSortPartitionOperator( + TypeInformation<INPUT> inputType, int positionSortField, Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(positionSortField); + this.positionSortField = positionSortField; + this.stringSortField = null; + this.sortFieldSelector = null; + this.sortOrder = sortOrder; + } + + public KeyedSortPartitionOperator( + TypeInformation<INPUT> inputType, String stringSortField, Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(stringSortField); + this.positionSortField = -1; + this.stringSortField = stringSortField; + this.sortFieldSelector = null; + this.sortOrder = sortOrder; + } + + public <K> KeyedSortPartitionOperator( + TypeInformation<INPUT> inputType, + KeySelector<INPUT, K> sortFieldSelector, + Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(sortFieldSelector); + this.positionSortField = -1; + this.stringSortField = null; + this.sortFieldSelector = sortFieldSelector; + this.sortOrder = sortOrder; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<INPUT>> output) { + super.setup(containingTask, config, output); + ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader(); + ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig(); + recordKeySerializer = config.getStateKeySerializer(userCodeClassLoader); + int keyLength = recordKeySerializer.getLength(); + createDataOutputSerializer(keyLength); + if (sortFieldSelector != null) { + TypeInformation<Tuple2<?, INPUT>> valueType = + Types.TUPLE( + TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType), + inputType); + KeyAndValueSerializer<Tuple2<?, 
INPUT>> valueSerializer = + new KeyAndValueSerializer<>( + valueType.createSerializer(getExecutionConfig()), keyLength); + TypeComparator<Tuple2<byte[], Tuple2<?, INPUT>>> sortTypeComparator; + if (keyLength > 0) { + sortTypeComparator = + new FixedLengthByteKeyAndValueComparator<>( + keyLength, + ((CompositeType<Tuple2<?, INPUT>>) valueType) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig)); + } else { + sortTypeComparator = + new VariableLengthByteKeyAndValueComparator<>( + ((CompositeType<Tuple2<?, INPUT>>) valueType) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig)); + } + recordSorterForSelector = + getSorter(valueSerializer, sortTypeComparator, containingTask); + } else { + KeyAndValueSerializer<INPUT> valueSerializer = + new KeyAndValueSerializer<>( + inputType.createSerializer(getExecutionConfig()), keyLength); + TypeComparator<Tuple2<byte[], INPUT>> sortTypeComparator; + if (keyLength > 0) { + sortTypeComparator = + new FixedLengthByteKeyAndValueComparator<>( + keyLength, + ((CompositeType<INPUT>) inputType) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig)); + } else { + sortTypeComparator = + new VariableLengthByteKeyAndValueComparator<>( + ((CompositeType<INPUT>) inputType) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig)); + } + recordSorter = getSorter(valueSerializer, sortTypeComparator, containingTask); + } + } + + @Override + public void processElement(StreamRecord<INPUT> element) throws Exception { + KEY currentKey = (KEY) getCurrentKey(); + recordKeySerializer.serialize(currentKey, dataOutputSerializer); + byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer(); + dataOutputSerializer.clear(); + if (sortFieldSelector != null) { + recordSorterForSelector.writeRecord( + Tuple2.of( + serializedKey, + Tuple2.of( + sortFieldSelector.getKey(element.getValue()), + 
element.getValue()))); + } else { + recordSorter.writeRecord(Tuple2.of(serializedKey, element.getValue())); + } + } + + @Override + public void endInput() throws Exception { + TimestampedCollector<INPUT> outputCollector = new TimestampedCollector<>(output); + if (sortFieldSelector != null) { + recordSorterForSelector.finishReading(); + MutableObjectIterator<Tuple2<byte[], Tuple2<?, INPUT>>> iterator = + recordSorterForSelector.getIterator(); + Tuple2<byte[], Tuple2<?, INPUT>> record = iterator.next(); + while (record != null) { + outputCollector.collect(record.f1.f1); + record = iterator.next(); + } + recordSorterForSelector.close(); + } else { + recordSorter.finishReading(); + MutableObjectIterator<Tuple2<byte[], INPUT>> iterator = recordSorter.getIterator(); + Tuple2<byte[], INPUT> record = iterator.next(); + while (record != null) { + outputCollector.collect(record.f1); + record = iterator.next(); + } + recordSorter.close(); + } + } + + @Override + public OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build(); + } + + /** + * Get the sort field index for the sorted data. + * + * @return the sort field index. + */ + private int[] getSortFieldIndex() { + int[] sortFieldIndex = new int[1]; + if (positionSortField != -1) { + sortFieldIndex[0] = + new Keys.ExpressionKeys<>(positionSortField, inputType) + .computeLogicalKeyPositions()[0]; + } else if (stringSortField != null) { + sortFieldIndex[0] = + new Keys.ExpressionKeys<>(stringSortField, inputType) + .computeLogicalKeyPositions()[0]; + } + return sortFieldIndex; + } + + /** + * Get the indicator for the sort order. + * + * @return sort order indicator. 
+ */ + private boolean[] getSortOrderIndicator() { + boolean[] sortOrderIndicator = new boolean[1]; + sortOrderIndicator[0] = this.sortOrder == Order.ASCENDING; + return sortOrderIndicator; + } + + private void ensureFieldSortable(int field) throws InvalidProgramException { + if (!Keys.ExpressionKeys.isSortKey(field, inputType)) { + throw new InvalidProgramException( + "The field " + field + " of input type " + inputType + " is not sortable."); + } + } + + private void ensureFieldSortable(String field) throws InvalidProgramException { + if (!Keys.ExpressionKeys.isSortKey(field, inputType)) { + throw new InvalidProgramException( + "The field " + field + " of input type " + inputType + " is not sortable."); + } + } + + private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) { + TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, inputType); + Keys.SelectorFunctionKeys<INPUT, K> sortKey = + new Keys.SelectorFunctionKeys<>(keySelector, inputType, keyType); + if (!sortKey.getKeyType().isSortKeyType()) { + throw new InvalidProgramException("The key type " + keyType + " is not sortable."); + } + } + + /** + * Create the dataOutputSerializer to save the serialized record key as a buffer. + * + * @param keyLength the length of record key. The key length will be variable if the value is + * -1. + */ + private void createDataOutputSerializer(int keyLength) { + if (keyLength > 0) { + dataOutputSerializer = new DataOutputSerializer(keyLength); + } else { + // The initial buffer size is set to 64. The buffer will expand size if it's needed. 
+ dataOutputSerializer = new DataOutputSerializer(64); + } + } + + private <TYPE> PushSorter<TYPE> getSorter( + TypeSerializer<TYPE> typeSerializer, + TypeComparator<TYPE> typeComparator, + StreamTask<?, ?> streamTask) { + ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader(); + Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration(); + double managedMemoryFraction = + config.getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + streamTask.getEnvironment().getJobConfiguration(), + streamTask.getEnvironment().getTaskConfiguration(), + userCodeClassLoader); + try { + return ExternalSorter.newBuilder( + streamTask.getEnvironment().getMemoryManager(), + streamTask, + typeSerializer, + typeComparator, + streamTask.getExecutionConfig()) + .memoryFraction(managedMemoryFraction) + .enableSpilling( + streamTask.getEnvironment().getIOManager(), + jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)) + .objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled()) + .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java new file mode 100644 index 00000000000..11b41cc48ba --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; + +/** + * The {@link SortPartitionOperator} sorts records of a partition on non-keyed data stream. It + * ensures that all records within the same task are sorted in a user-defined order. + * + * <p>To sort the records, the record will be written to {@link ExternalSorter} directly. However, + * if the record is sorted according to the selected key by {@link KeySelector}, the selected key + * should also be written with the record to {@link ExternalSorter} to avoid repeated key + * selections. + * + * @param <INPUT> The type of input record. + */ +@Internal +public class SortPartitionOperator<INPUT> extends AbstractStreamOperator<INPUT> + implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput { + + /** The type information of input records. */ + protected final TypeInformation<INPUT> inputType; + + /** The selector to create the sort key for records, which will be null if it's not used. */ + protected final KeySelector<INPUT, ?> sortFieldSelector; + + /** The order to sort records. */ + private final Order sortOrder; + + /** + * The string field to indicate the sort key for records with tuple or pojo type, which will be + * null if it's not used. + */ + private final String stringSortField; + + /** + * The int field to indicate the sort key for records with tuple type, which will be -1 if it's + * not used. + */ + private final int positionSortField; + + /** The sorter to sort record if the record is not sorted by {@link KeySelector}. 
*/ + private PushSorter<INPUT> recordSorter = null; + + /** The sorter to sort record if the record is sorted by {@link KeySelector}. */ + private PushSorter<Tuple2<?, INPUT>> recordSorterForKeySelector = null; + + public SortPartitionOperator( + TypeInformation<INPUT> inputType, int positionSortField, Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(positionSortField); + this.positionSortField = positionSortField; + this.stringSortField = null; + this.sortFieldSelector = null; + this.sortOrder = sortOrder; + } + + public SortPartitionOperator( + TypeInformation<INPUT> inputType, String stringSortField, Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(stringSortField); + this.positionSortField = -1; + this.stringSortField = stringSortField; + this.sortFieldSelector = null; + this.sortOrder = sortOrder; + } + + public <K> SortPartitionOperator( + TypeInformation<INPUT> inputType, + KeySelector<INPUT, K> sortFieldSelector, + Order sortOrder) { + this.inputType = inputType; + ensureFieldSortable(sortFieldSelector); + this.positionSortField = -1; + this.stringSortField = null; + this.sortFieldSelector = sortFieldSelector; + this.sortOrder = sortOrder; + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<INPUT>> output) { + super.setup(containingTask, config, output); + ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig(); + if (sortFieldSelector != null) { + TypeInformation<Tuple2<?, INPUT>> sortTypeInfo = + Types.TUPLE( + TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType), + inputType); + recordSorterForKeySelector = + getSorter( + sortTypeInfo.createSerializer(executionConfig), + ((CompositeType<Tuple2<?, INPUT>>) sortTypeInfo) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig), + containingTask); + } else { + recordSorter = + getSorter( + 
inputType.createSerializer(executionConfig), + ((CompositeType<INPUT>) inputType) + .createComparator( + getSortFieldIndex(), + getSortOrderIndicator(), + 0, + executionConfig), + containingTask); + } + } + + @Override + public void processElement(StreamRecord<INPUT> element) throws Exception { + if (sortFieldSelector != null) { + recordSorterForKeySelector.writeRecord( + Tuple2.of(sortFieldSelector.getKey(element.getValue()), element.getValue())); + } else { + recordSorter.writeRecord(element.getValue()); + } + } + + @Override + public void endInput() throws Exception { + TimestampedCollector<INPUT> outputCollector = new TimestampedCollector<>(output); + if (sortFieldSelector != null) { + recordSorterForKeySelector.finishReading(); + MutableObjectIterator<Tuple2<?, INPUT>> dataIterator = + recordSorterForKeySelector.getIterator(); + Tuple2<?, INPUT> record = dataIterator.next(); + while (record != null) { + outputCollector.collect(record.f1); + record = dataIterator.next(); + } + recordSorterForKeySelector.close(); + } else { + recordSorter.finishReading(); + MutableObjectIterator<INPUT> dataIterator = recordSorter.getIterator(); + INPUT record = dataIterator.next(); + while (record != null) { + outputCollector.collect(record); + record = dataIterator.next(); + } + recordSorter.close(); + } + } + + @Override + public OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build(); + } + + /** + * Get the sort field index for the sorted data. + * + * @return the sort field index. 
+ */ + private int[] getSortFieldIndex() { + int[] sortFieldIndex = new int[1]; + if (positionSortField != -1) { + sortFieldIndex[0] = + new Keys.ExpressionKeys<>(positionSortField, inputType) + .computeLogicalKeyPositions()[0]; + } else if (stringSortField != null) { + sortFieldIndex[0] = + new Keys.ExpressionKeys<>(stringSortField, inputType) + .computeLogicalKeyPositions()[0]; + } + return sortFieldIndex; + } + + /** + * Get the indicator for the sort order. + * + * @return sort order indicator. + */ + private boolean[] getSortOrderIndicator() { + boolean[] sortOrderIndicator = new boolean[1]; + sortOrderIndicator[0] = this.sortOrder == Order.ASCENDING; + return sortOrderIndicator; + } + + private void ensureFieldSortable(int field) throws InvalidProgramException { + if (!Keys.ExpressionKeys.isSortKey(field, inputType)) { + throw new InvalidProgramException( + "The field " + field + " of input type " + inputType + " is not sortable."); + } + } + + private void ensureFieldSortable(String field) throws InvalidProgramException { + if (!Keys.ExpressionKeys.isSortKey(field, inputType)) { + throw new InvalidProgramException( + "The field " + field + " of input type " + inputType + " is not sortable."); + } + } + + private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) { + TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, inputType); + Keys.SelectorFunctionKeys<INPUT, K> sortKey = + new Keys.SelectorFunctionKeys<>(keySelector, inputType, keyType); + if (!sortKey.getKeyType().isSortKeyType()) { + throw new InvalidProgramException("The key type " + keyType + " is not sortable."); + } + } + + private <TYPE> PushSorter<TYPE> getSorter( + TypeSerializer<TYPE> typeSerializer, + TypeComparator<TYPE> typeComparator, + StreamTask<?, ?> streamTask) { + ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader(); + Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration(); + double managedMemoryFraction 
= + config.getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + streamTask.getEnvironment().getJobConfiguration(), + streamTask.getEnvironment().getTaskConfiguration(), + userCodeClassLoader); + try { + return ExternalSorter.newBuilder( + streamTask.getEnvironment().getMemoryManager(), + streamTask, + typeSerializer, + typeComparator, + streamTask.getExecutionConfig()) + .memoryFraction(managedMemoryFraction) + .enableSpilling( + streamTask.getEnvironment().getIOManager(), + jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)) + .objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled()) + .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java new file mode 100644 index 00000000000..98c648f6f40 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparator.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.util.Arrays; + +/** + * The {@link VariableLengthByteKeyAndValueComparator} is used by {@link KeyedSortPartitionOperator} + * to compare records according to both the record key and record value. The length of record key + * must be variable and will be read from {@link DataInputView}. 
+ */ +@Internal +public class VariableLengthByteKeyAndValueComparator<INPUT> + extends TypeComparator<Tuple2<byte[], INPUT>> { + + private final TypeComparator<INPUT> valueComparator; + + private byte[] keyReference; + + private INPUT valueReference; + + public VariableLengthByteKeyAndValueComparator(TypeComparator<INPUT> valueComparator) { + this.valueComparator = valueComparator; + } + + @Override + public int hash(Tuple2<byte[], INPUT> record) { + return record.hashCode(); + } + + @Override + public void setReference(Tuple2<byte[], INPUT> toCompare) { + this.keyReference = toCompare.f0; + this.valueReference = toCompare.f1; + } + + @Override + public boolean equalToReference(Tuple2<byte[], INPUT> candidate) { + return Arrays.equals(keyReference, candidate.f0) && valueReference.equals(candidate.f1); + } + + @Override + public int compareToReference(TypeComparator<Tuple2<byte[], INPUT>> referencedComparator) { + byte[] otherKey = + ((VariableLengthByteKeyAndValueComparator<INPUT>) referencedComparator) + .keyReference; + INPUT otherValue = + ((VariableLengthByteKeyAndValueComparator<INPUT>) referencedComparator) + .valueReference; + int keyCmp = compareKey(otherKey, this.keyReference); + if (keyCmp != 0) { + return keyCmp; + } + return valueComparator.compare(this.valueReference, otherValue); + } + + @Override + public int compare(Tuple2<byte[], INPUT> first, Tuple2<byte[], INPUT> second) { + int keyCmp = compareKey(first.f0, second.f0); + if (keyCmp != 0) { + return keyCmp; + } + return valueComparator.compare(first.f1, second.f1); + } + + private int compareKey(byte[] first, byte[] second) { + int firstLength = first.length; + int secondLength = second.length; + int minLength = Math.min(firstLength, secondLength); + for (int i = 0; i < minLength; i++) { + int cmp = Byte.compare(first[i], second[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(firstLength, secondLength); + } + + @Override + public int compareSerialized(DataInputView 
firstSource, DataInputView secondSource) + throws IOException { + int firstLength = firstSource.readInt(); + int secondLength = secondSource.readInt(); + int minLength = Math.min(firstLength, secondLength); + while (minLength-- > 0) { + byte firstValue = firstSource.readByte(); + byte secondValue = secondSource.readByte(); + + int cmp = Byte.compare(firstValue, secondValue); + if (cmp != 0) { + return cmp; + } + } + int lengthCompare = Integer.compare(firstLength, secondLength); + if (lengthCompare != 0) { + return lengthCompare; + } else { + return valueComparator.compareSerialized(firstSource, secondSource); + } + } + + @Override + public boolean supportsNormalizedKey() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return false; + } + + @Override + public void putNormalizedKey( + Tuple2<byte[], INPUT> record, MemorySegment target, int offset, int numBytes) { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator<Tuple2<byte[], INPUT>> duplicate() { + return new VariableLengthByteKeyAndValueComparator<>(this.valueComparator); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator<?>[] getFlatComparators() { + return new TypeComparator[] {this}; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(Tuple2<byte[], INPUT> record, DataOutputView target) { + throw new UnsupportedOperationException( + "Not supported as the data 
containing both key and value cannot generate normalized key."); + } + + @Override + public Tuple2<byte[], INPUT> readWithKeyDenormalization( + Tuple2<byte[], INPUT> reuse, DataInputView source) { + throw new UnsupportedOperationException( + "Not supported as the data containing both key and value cannot generate normalized key."); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java index beecf931024..804d1f7c09a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java @@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.TransformationTranslator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import java.util.Collection; @@ -78,6 +81,15 @@ public final class OneInputTransformationTranslator<IN, OUT> final OneInputTransformation<IN, OUT> transformation, final Context context) { KeySelector<IN, ?> keySelector = transformation.getStateKeySelector(); if (keySelector != null) { + // KeyedSortPartitionOperator doesn't need sorted input because it sorts data + // internally. 
+ StreamOperatorFactory<OUT> operatorFactory = transformation.getOperatorFactory(); + if (operatorFactory instanceof SimpleOperatorFactory + && operatorFactory.getStreamOperatorClass( + Thread.currentThread().getContextClassLoader()) + == KeyedSortPartitionOperator.class) { + return; + } BatchExecutionUtils.applyBatchExecutionSettings( transformation.getId(), context, StreamConfig.InputRequirement.SORTED); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java new file mode 100644 index 00000000000..a7065a1e2ad --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/FixedLengthByteKeyAndValueComparatorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FixedLengthByteKeyAndValueComparator} in {@link KeyedSortPartitionOperator}. */ +class FixedLengthByteKeyAndValueComparatorTest extends ComparatorTestBase<Tuple2<byte[], Integer>> { + + @Override + protected Order[] getTestedOrder() { + return new Order[] {Order.ASCENDING}; + } + + @Override + protected TypeComparator<Tuple2<byte[], Integer>> createComparator(boolean ascending) { + return new FixedLengthByteKeyAndValueComparator<>( + new IntSerializer().getLength(), + BasicTypeInfo.INT_TYPE_INFO.createComparator(true, null)); + } + + @Override + protected TypeSerializer<Tuple2<byte[], Integer>> createSerializer() { + IntSerializer intSerializer = new IntSerializer(); + return new KeyAndValueSerializer<>(intSerializer, intSerializer.getLength()); + } + + @Override + protected void deepEquals( + String message, Tuple2<byte[], Integer> should, Tuple2<byte[], Integer> is) { + assertThat(is.f0).as(message).isEqualTo(should.f0); + assertThat(is.f1).as(message).isEqualTo(should.f1); + } + + @Override + protected Tuple2<byte[], Integer>[] getSortedTestData() { + return SerializerComparatorTestData.getOrderedIntTestData(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java new file mode 100644 index 
00000000000..879468e9f90 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyAndValueSerializerTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link KeyAndValueSerializer} in {@link KeyedSortPartitionOperator}. 
*/ +class KeyAndValueSerializerTest extends SerializerTestBase<Tuple2<byte[], Integer>> { + + private static final int DEFAULT_KEY_LENGTH = 4; + + private final IntSerializer valueSerializer = new IntSerializer(); + + @Override + protected TypeSerializer<Tuple2<byte[], Integer>> createSerializer() { + return new KeyAndValueSerializer<>(valueSerializer, DEFAULT_KEY_LENGTH); + } + + @Override + protected int getLength() { + return DEFAULT_KEY_LENGTH + valueSerializer.getLength(); + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + protected Class<Tuple2<byte[], Integer>> getTypeClass() { + return (Class<Tuple2<byte[], Integer>>) (Class) Tuple2.class; + } + + @Override + protected Tuple2<byte[], Integer>[] getTestData() { + return SerializerComparatorTestData.getOrderedIntTestData(); + } + + @Override + @Test + public void testConfigSnapshotInstantiation() { + assertThatThrownBy(() -> super.testConfigSnapshotInstantiation()) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Override + @Test + public void testSnapshotConfigurationAndReconfigure() { + assertThatThrownBy(() -> super.testSnapshotConfigurationAndReconfigure()) + .isInstanceOf(UnsupportedOperationException.class); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java new file mode 100644 index 00000000000..470381ecb46 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/KeyedSortPartitionOperatorTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; + +import org.junit.jupiter.api.Test; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link KeyedSortPartitionOperator}. */ +class KeyedSortPartitionOperatorTest { + + @Test + void testSortPartition() throws Exception { + // 1.Test KeyedSortPartitionOperator sorting records by position field. 
+ KeyedSortPartitionOperator<Tuple2<Integer, String>, String> operator1 = + createSortPartitionOperatorWithPositionField(); + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Tuple2<Integer, String>> + testHarness1 = new OneInputStreamOperatorTestHarness<>(operator1); + StreamConfig streamConfig1 = testHarness1.getStreamConfig(); + streamConfig1.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null)); + streamConfig1.serializeAllConfigs(); + Queue<Object> expectedOutput1 = new LinkedList<>(); + testHarness1.open(); + testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3"))); + testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1"))); + testHarness1.endInput(); + testHarness1.close(); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1"))); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3"))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput1, + testHarness1.getOutput()); + // 2.Test KeyedSortPartitionOperator sorting records by string field. 
+ KeyedSortPartitionOperator<TestPojo, String> operator2 = + createSortPartitionOperatorWithStringField(); + OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness2 = + new OneInputStreamOperatorTestHarness<>(operator2); + StreamConfig streamConfig2 = testHarness2.getStreamConfig(); + streamConfig2.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null)); + streamConfig2.serializeAllConfigs(); + Queue<Object> expectedOutput2 = new LinkedList<>(); + testHarness2.open(); + testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3))); + testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness2.endInput(); + testHarness2.close(); + expectedOutput2.add(new StreamRecord<>(new SortPartitionOperatorTest.TestPojo("1", 1))); + expectedOutput2.add(new StreamRecord<>(new SortPartitionOperatorTest.TestPojo("3", 3))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput2, + testHarness2.getOutput()); + // 3.Test KeyedSortPartitionOperator sorting records by key selector. 
+ KeyedSortPartitionOperator<TestPojo, String> operator3 = + createSortPartitionOperatorWithKeySelector(); + OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness3 = + new OneInputStreamOperatorTestHarness<>(operator3); + StreamConfig streamConfig3 = testHarness3.getStreamConfig(); + streamConfig3.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null)); + streamConfig3.serializeAllConfigs(); + Queue<Object> expectedOutput3 = new LinkedList<>(); + testHarness3.open(); + testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3))); + testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness3.endInput(); + testHarness3.close(); + expectedOutput3.add(new StreamRecord<>(new SortPartitionOperatorTest.TestPojo("1", 1))); + expectedOutput3.add(new StreamRecord<>(new SortPartitionOperatorTest.TestPojo("3", 3))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput3, + testHarness3.getOutput()); + } + + @Test + void testOpenClose() throws Exception { + KeyedSortPartitionOperator<Tuple2<Integer, String>, String> sortPartitionOperator = + createSortPartitionOperatorWithPositionField(); + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Tuple2<Integer, String>> + testHarness = new OneInputStreamOperatorTestHarness<>(sortPartitionOperator); + StreamConfig streamConfig = testHarness.getStreamConfig(); + streamConfig.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null)); + streamConfig.serializeAllConfigs(); + testHarness.open(); + testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1"))); + testHarness.endInput(); + testHarness.close(); + assertThat(testHarness.getOutput()).isNotEmpty(); + } + + private KeyedSortPartitionOperator<Tuple2<Integer, String>, String> + createSortPartitionOperatorWithPositionField() { + TypeInformation<Tuple2<Integer, String>> inputType = + Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO); + int positionSortField = 0; + Order sortOrder = Order.ASCENDING; + return new KeyedSortPartitionOperator<>(inputType, positionSortField, sortOrder); + } + + private KeyedSortPartitionOperator<TestPojo, String> + createSortPartitionOperatorWithStringField() { + TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class); + String positionSortField = "value"; + Order sortOrder = Order.ASCENDING; + return new KeyedSortPartitionOperator<>(inputType, positionSortField, sortOrder); + } + + private KeyedSortPartitionOperator<TestPojo, String> + createSortPartitionOperatorWithKeySelector() { + TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class); + Order sortOrder = Order.ASCENDING; + return new KeyedSortPartitionOperator<>(inputType, TestPojo::getValue, sortOrder); + } + + /** The test pojo. */ + public static class TestPojo implements Serializable { + + public String key; + + public Integer value; + + public TestPojo() {} + + public TestPojo(String key, Integer value) { + this.key = key; + this.value = value; + } + + public Integer getValue() { + return value; + } + + public void setValue(Integer value) { + this.value = value; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + @Override + public boolean equals(Object object) { + if (object instanceof SortPartitionOperatorTest.TestPojo) { + SortPartitionOperatorTest.TestPojo testPojo = + (SortPartitionOperatorTest.TestPojo) object; + return testPojo.getKey().equals(getKey()) && testPojo.getValue().equals(getValue()); + } + return false; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java new file mode 100644 index 00000000000..36dd1d45949 --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SerializerComparatorTestData.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** Test data for serializers and comparators in {@link KeyedSortPartitionOperator}. 
*/ +class SerializerComparatorTestData { + + @SuppressWarnings("unchecked") + static Tuple2<byte[], Integer>[] getOrderedIntTestData() { + IntSerializer intSerializer = new IntSerializer(); + DataOutputSerializer outputSerializer = new DataOutputSerializer(intSerializer.getLength()); + + return IntStream.range(-10, 10) + .mapToObj( + idx -> { + try { + intSerializer.serialize(idx, outputSerializer); + byte[] copyOfBuffer = outputSerializer.getCopyOfBuffer(); + outputSerializer.clear(); + return Tuple2.of(copyOfBuffer, idx); + } catch (IOException e) { + throw new AssertionError(e); + } + }) + .toArray(Tuple2[]::new); + } + + @SuppressWarnings("unchecked") + static Tuple2<byte[], String>[] getOrderedStringTestData() { + StringSerializer stringSerializer = new StringSerializer(); + DataOutputSerializer outputSerializer = new DataOutputSerializer(64); + return Stream.of( + new String(new byte[] {-1, 0}), + new String(new byte[] {0, 1}), + "A", + "AB", + "ABC", + "ABCD", + "ABCDE", + "ABCDEF", + "ABCDEFG", + "ABCDEFGH") + .map( + str -> { + try { + stringSerializer.serialize(str, outputSerializer); + byte[] copyOfBuffer = outputSerializer.getCopyOfBuffer(); + outputSerializer.clear(); + return Tuple2.of(copyOfBuffer, str); + } catch (IOException e) { + throw new AssertionError(e); + } + }) + .sorted( + (o1, o2) -> { + byte[] key0 = o1.f0; + byte[] key1 = o2.f0; + + int firstLength = key0.length; + int secondLength = key1.length; + int minLength = Math.min(firstLength, secondLength); + for (int i = 0; i < minLength; i++) { + int cmp = Byte.compare(key0[i], key1[i]); + if (cmp != 0) { + return cmp; + } + } + int lengthCmp = Integer.compare(firstLength, secondLength); + if (lengthCmp != 0) { + return lengthCmp; + } + return o1.f1.compareTo(o2.f1); + }) + .toArray(Tuple2[]::new); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java new file mode 100644 index 00000000000..e0d35273872 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; + +import org.junit.jupiter.api.Test; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link SortPartitionOperator}. 
*/ +class SortPartitionOperatorTest { + + @Test + void testSortPartition() throws Exception { + // 1.Test SortPartitionOperator sorting records by position field. + SortPartitionOperator<Tuple2<Integer, String>> operator1 = + createSortPartitionOperatorWithPositionField(); + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Tuple2<Integer, String>> + testHarness1 = new OneInputStreamOperatorTestHarness<>(operator1); + Queue<Object> expectedOutput1 = new LinkedList<>(); + testHarness1.setup(); + testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3"))); + testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1"))); + testHarness1.endInput(); + testHarness1.close(); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1"))); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3"))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput1, + testHarness1.getOutput()); + // 2.Test SortPartitionOperator sorting records by string field. + SortPartitionOperator<TestPojo> operator2 = createSortPartitionOperatorWithStringField(); + OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness2 = + new OneInputStreamOperatorTestHarness<>(operator2); + Queue<Object> expectedOutput2 = new LinkedList<>(); + testHarness2.setup(); + testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3))); + testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness2.endInput(); + testHarness2.close(); + expectedOutput2.add(new StreamRecord<>(new TestPojo("1", 1))); + expectedOutput2.add(new StreamRecord<>(new TestPojo("3", 3))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput2, + testHarness2.getOutput()); + // 3.Test SortPartitionOperator sorting records by key selector. 
+ SortPartitionOperator<TestPojo> operator3 = createSortPartitionOperatorWithKeySelector(); + OneInputStreamOperatorTestHarness<TestPojo, TestPojo> testHarness3 = + new OneInputStreamOperatorTestHarness<>(operator3); + Queue<Object> expectedOutput3 = new LinkedList<>(); + testHarness3.setup(); + testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3))); + testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness3.endInput(); + testHarness3.close(); + expectedOutput3.add(new StreamRecord<>(new TestPojo("1", 1))); + expectedOutput3.add(new StreamRecord<>(new TestPojo("3", 3))); + TestHarnessUtil.assertOutputEquals( + "The sort partition result is not correct.", + expectedOutput3, + testHarness3.getOutput()); + } + + @Test + void testOpenClose() throws Exception { + SortPartitionOperator<Tuple2<Integer, String>> sortPartitionOperator = + createSortPartitionOperatorWithPositionField(); + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Tuple2<Integer, String>> + testHarness = new OneInputStreamOperatorTestHarness<>(sortPartitionOperator); + testHarness.open(); + testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1"))); + testHarness.endInput(); + testHarness.close(); + assertThat(testHarness.getOutput()).isNotEmpty(); + } + + private SortPartitionOperator<Tuple2<Integer, String>> + createSortPartitionOperatorWithPositionField() { + TypeInformation<Tuple2<Integer, String>> inputType = + Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + int positionSortField = 0; + Order sortOrder = Order.ASCENDING; + return new SortPartitionOperator<>(inputType, positionSortField, sortOrder); + } + + private SortPartitionOperator<TestPojo> createSortPartitionOperatorWithStringField() { + TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class); + String positionSortField = "value"; + Order sortOrder = Order.ASCENDING; + return new SortPartitionOperator<>(inputType, positionSortField, sortOrder); 
+ } + + private SortPartitionOperator<TestPojo> createSortPartitionOperatorWithKeySelector() { + TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class); + Order sortOrder = Order.ASCENDING; + return new SortPartitionOperator<>(inputType, TestPojo::getValue, sortOrder); + } + + /** The test pojo. */ + public static class TestPojo implements Serializable { + + public String key; + + public Integer value; + + public TestPojo() {} + + public TestPojo(String key, Integer value) { + this.key = key; + this.value = value; + } + + public Integer getValue() { + return value; + } + + public void setValue(Integer value) { + this.value = value; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + @Override + public boolean equals(Object object) { + if (object instanceof TestPojo) { + TestPojo testPojo = (TestPojo) object; + return testPojo.getKey().equals(getKey()) && testPojo.getValue().equals(getValue()); + } + return false; + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java new file mode 100644 index 00000000000..ecd82815b53 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/VariableLengthByteKeyAndValueComparatorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sortpartition; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link VariableLengthByteKeyAndValueComparator} in {@link KeyedSortPartitionOperator}. 
+ */ +class VariableLengthByteKeyAndValueComparatorTest + extends ComparatorTestBase<Tuple2<byte[], String>> { + @Override + protected Order[] getTestedOrder() { + return new Order[] {Order.ASCENDING}; + } + + @Override + protected TypeComparator<Tuple2<byte[], String>> createComparator(boolean ascending) { + return new VariableLengthByteKeyAndValueComparator<>( + BasicTypeInfo.STRING_TYPE_INFO.createComparator(ascending, null)); + } + + @Override + protected TypeSerializer<Tuple2<byte[], String>> createSerializer() { + StringSerializer stringSerializer = new StringSerializer(); + return new KeyAndValueSerializer<>(stringSerializer, stringSerializer.getLength()); + } + + @Override + protected void deepEquals( + String message, Tuple2<byte[], String> should, Tuple2<byte[], String> is) { + assertThat(is.f0).as(message).isEqualTo(should.f0); + assertThat(is.f1).as(message).isEqualTo(should.f1); + } + + @Override + protected Tuple2<byte[], String>[] getSortedTestData() { + return SerializerComparatorTestData.getOrderedStringTestData(); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java index f71b60a08a8..2da74701adb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java @@ -261,7 +261,10 @@ public class TypeSerializerTestCoverageTest extends TestLogger { DecimalDataSerializer.class.getName(), SharedBufferNode.SharedBufferNodeSerializer.class.getName(), NFA.NFASerializer.class.getName(), - AvroSerializer.class.getName()); + AvroSerializer.class.getName(), + // KeyAndValueSerializer shouldn't be used to serialize data to state and + // doesn't need to ensure upgrade compatibility. 
+ "org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer"); // check if a test exists for each type serializer for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java index c51de02d9fb..ac2425594a7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -210,6 +211,240 @@ class KeyedPartitionWindowedStreamITCase { expectInAnyOrder(resultIterator, "97", "97", "97"); } + @Test + void testSortPartitionOfTupleElementsAscending() throws Exception { + expectInAnyOrder( + sortPartitionOfTupleElementsInOrder(Order.ASCENDING), + "0 1 3 7 ", + "0 1 79 100 ", + "8 55 66 77 "); + } + + @Test + void testSortPartitionOfTupleElementsDescending() throws Exception { + expectInAnyOrder( + sortPartitionOfTupleElementsInOrder(Order.DESCENDING), + "7 3 1 0 ", + "100 79 1 0 ", + "77 66 55 8 "); + } + + @Test + void testSortPartitionOfPojoElementsAscending() throws Exception { + expectInAnyOrder( + sortPartitionOfPojoElementsInOrder(Order.ASCENDING), + "0 1 3 7 ", + "0 1 79 100 ", + "8 55 66 77 "); + } + + @Test + void testSortPartitionOfPojoElementsDescending() throws 
Exception { + expectInAnyOrder( + sortPartitionOfPojoElementsInOrder(Order.DESCENDING), + "7 3 1 0 ", + "100 79 1 0 ", + "77 66 55 8 "); + } + + @Test + public void testSortPartitionByKeySelectorAscending() throws Exception { + expectInAnyOrder( + sortPartitionByKeySelectorInOrder(Order.ASCENDING), + "0 1 3 7 ", + "0 1 79 100 ", + "8 55 66 77 "); + } + + @Test + void testSortPartitionByKeySelectorDescending() throws Exception { + expectInAnyOrder( + sortPartitionByKeySelectorInOrder(Order.DESCENDING), + "7 3 1 0 ", + "100 79 1 0 ", + "77 66 55 8 "); + } + + private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order) + throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Tuple2<String, Integer>> source = + env.fromData( + Tuple2.of("key1", 0), + Tuple2.of("key1", 7), + Tuple2.of("key1", 3), + Tuple2.of("key1", 1), + Tuple2.of("key2", 1), + Tuple2.of("key2", 100), + Tuple2.of("key2", 0), + Tuple2.of("key2", 79), + Tuple2.of("key3", 77), + Tuple2.of("key3", 66), + Tuple2.of("key3", 55), + Tuple2.of("key3", 8)); + return source.map( + new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> map(Tuple2<String, Integer> value) + throws Exception { + return value; + } + }) + .setParallelism(2) + .keyBy( + new KeySelector<Tuple2<String, Integer>, String>() { + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + }) + .fullWindowPartition() + .sortPartition(1, order) + .fullWindowPartition() + .mapPartition( + new MapPartitionFunction<Tuple2<String, Integer>, String>() { + @Override + public void mapPartition( + Iterable<Tuple2<String, Integer>> values, + Collector<String> out) { + StringBuilder sb = new StringBuilder(); + String preKey = null; + for (Tuple2<String, Integer> value : values) { + if (preKey != null && !preKey.equals(value.f0)) { + 
out.collect(sb.toString()); + sb = new StringBuilder(); + } + sb.append(value.f1); + sb.append(" "); + preKey = value.f0; + } + out.collect(sb.toString()); + } + }) + .executeAndCollect(); + } + + private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order) + throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<TestPojo> source = + env.fromData( + new TestPojo("key1", 0), + new TestPojo("key1", 7), + new TestPojo("key1", 3), + new TestPojo("key1", 1), + new TestPojo("key2", 1), + new TestPojo("key2", 100), + new TestPojo("key2", 0), + new TestPojo("key2", 79), + new TestPojo("key3", 77), + new TestPojo("key3", 66), + new TestPojo("key3", 55), + new TestPojo("key3", 8)); + return source.map( + new MapFunction<TestPojo, TestPojo>() { + @Override + public TestPojo map(TestPojo value) throws Exception { + return value; + } + }) + .setParallelism(2) + .keyBy( + new KeySelector<TestPojo, String>() { + @Override + public String getKey(TestPojo value) throws Exception { + return value.getKey(); + } + }) + .fullWindowPartition() + .sortPartition("value", order) + .fullWindowPartition() + .mapPartition( + new MapPartitionFunction<TestPojo, String>() { + @Override + public void mapPartition( + Iterable<TestPojo> values, Collector<String> out) { + StringBuilder sb = new StringBuilder(); + String preKey = null; + for (TestPojo value : values) { + if (preKey != null && !preKey.equals(value.getKey())) { + out.collect(sb.toString()); + sb = new StringBuilder(); + } + sb.append(value.getValue()); + sb.append(" "); + preKey = value.getKey(); + } + out.collect(sb.toString()); + } + }) + .executeAndCollect(); + } + + private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order) + throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<TestPojo> source = + env.fromData( + new TestPojo("key1", 0), + new 
TestPojo("key1", 7), + new TestPojo("key1", 3), + new TestPojo("key1", 1), + new TestPojo("key2", 1), + new TestPojo("key2", 100), + new TestPojo("key2", 0), + new TestPojo("key2", 79), + new TestPojo("key3", 77), + new TestPojo("key3", 66), + new TestPojo("key3", 55), + new TestPojo("key3", 8)); + return source.map( + new MapFunction<TestPojo, TestPojo>() { + @Override + public TestPojo map(TestPojo value) throws Exception { + return value; + } + }) + .setParallelism(2) + .keyBy( + new KeySelector<TestPojo, String>() { + @Override + public String getKey(TestPojo value) throws Exception { + return value.getKey(); + } + }) + .fullWindowPartition() + .sortPartition( + new KeySelector<TestPojo, Integer>() { + @Override + public Integer getKey(TestPojo value) throws Exception { + return value.getValue(); + } + }, + order) + .fullWindowPartition() + .mapPartition( + new MapPartitionFunction<TestPojo, String>() { + @Override + public void mapPartition( + Iterable<TestPojo> values, Collector<String> out) { + StringBuilder sb = new StringBuilder(); + String preKey = null; + for (TestPojo value : values) { + if (preKey != null && !preKey.equals(value.getKey())) { + out.collect(sb.toString()); + sb = new StringBuilder(); + } + sb.append(value.getValue()); + sb.append(" "); + preKey = value.getKey(); + } + out.collect(sb.toString()); + } + }) + .executeAndCollect(); + } + private Collection<Tuple2<String, String>> createSource() { List<Tuple2<String, String>> source = new ArrayList<>(); for (int index = 0; index < EVENT_NUMBER; ++index) { @@ -248,4 +483,39 @@ class KeyedPartitionWindowedStreamITCase { return String.valueOf(testField); } } + + /** The test pojo. 
     */
    public static class TestPojo {

        // Public mutable fields plus a public no-argument constructor — presumably to
        // satisfy Flink's POJO type requirements; TODO confirm against the serializer docs.
        public String key;

        public Integer value;

        public TestPojo() {}

        // NOTE(review): leaves key == null; make sure it is only used on paths that
        // never read the key.
        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
}
// ===================================================================================
// [patch file boundary]
// diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
//          b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/NonKeyedPartitionWindowedStreamITCase.java
// index b522664c9d3..6ec18d01bfe 100644
// ===================================================================================
// Existing imports kept by the patch:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
// Imports added by the patch for the new sortPartition tests:
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.NonKeyedPartitionWindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// [diff hunk boundary @@ -136,6 +139,159 @@ — intervening lines not shown in this patch]

    // Expected "013"/"310": two rebalanced partitions each hold values {0, 1, 3},
    // concatenated after an ascending/descending sort.
    @Test
    void testSortPartitionOfTupleElementsAscending() throws Exception {
        expectInAnyOrder(sortPartitionOfTupleElementsInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionOfTupleElementsDescending()
            throws Exception {
        expectInAnyOrder(sortPartitionOfTupleElementsInOrder(Order.DESCENDING), "310", "310");
    }

    @Test
    void testSortPartitionOfPojoElementsAscending() throws Exception {
        expectInAnyOrder(sortPartitionOfPojoElementsInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionOfPojoElementsDescending() throws Exception {
        expectInAnyOrder(sortPartitionOfPojoElementsInOrder(Order.DESCENDING), "310", "310");
    }

    @Test
    void testSortPartitionByKeySelectorAscending() throws Exception {
        expectInAnyOrder(sortPartitionByKeySelectorInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionByKeySelectorDescending() throws Exception {
        expectInAnyOrder(sortPartitionByKeySelectorInOrder(Order.DESCENDING), "310", "310");
    }

    /**
     * Builds a non-keyed pipeline that sorts each fully-windowed partition of
     * {@code Tuple2<String, Integer>} elements on tuple field position 1 in the given
     * order, then concatenates field {@code f1} of every element per partition.
     *
     * @param order ascending or descending sort order for tuple field 1
     * @return iterator over one concatenated string per partition
     * @throws Exception if job construction or execution fails
     */
    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Duplicated values {0, 3, 1} so each of the two partitions sees one full set.
        DataStreamSource<Tuple2<String, Integer>> source =
                env.fromData(
                        Tuple2.of("Test", 0),
                        Tuple2.of("Test", 0),
                        Tuple2.of("Test", 3),
                        Tuple2.of("Test", 3),
                        Tuple2.of("Test", 1),
                        Tuple2.of("Test", 1));
        return source.rebalance()
                .map(
                        // Identity map; present only so that parallelism 2 can be set on it.
                        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> map(Tuple2<String, Integer> value)
                                    throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                // Sort by tuple field position 1 (the Integer).
                .sortPartition(1, order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<Tuple2<String, Integer>, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Tuple2<String, Integer>> values,
                                    Collector<String> out) {
                                // Concatenate the sorted integers into one string
                                // per partition.
                                StringBuilder sb = new StringBuilder();
                                for (Tuple2<String, Integer> value : values) {
                                    sb.append(value.f1);
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }

    /**
     * Non-keyed variant sorting {@code TestPojo} partitions on the POJO field
     * {@code "value"}; body continues on the next source line.
     */
    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // key is left null here — this pipeline never reads it.
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo(0),
                        new TestPojo(0),
                        new TestPojo(3),
                        new TestPojo(3),
                        new TestPojo(1),
                        new TestPojo(1));
        return source.rebalance()
                .map(
                        // Identity map; present only so that parallelism 2 can be set on it.
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                // Sort by the public POJO field name "value".
                .sortPartition("value", order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                // One concatenated string per partition.
                                StringBuilder sb = new StringBuilder();
                                for (TestPojo value : values) {
                                    sb.append(value.getValue());
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }

    /**
     * Same non-keyed pipeline, but the sort key is extracted with a {@code KeySelector}
     * instead of a field name.
     *
     * @param order ascending or descending sort order
     * @return iterator over one concatenated string per partition
     * @throws Exception if job construction or execution fails
     */
    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<TestPojo> source =
                env.fromData(
                        new TestPojo("KEY", 0),
                        new TestPojo("KEY", 0),
                        new TestPojo("KEY", 3),
                        new TestPojo("KEY", 3),
                        new TestPojo("KEY", 1),
                        new TestPojo("KEY", 1));
        return source.rebalance()
                .map(
                        // Identity map; only carries the parallelism-2 setting.
                        new MapFunction<TestPojo, TestPojo>() {
                            @Override
                            public TestPojo map(TestPojo value) throws Exception {
                                return value;
                            }
                        })
                .setParallelism(2)
                .fullWindowPartition()
                .sortPartition(
                        new KeySelector<TestPojo, Integer>() {
                            @Override
                            public Integer getKey(TestPojo value) throws Exception {
                                return value.getValue();
                            }
                        },
                        order)
                .fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<TestPojo, String>() {
                            @Override
                            public void mapPartition(
                                    Iterable<TestPojo> values, Collector<String> out) {
                                StringBuilder sb = new StringBuilder();
                                for (TestPojo value : values) {
                                    sb.append(value.getValue());
                                }
                                out.collect(sb.toString());
                            }
                        })
                .executeAndCollect();
    }

    // Head of expectInAnyOrder(); its body continues past a diff hunk boundary.
    private void expectInAnyOrder(CloseableIterator<String> resultIterator, String... expected) {
        List<String> listExpected = Lists.newArrayList(expected);
        List<String> testResults = Lists.newArrayList(resultIterator);
    // [diff hunk boundary @@ -172,4 +328,39 @@ — intervening lines not shown in this patch]
            return String.valueOf(testField);
        }
    }

    /** The test pojo. */
    public static class TestPojo {

        // Public mutable fields plus a public no-argument constructor — presumably to
        // satisfy Flink's POJO type requirements; TODO confirm against the serializer docs.
        public String key;

        public Integer value;

        public TestPojo() {}

        // NOTE(review): leaves key == null; fine here because the non-keyed pipelines
        // never read the key.
        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
}
