This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e66279c1b683dabffe233b37066c42cd21450c59
Author: godfreyhe <[email protected]>
AuthorDate: Fri Jun 12 14:14:49 2020 +0800

    [FLINK-18265][fs-connector] Hidden files should be ignored when the filesystem table searches for partitions

    This closes #12628
---
 .../apache/flink/table/utils/PartitionPathUtils.java   | 14 ++++++++++----
 .../table/planner/runtime/FileSystemITCaseBase.scala   | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
index 34e7c48..1ed4494 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
@@ -195,10 +195,7 @@ public class PartitionPathUtils {
 		if (statuses == null) {
 			return null;
 		}
-		return Arrays.stream(statuses).filter(fileStatus -> {
-			String name = fileStatus.getPath().getName();
-			return !name.startsWith("_") && !name.startsWith(".");
-		}).toArray(FileStatus[]::new);
+		return Arrays.stream(statuses).filter(fileStatus -> !isHiddenFile(fileStatus)).toArray(FileStatus[]::new);
 	}
 
 	/**
@@ -213,6 +210,10 @@ public class PartitionPathUtils {
 		FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
 		List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
 		for (FileStatus part : generatedParts) {
+			// ignore hidden file
+			if (isHiddenFile(part)) {
+				continue;
+			}
 			ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
 		}
 		return ret;
@@ -322,4 +323,9 @@ public class PartitionPathUtils {
 			}
 		}
 	}
+
+	private static boolean isHiddenFile(FileStatus fileStatus) {
+		String name = fileStatus.getPath().getName();
+		return name.startsWith("_") || name.startsWith(".");
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 12981e9..28af0e9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime
 
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.Path
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase._
@@ -28,9 +29,12 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult
 import org.apache.flink.types.Row
 
+import org.junit.Assert.assertTrue
 import org.junit.rules.TemporaryFolder
 import org.junit.{Rule, Test}
 
+import java.io.File
+
 import scala.collection.{JavaConverters, Seq}
 
 /**
@@ -188,6 +192,20 @@ trait FileSystemITCaseBase {
   }
 
   @Test
+  def testPartitionWithHiddenFile(): Unit = {
+    execInsertSqlAndWaitResult(tableEnv, "insert into partitionedTable " +
+      "partition(a='1', b='1') select x, y from originalT where a=1 and b=1")
+
+    // create hidden partition dir
+    assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+
+    check(
+      "select x, y from partitionedTable",
+      data_partition_1_1
+    )
+  }
+
+  @Test
   def testNonPartition(): Unit = {
     execInsertSqlAndWaitResult(tableEnv, "insert into nonPartitionedTable " +
       "select x, y, a, b from originalT where a=1 and b=1")
