This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 860837164c9 [FLINK-36443][table][test] Improve flaky tests related to partition specs (#26335) 860837164c9 is described below commit 860837164c97b3a5a8aa4832edd33f4d20e6f141 Author: Muhammet Orazov <916295+mora...@users.noreply.github.com> AuthorDate: Wed Mar 26 04:38:13 2025 +0100 [FLINK-36443][table][test] Improve flaky tests related to partition specs (#26335) * [FLINK-36443][table][test] Improve flaky partitionable table source tests * Add sorting to partition spec --- .../abilities/source/PartitionPushDownSpec.java | 6 +-- .../flink/table/planner/utils/PartitionUtils.java | 48 +++++++++++++++++++ .../source/PartitionPushDownSpecTest.java | 54 ++++++++++++++++++++++ .../table/planner/utils/testTableSourceSinks.scala | 2 +- 4 files changed, 105 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java index 086975eb62b..29c9f633e47 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.abilities.source; import org.apache.flink.table.api.TableException; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; +import org.apache.flink.table.planner.utils.PartitionUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -30,7 +31,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -74,9 +74,7 @@ public final class PartitionPushDownSpec extends SourceAbilitySpecBase { @Override public String getDigests(SourceAbilityContext context) { - return "partitions=[" - + this.partitions.stream().map(Object::toString).collect(Collectors.joining(", ")) - + "]"; + return "partitions=[" + PartitionUtils.sortPartitionsByKey(this.partitions) + "]"; } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/PartitionUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/PartitionUtils.java new file mode 100644 index 00000000000..0075c36affe --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/PartitionUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Helper functions for partitions. */ +public class PartitionUtils { + + private PartitionUtils() {} + + /** + * Returns partitions sorted by key. + * + * @param partitions list of partition key value pairs + * @return sorted partitions + */ + public static String sortPartitionsByKey(final List<Map<String, String>> partitions) { + return partitions.stream() + .map(PartitionUtils::sortPartitionByKey) + .collect(Collectors.joining(", ")); + } + + private static String sortPartitionByKey(final Map<String, String> partition) { + return partition.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpecTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpecTest.java new file mode 100644 index 00000000000..60a2f3527e6 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpecTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.source; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class PartitionPushDownSpecTest { + + private List<Map<String, String>> partitions; + + @BeforeEach + void beforeEach() { + partitions = + List.of( + Map.of("part2", "A", "part1", "B", "part3", "C"), + Map.of("part1", "C", "part2", "D")); + } + + @Test + void testDigestsEmpty() { + PartitionPushDownSpec spec = new PartitionPushDownSpec(Collections.emptyList()); + assertThat(spec.getDigests(null)).isEqualTo("partitions=[]"); + } + + @Test + void testDigestsSorted() { + PartitionPushDownSpec spec = new PartitionPushDownSpec(partitions); + assertThat(spec.getDigests(null)) + .isEqualTo("partitions=[{part1=B, part2=A, part3=C}, {part1=C, part2=D}]"); + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala index 8987d33355a..134890e7c6e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala @@ -329,7 +329,7 @@ class TestPartitionableTableSource( override def explainSource(): String = { if (remainingPartitions != null) { - s"partitions=${remainingPartitions.mkString(", ")}" + s"partitions=${PartitionUtils.sortPartitionsByKey(remainingPartitions)}" } else { "" }