This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e66279c1b683dabffe233b37066c42cd21450c59
Author: godfreyhe <[email protected]>
AuthorDate: Fri Jun 12 14:14:49 2020 +0800

    [FLINK-18265][fs-connector] Hidden files should be ignored when the 
filesystem table searches for partitions
    
    This closes #12628
---
 .../apache/flink/table/utils/PartitionPathUtils.java   | 14 ++++++++++----
 .../table/planner/runtime/FileSystemITCaseBase.scala   | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
index 34e7c48..1ed4494 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PartitionPathUtils.java
@@ -195,10 +195,7 @@ public class PartitionPathUtils {
                if (statuses == null) {
                        return null;
                }
-               return Arrays.stream(statuses).filter(fileStatus -> {
-                       String name = fileStatus.getPath().getName();
-                       return !name.startsWith("_") && !name.startsWith(".");
-               }).toArray(FileStatus[]::new);
+               return Arrays.stream(statuses).filter(fileStatus -> 
!isHiddenFile(fileStatus)).toArray(FileStatus[]::new);
        }
 
        /**
@@ -213,6 +210,10 @@ public class PartitionPathUtils {
                FileStatus[] generatedParts = getFileStatusRecurse(path, 
partitionNumber, fs);
                List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new 
ArrayList<>();
                for (FileStatus part : generatedParts) {
+                       // ignore hidden file
+                       if (isHiddenFile(part)) {
+                               continue;
+                       }
                        ret.add(new 
Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
                }
                return ret;
@@ -322,4 +323,9 @@ public class PartitionPathUtils {
                        }
                }
        }
+
+       private static boolean isHiddenFile(FileStatus fileStatus) {
+               String name = fileStatus.getPath().getName();
+               return name.startsWith("_") || name.startsWith(".");
+       }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 12981e9..28af0e9 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime
 
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.Path
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase._
@@ -28,9 +29,12 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import 
org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult
 import org.apache.flink.types.Row
 
+import org.junit.Assert.assertTrue
 import org.junit.rules.TemporaryFolder
 import org.junit.{Rule, Test}
 
+import java.io.File
+
 import scala.collection.{JavaConverters, Seq}
 
 /**
@@ -188,6 +192,20 @@ trait FileSystemITCaseBase {
   }
 
   @Test
+  def testPartitionWithHiddenFile(): Unit = {
+    execInsertSqlAndWaitResult(tableEnv, "insert into partitionedTable " +
+      "partition(a='1', b='1') select x, y from originalT where a=1 and b=1")
+
+    // create hidden partition dir
+    assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+
+    check(
+      "select x, y from partitionedTable",
+      data_partition_1_1
+    )
+  }
+
+  @Test
   def testNonPartition(): Unit = {
     execInsertSqlAndWaitResult(tableEnv, "insert into nonPartitionedTable " +
         "select x, y, a, b from originalT where a=1 and b=1")

Reply via email to