[
https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22900:
-----------------------------------
Labels: auto-deprioritized-major auto-unassigned (was: auto-unassigned
stale-major)
Priority: Minor (was: Major)
This issue was labeled "stale-major" 7 days ago and has not received any
updates, so it is being deprioritized. If this ticket is actually Major, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Flink 1.11.2 filesystem source table fails to read a multi-partition
> filesystem sink table path
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-22900
> URL: https://issues.apache.org/jira/browse/FLINK-22900
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem
> Affects Versions: 1.11.2
> Environment: 1. The failing code is in FileSystemTableSource:
> {code:java}
> public List<Map<String, String>> getPartitions() {
>     try {
>         return PartitionPathUtils
>                 .searchPartSpecAndPaths(path.getFileSystem(), path, partitionKeys.size())
>                 .stream()
>                 .map(tuple2 -> tuple2.f0)
>                 .map(spec -> {
>                     LinkedHashMap<String, String> ret = new LinkedHashMap<>();
>                     // Replace the default-partition placeholder value with null.
>                     spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? null : v));
>                     return ret;
>                 })
>                 .collect(Collectors.toList());
>     } catch (Exception e) {
>         throw new TableException("Fetch partitions fail.", e);
>     }
> }
>
> {code}
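> For illustration, the normalization in getPartitions above maps the default-partition placeholder back to null. A minimal standalone sketch (assuming the default 'partition.default-name' value "__DEFAULT_PARTITION__"; the sample spec is hypothetical):
> {code:java}
> import java.util.LinkedHashMap;
>
> public class SpecNormalizationSketch {
>     public static void main(String[] args) {
>         // Assumed default of 'partition.default-name' in the filesystem connector.
>         String defaultPartName = "__DEFAULT_PARTITION__";
>         LinkedHashMap<String, String> spec = new LinkedHashMap<>();
>         spec.put("dayno", "20210531");
>         spec.put("hour", defaultPartName);
>
>         // Same transformation as in getPartitions(): the placeholder becomes null.
>         LinkedHashMap<String, String> ret = new LinkedHashMap<>();
>         spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? null : v));
>         System.out.println(ret); // {dayno=20210531, hour=null}
>     }
> }
> {code}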
>
> 2. PartitionPathUtils.searchPartSpecAndPaths:
>
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     // e.g. generatedParts:
>     //   hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
>     //   hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (!isHiddenFile(part)) {
>             // The partition spec is parsed from the key=value path segments.
>             ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
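> Note that isHiddenFile only inspects the last path segment, which is why the child of the hidden staging directory survives the filter. A minimal sketch of that leaf-name-only check (the prefix test mirrors PartitionPathUtils#isHiddenFile as far as I can tell from the 1.11 sources):
> {code:java}
> import org.apache.flink.core.fs.Path;
>
> public class HiddenFileSketch {
>     // Mirrors the leaf-name-only check in PartitionPathUtils#isHiddenFile.
>     static boolean looksHidden(Path path) {
>         String name = path.getName();
>         return name.startsWith("_") || name.startsWith(".");
>     }
>
>     public static void main(String[] args) {
>         // The staging dir itself is hidden, but its child "cp-0" is not,
>         // so the child survives the filter in searchPartSpecAndPaths.
>         System.out.println(looksHidden(new Path("hdfs://xxx-hdfs/merge/all/.staging_1622167234684")));       // true
>         System.out.println(looksHidden(new Path("hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0")));  // false
>         System.out.println(looksHidden(new Path("hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11")));       // false
>     }
> }
> {code}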
>
> 3. isHiddenFile only checks the leaf directory name, so
> .staging_1622167234684/cp-0 passes the filter and the read fails later during
> partition pruning. I suggest also checking that the number of extracted
> partition values matches the number of partition keys, so that only valid
> partition directories are returned:
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     // Recursively collect candidate partition directories, one level per partition key.
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (isHiddenFile(part)) {
>             continue;
>         }
>         LinkedHashMap<String, String> fullPartSpec = extractPartitionSpecFromPath(part.getPath());
>         // Only keep directories that yield a complete partition spec,
>         // e.g. skip .staging_1622167234684/cp-0, which yields no key=value pairs.
>         if (fullPartSpec.size() == partitionNumber) {
>             ret.add(new Tuple2<>(fullPartSpec, part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
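> With this check, a directory that does not yield one key=value pair per partition key is skipped instead of failing later in partition pruning. A simplified sketch of the effect (the key=value parsing below is a stand-in for extractPartitionSpecFromPath, not the real implementation):
> {code:java}
> import java.util.LinkedHashMap;
>
> public class PartitionSpecSizeSketch {
>     // Simplified stand-in: collect key=value segments of a partition path.
>     static LinkedHashMap<String, String> extractSpec(String relativePath) {
>         LinkedHashMap<String, String> spec = new LinkedHashMap<>();
>         for (String segment : relativePath.split("/")) {
>             int eq = segment.indexOf('=');
>             if (eq > 0) {
>                 spec.put(segment.substring(0, eq), segment.substring(eq + 1));
>             }
>         }
>         return spec;
>     }
>
>     public static void main(String[] args) {
>         int partitionNumber = 2; // dayno, hour
>         // {dayno=20210531, hour=11} -> size 2, kept
>         System.out.println(extractSpec("dayno=20210531/hour=11").size() == partitionNumber);
>         // {} -> size 0, skipped instead of failing later with "key not found: hour"
>         System.out.println(extractSpec(".staging_1622167234684/cp-0").size() == partitionNumber);
>     }
> }
> {code}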
> Reporter: bigdataf
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned
> Attachments: image-2021-06-08-19-55-59-174.png
>
>
> eg:
> Create a source table based on the Flink filesystem connector:
> {code:sql}
> create table source_test (
>     id string,
>     name string,
>     dayno string,
>     `hour` string
> ) partitioned by (dayno, `hour`)
> with ('connector' = 'filesystem', 'path' = 'xxxxx/data/');
> {code}
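> Reading this table back fails during planning. A minimal repro sketch under stated assumptions: 'format' = 'csv' is added only because the filesystem connector requires a format option, the path placeholder is kept as above, and the job runs on the Blink batch planner as in the trace below.
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class PartitionReadRepro {
>     public static void main(String[] args) {
>         // Blink batch planner, matching the PlannerBase frames in the stack trace.
>         EnvironmentSettings settings =
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql(
>                 "create table source_test (id string, name string, dayno string, `hour` string) "
>                         + "partitioned by (dayno, `hour`) "
>                         + "with ('connector' = 'filesystem', 'path' = 'xxxxx/data/', 'format' = 'csv')");
>
>         // Planning fails during partition pruning when 'xxxxx/data/' still contains
>         // a .staging_* directory left behind by a filesystem sink:
>         tEnv.executeSql("select * from source_test where `hour` = '11'").print();
>     }
> }
> {code}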
> 1. Stack trace:
> {code:java}
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
> Caused by: java.util.NoSuchElementException: key not found: hour
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:59)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>     at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
>     at com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
>     at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110)
>     at com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48)
>     at com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13)
>     ... 5 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)