[ 
https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17359299#comment-17359299
 ] 

bigdataf edited comment on FLINK-22900 at 6/9/21, 1:35 AM:
-----------------------------------------------------------

[~luoyuxia] 

Hi,It means, in the source code, isHiddenFile is as follows:

 
{code:java}
public static List<Tuple2<LinkedHashMap<String, String>, Path>> 
searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {
    FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
fs);
    List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
    FileStatus[] var5 = generatedParts;
    int var6 = generatedParts.length;

    for (int var7 = 0; var7 < var6; ++var7) {
        FileStatus part = var5[var7];
        if (!isHiddenFile(part)) {
            ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
part.getPath()));
        }
    }
    return ret;
}

private static boolean isHiddenFile(FileStatus fileStatus) {
    String name = fileStatus.getPath().getName();
    return name.startsWith("_") || name.startsWith(".");
}
public static LinkedHashMap<String, String> extractPartitionSpecFromPath(Path 
currPath) {
   LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
   List<String[]> kvs = new ArrayList<>();
   do {
      String component = currPath.getName();
      Matcher m = PARTITION_NAME_PATTERN.matcher(component);
      if (m.matches()) {
         String k = unescapePathName(m.group(1));
         String v = unescapePathName(m.group(2));
         String[] kv = new String[2];
         kv[0] = k;
         kv[1] = v;
         kvs.add(kv);
      }
      currPath = currPath.getParent();
   } while (currPath != null && !currPath.getName().isEmpty());

   // reverse the list since we checked the part from leaf dir to table's base 
dir
   for (int i = kvs.size(); i > 0; i--) {
      fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
   }

   return fullPartSpec;
}
{code}
When filtering partitions, it is only filtered according to the last file name, 
and does not consider whether the number of partitions matches the path. The 
number of partition paths matched by extractPartitionSpecFromPath after the 
loop traversal does not match the actual path, and it is not considered at this 
time. An abnormal partition path processing resulted in subsequent errors。

in the following code

 

!image-2021-06-08-19-55-59-174.png!

 
  


was (Author: bigdataf):
[~luoyuxia] 

Hi,It means, in the source code, isHiddenFile is as follows:

 
{code:java}
public static List<Tuple2<LinkedHashMap<String, String>, Path>> 
searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {
    FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
fs);
    List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
    FileStatus[] var5 = generatedParts;
    int var6 = generatedParts.length;

    for (int var7 = 0; var7 < var6; ++var7) {
        FileStatus part = var5[var7];
        if (!isHiddenFile(part)) {
            ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
part.getPath()));
        }
    }
    return ret;
}

private static boolean isHiddenFile(FileStatus fileStatus) {
    String name = fileStatus.getPath().getName();
    return name.startsWith("_") || name.startsWith(".");
}
public static LinkedHashMap<String, String> extractPartitionSpecFromPath(Path 
currPath) {
   LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
   List<String[]> kvs = new ArrayList<>();
   do {
      String component = currPath.getName();
      Matcher m = PARTITION_NAME_PATTERN.matcher(component);
      if (m.matches()) {
         String k = unescapePathName(m.group(1));
         String v = unescapePathName(m.group(2));
         String[] kv = new String[2];
         kv[0] = k;
         kv[1] = v;
         kvs.add(kv);
      }
      currPath = currPath.getParent();
   } while (currPath != null && !currPath.getName().isEmpty());

   // reverse the list since we checked the part from leaf dir to table's base 
dir
   for (int i = kvs.size(); i > 0; i--) {
      fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
   }

   return fullPartSpec;
}
{code}
It means, in the source code, isHidden File is as follows: When filtering 
partitions, it is only filtered according to the last file name, and does not 
consider whether the number of partitions matches the path. The number of 
partition paths matched by extractPartitionSpecFromPath after the loop 
traversal does not match the actual path, and it is not considered at this 
time. An abnormal partition path processing resulted in subsequent errors。

in the following code

 

!image-2021-06-08-19-55-59-174.png!

 
 

>  flink 1.11.2 fileSystem source table read fileSystem sink table path 
> multi-partition error
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22900
>                 URL: https://issues.apache.org/jira/browse/FLINK-22900
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem, Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: 1.The error code is in FileSystemTableSource
> {code:java}
> public List<Map<String, String>> getPartitions() {
>     try {
>         return (List) 
> PartitionPathUtils.searchPartSpecAndPaths(this.path.getFileSystem(), 
> this.path, this.partitionKeys.size()).stream().map((tuple2) -> {
>             return (LinkedHashMap) tuple2.f0;
>         }).map((spec) -> {
>             LinkedHashMap<String, String> ret = new LinkedHashMap();
>             spec.forEach((k, v) ->
>             {
>                 String var10000 = (String) ret.put(k, 
> this.defaultPartName.equals(v) ? null : v);
>             });
>             return ret;
>         }).collect(Collectors.toList());
>     } catch (Exception var2) {
>         throw new TableException("Fetch partitions fail.", var2);
>     }
> }
>  
> {code}
>  
>   2.searchPartSpecAndPaths
>   
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> 
> searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
> fs);
>     //eg: generatedParts 
>     // hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
>     // hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
>     FileStatus[] var5 = generatedParts;
>     int var6 = generatedParts.length;
>     for (int var7 = 0; var7 < var6; ++var7) {
>         FileStatus part = var5[var7];
>         if (!isHiddenFile(part)) {
>             ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
> part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
>  
> 3.isHiddenFile reads staging_1622167234684/cp-0 and then an error is 
> reported,so I suggest to judge the number of partitions at the same time to 
> ensure the availability of the directory
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> 
> searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) 
> {//根据分去字段个数递归获得分区目录
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
> fs);
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
>     for (FileStatus part : generatedParts) {
>         if (isHiddenFile(part)) {
>             continue;
>         }
>         LinkedHashMap<String, String>,Path > fullPartSpec = 
> extractPartitionSpecFromPath(part.getPath());
>         if (fullPartSpec.size == partitionNumber) {
>             ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
> part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
>            Reporter: bigdataf
>            Priority: Major
>         Attachments: image-2021-06-08-19-55-59-174.png
>
>
> eg:
>  Create create table source_test(id string,name string dayno sring,`hour` 
> string) partitioned (dayno ,`hour`) 
> with('connector'='filesystm',path='xxxxx/data/') based on flink filesystem 
> connect
>  1.Stack error
> {code:java}
> // 
> ava.lang.reflect.InvocationTargetException at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:497) at 
> com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66) 
> Caused by: java.util.NoSuchElementException: key not found: hour at 
> scala.collection.MapLike$class.default(MapLike.scala:228) at 
> scala.collection.AbstractMap.default(Map.scala:59) at 
> scala.collection.MapLike$class.apply(MapLike.scala:141) at 
> scala.collection.AbstractMap.apply(Map.scala:59) at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
>  at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
>  at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
>  at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
>  at 
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
>  at 
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
>  at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>  at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>  at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>  at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>  at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>  at 
> org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
>  at 
> com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
>  at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110) at 
> com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48) at 
> com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13) ... 5 
> more{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to