[
https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17359299#comment-17359299
]
bigdataf edited comment on FLINK-22900 at 6/9/21, 1:35 AM:
-----------------------------------------------------------
[~luoyuxia]
Hi, what I mean is that in the source code, isHiddenFile is implemented as follows:
{code:java}
public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
        FileSystem fs, Path path, int partitionNumber) {
    FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
    List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
    for (FileStatus part : generatedParts) {
        // only the name of the leaf file/directory is checked here
        if (!isHiddenFile(part)) {
            ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
        }
    }
    return ret;
}

private static boolean isHiddenFile(FileStatus fileStatus) {
    String name = fileStatus.getPath().getName();
    return name.startsWith("_") || name.startsWith(".");
}

public static LinkedHashMap<String, String> extractPartitionSpecFromPath(Path currPath) {
    LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
    List<String[]> kvs = new ArrayList<>();
    do {
        String component = currPath.getName();
        Matcher m = PARTITION_NAME_PATTERN.matcher(component);
        if (m.matches()) {
            String k = unescapePathName(m.group(1));
            String v = unescapePathName(m.group(2));
            kvs.add(new String[] {k, v});
        }
        currPath = currPath.getParent();
    } while (currPath != null && !currPath.getName().isEmpty());
    // reverse the list since we checked the part from leaf dir to table's base dir
    for (int i = kvs.size(); i > 0; i--) {
        fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
    }
    return fullPartSpec;
}
{code}
When filtering partitions, isHiddenFile only checks the name of the last path component; it never verifies that the number of key=value pairs extracted from the path matches the number of partition keys. After the loop in extractPartitionSpecFromPath, the extracted partition spec can therefore be smaller than the expected number of partition keys, and this abnormal partition path is not handled, which leads to the subsequent errors in the following code:
!image-2021-06-08-19-55-59-174.png!
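To make this concrete, here is a minimal standalone sketch (the class and method names are hypothetical, the regex only mimics Flink's PARTITION_NAME_PATTERN, and it walks the path left to right instead of leaf to base) showing that the leftover staging path yields an empty partition spec while the table expects the keys dayno and hour, which is exactly what later fails with "key not found: hour":
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionSpecDemo {

    // Roughly the same idea as Flink's PARTITION_NAME_PATTERN: a directory named key=value.
    private static final Pattern PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)");

    // Mimics extractPartitionSpecFromPath: collect every key=value component of the path
    // and ignore everything else.
    static LinkedHashMap<String, String> extractSpec(String path) {
        List<String[]> kvs = new ArrayList<>();
        for (String component : path.split("/")) {
            Matcher m = PARTITION_NAME_PATTERN.matcher(component);
            if (m.matches()) {
                kvs.add(new String[] {m.group(1), m.group(2)});
            }
        }
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        for (String[] kv : kvs) {
            spec.put(kv[0], kv[1]);
        }
        return spec;
    }

    public static void main(String[] args) {
        // A real partition directory: both keys are recovered.
        System.out.println(extractSpec("merge/all/dayno=20210531/hour=11"));
        // -> {dayno=20210531, hour=11}

        // The leftover staging directory: no component matches key=value, so the spec is
        // empty although the table has two partition keys; looking up "hour" later fails.
        System.out.println(extractSpec("merge/all/.staging_1622167234684/cp-0"));
        // -> {}
    }
}
{code}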
> flink 1.11.2 fileSystem source table read fileSystem sink table path
> multi-partition error
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-22900
> URL: https://issues.apache.org/jira/browse/FLINK-22900
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem, Table SQL / Runtime
> Affects Versions: 1.11.2
> Environment: 1. The code that triggers the error is in FileSystemTableSource:
> {code:java}
> public List<Map<String, String>> getPartitions() {
>     try {
>         return PartitionPathUtils.searchPartSpecAndPaths(
>                         this.path.getFileSystem(), this.path, this.partitionKeys.size())
>                 .stream()
>                 .map(tuple2 -> tuple2.f0)
>                 .map(spec -> {
>                     LinkedHashMap<String, String> ret = new LinkedHashMap<>();
>                     spec.forEach((k, v) -> ret.put(k, this.defaultPartName.equals(v) ? null : v));
>                     return ret;
>                 })
>                 .collect(Collectors.toList());
>     } catch (Exception e) {
>         throw new TableException("Fetch partitions fail.", e);
>     }
> }
> {code}
>
> 2.searchPartSpecAndPaths
>
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     // eg: generatedParts
>     //   hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
>     //   hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (!isHiddenFile(part)) {
>             ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
>
> 3. isHiddenFile only checks the leaf name (cp-0 here), so the path under .staging_1622167234684 is not filtered out and an error is reported later. I therefore suggest also checking the number of extracted partition keys, to make sure the directory really is a partition path:
> {code:java}
> // recursively collect the partition directories according to the number of partition keys
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (isHiddenFile(part)) {
>             continue;
>         }
>         LinkedHashMap<String, String> fullPartSpec = extractPartitionSpecFromPath(part.getPath());
>         // only keep paths whose extracted spec matches the expected number of partition keys
>         if (fullPartSpec.size() == partitionNumber) {
>             ret.add(new Tuple2<>(fullPartSpec, part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
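> A quick way to check the effect of this guard (a sketch only, not a Flink test; it assumes PartitionPathUtils from org.apache.flink.table.utils and Path from flink-core are on the classpath, and reuses the example paths from above):
> {code:java}
> import java.util.LinkedHashMap;
> 
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.table.utils.PartitionPathUtils;
> 
> public class SizeGuardDemo {
>     public static void main(String[] args) {
>         int partitionNumber = 2; // the table is partitioned by dayno and `hour`
> 
>         LinkedHashMap<String, String> good = PartitionPathUtils.extractPartitionSpecFromPath(
>                 new Path("hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11"));
>         LinkedHashMap<String, String> staging = PartitionPathUtils.extractPartitionSpecFromPath(
>                 new Path("hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0"));
> 
>         // Regular partition directory: two key=value components are extracted -> kept.
>         System.out.println(good + " kept=" + (good.size() == partitionNumber));
>         // Leftover staging directory: no key=value component -> size 0 -> skipped by the guard.
>         System.out.println(staging + " kept=" + (staging.size() == partitionNumber));
>     }
> }
> {code}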
> Reporter: bigdataf
> Priority: Major
> Attachments: image-2021-06-08-19-55-59-174.png
>
>
> eg:
> Create a source table based on the Flink filesystem connector:
> create table source_test (id string, name string, dayno string, `hour` string)
> partitioned by (dayno, `hour`)
> with ('connector' = 'filesystem', 'path' = 'xxxxx/data/')
> 1. Error stack trace
> {code:java}
> java.lang.reflect.InvocationTargetException at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497) at
> com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
> Caused by: java.util.NoSuchElementException: key not found: hour at
> scala.collection.MapLike$class.default(MapLike.scala:228) at
> scala.collection.AbstractMap.default(Map.scala:59) at
> scala.collection.MapLike$class.apply(MapLike.scala:141) at
> scala.collection.AbstractMap.apply(Map.scala:59) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
> at
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
> at
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:381) at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at
> org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
> at
> com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
> at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110) at
> com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48) at
> com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13) ... 5
> more{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)