[
https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360087#comment-17360087
]
Rui Li commented on FLINK-22900:
--------------------------------
Hive stores partition locations as part of the metadata, so it doesn't perform
partition discovery when reading a partitioned table. Partition discovery is
needed when loading dynamic partitions in Hive. In that case, Hive will check
whether the extracted partition spec is valid (e.g. should be a full spec and
doesn't contain non-partition columns), and errors out if the spec is invalid.
Note that the partition discovery is done under the staging dir which is
created by Hive itself. So it makes sense to assume there shouldn't be invalid
paths.
For FS table source, maybe we can perform similar check, but just skip invalid
paths, instead of throwing an exception. WDYT?
> flink 1.11.2 fileSystem source table read fileSystem sink table path
> multi-partition error
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-22900
> URL: https://issues.apache.org/jira/browse/FLINK-22900
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem, Table SQL / Runtime
> Affects Versions: 1.11.2
> Environment: 1.The error code is in FileSystemTableSource
> {code:java}
> public List<Map<String, String>> getPartitions() {
> try {
> return (List)
> PartitionPathUtils.searchPartSpecAndPaths(this.path.getFileSystem(),
> this.path, this.partitionKeys.size()).stream().map((tuple2) -> {
> return (LinkedHashMap) tuple2.f0;
> }).map((spec) -> {
> LinkedHashMap<String, String> ret = new LinkedHashMap();
> spec.forEach((k, v) ->
> {
> String var10000 = (String) ret.put(k,
> this.defaultPartName.equals(v) ? null : v);
> });
> return ret;
> }).collect(Collectors.toList());
> } catch (Exception var2) {
> throw new TableException("Fetch partitions fail.", var2);
> }
> }
>
> {code}
>
> 2.searchPartSpecAndPaths
>
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>>
> searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {
> FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber,
> fs);
> //eg: generatedParts
> // hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
> // hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
> List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
> FileStatus[] var5 = generatedParts;
> int var6 = generatedParts.length;
> for (int var7 = 0; var7 < var6; ++var7) {
> FileStatus part = var5[var7];
> if (!isHiddenFile(part)) {
> ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()),
> part.getPath()));
> }
> }
> return ret;
> }
> {code}
>
> 3.isHiddenFile reads staging_1622167234684/cp-0 and then an error is
> reported,so I suggest to judge the number of partitions at the same time to
> ensure the availability of the directory
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>>
> searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber)
> {//根据分去字段个数递归获得分区目录
> FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber,
> fs);
> List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
> for (FileStatus part : generatedParts) {
> if (isHiddenFile(part)) {
> continue;
> }
> LinkedHashMap<String, String>,Path > fullPartSpec =
> extractPartitionSpecFromPath(part.getPath());
> if (fullPartSpec.size == partitionNumber) {
> ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()),
> part.getPath()));
> }
> }
> return ret;
> }
> {code}
> Reporter: bigdataf
> Priority: Major
> Attachments: image-2021-06-08-19-55-59-174.png
>
>
> eg:
> Create create table source_test(id string,name string dayno sring,`hour`
> string) partitioned (dayno ,`hour`)
> with('connector'='filesystm',path='xxxxx/data/') based on flink filesystem
> connect
> 1.Stack error
> {code:java}
> //
> ava.lang.reflect.InvocationTargetException at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497) at
> com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
> Caused by: java.util.NoSuchElementException: key not found: hour at
> scala.collection.MapLike$class.default(MapLike.scala:228) at
> scala.collection.AbstractMap.default(Map.scala:59) at
> scala.collection.MapLike$class.apply(MapLike.scala:141) at
> scala.collection.AbstractMap.apply(Map.scala:59) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
> at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
> at
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
> at
> org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:381) at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at
> org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
> at
> com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
> at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110) at
> com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48) at
> com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13) ... 5
> more{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)