[
https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22900:
-----------------------------------
Labels: stale-assigned (was: )
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue is assigned but has not
received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a
comment updating the community on your progress. If this issue is waiting on
feedback, please consider this a reminder to the committer/reviewer. Flink is a
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone
else may work on it. If the "stale-assigned" label is not removed in 7 days, the
issue will be automatically unassigned.
> Flink 1.11.2: filesystem source table reading a filesystem sink table's
> multi-partition path fails
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-22900
> URL: https://issues.apache.org/jira/browse/FLINK-22900
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem
> Affects Versions: 1.11.2
> Environment: 1. The failing code is in FileSystemTableSource#getPartitions:
> {code:java}
> public List<Map<String, String>> getPartitions() {
>     try {
>         return PartitionPathUtils
>                 .searchPartSpecAndPaths(
>                         path.getFileSystem(), path, partitionKeys.size())
>                 .stream()
>                 .map(tuple2 -> tuple2.f0)
>                 .map(spec -> {
>                     LinkedHashMap<String, String> ret = new LinkedHashMap<>();
>                     // Normalize the default partition name back to null.
>                     spec.forEach(
>                             (k, v) -> ret.put(k, defaultPartName.equals(v) ? null : v));
>                     return ret;
>                 })
>                 .collect(Collectors.toList());
>     } catch (Exception e) {
>         throw new TableException("Fetch partitions fail.", e);
>     }
> }
>
> {code}
>
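> For the two example paths shown below, a hedged illustration (values inferred
> from the paths, not taken from a real run) of the specs getPartitions() ends up
> building:
> {code:java}
> import java.util.Arrays;
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
>
> // Illustration only: the spec a proper partition directory yields versus the
> // spec a leftover staging path yields.
> public class PartitionSpecIllustration {
>     public static void main(String[] args) {
>         // hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>         LinkedHashMap<String, String> valid = new LinkedHashMap<>();
>         valid.put("dayno", "20210531");
>         valid.put("hour", "11");
>
>         // hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0 contains no
>         // "key=value" segments, so its extracted spec stays empty.
>         LinkedHashMap<String, String> staging = new LinkedHashMap<>();
>
>         // PartitionPruner later reads every partition key from each spec;
>         // the staging spec has no "hour" entry, hence "key not found: hour".
>         List<Map<String, String>> partitions = Arrays.asList(valid, staging);
>         System.out.println(partitions);
>     }
> }
> {code}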
> 2. searchPartSpecAndPaths then lists the candidate partition directories:
>
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     // e.g. generatedParts contains both:
>     //   hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
>     //   hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (!isHiddenFile(part)) {
>             ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
>
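> For context, getFileStatusRecurse descends exactly partitionNumber levels, and
> it appears not to apply any hidden-file check during the descent; isHiddenFile
> only sees the leaf. A minimal sketch of that listing behavior (my
> simplification, not Flink's actual implementation):
> {code:java}
> import java.io.IOException;
> import java.util.List;
> import org.apache.flink.core.fs.FileStatus;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
>
> // Simplified sketch of the recursive listing. Because nothing is filtered
> // while descending, ".staging_1622167234684" is entered and its child
> // "cp-0" comes back as a partition candidate two levels down.
> class RecursiveListingSketch {
>     static void listAtLevel(FileSystem fs, Path path, int level, List<FileStatus> out)
>             throws IOException {
>         if (level == 0) {
>             out.add(fs.getFileStatus(path));
>             return;
>         }
>         for (FileStatus child : fs.listStatus(path)) {
>             // No isHiddenFile check here; only the final leaves are tested.
>             listAtLevel(fs, child.getPath(), level - 1, out);
>         }
>     }
> }
> {code}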
> 3. isHiddenFile only checks the leaf name, so .staging_1622167234684/cp-0
> survives the listing (its hidden .staging_... parent is never tested) and the
> error above is reported. I suggest also checking the number of extracted
> partition keys, so that only complete partition directories are returned:
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     // Recursively collect candidate partition directories, descending one
>     // level per partition key.
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (isHiddenFile(part)) {
>             continue;
>         }
>         LinkedHashMap<String, String> fullPartSpec =
>                 extractPartitionSpecFromPath(part.getPath());
>         // Keep only directories whose spec covers every partition key; this
>         // drops leftovers such as ".staging_.../cp-0" with a partial spec.
>         if (fullPartSpec.size() == partitionNumber) {
>             ret.add(new Tuple2<>(fullPartSpec, part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
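> The guard works because extractPartitionSpecFromPath only collects "key=value"
> path segments. A simplified sketch of that parsing (an assumption for
> illustration, not Flink's exact code):
> {code:java}
> import java.util.LinkedHashMap;
> import org.apache.flink.core.fs.Path;
>
> class ExtractSpecSketch {
>     // "dayno=20210531/hour=11"      -> {dayno=20210531, hour=11}, size 2
>     // ".staging_1622167234684/cp-0" -> {},                        size 0
>     // so the fullPartSpec.size() == partitionNumber guard drops the latter.
>     static LinkedHashMap<String, String> extractSpec(Path path) {
>         LinkedHashMap<String, String> spec = new LinkedHashMap<>();
>         for (String segment : path.getPath().split(Path.SEPARATOR)) {
>             int eq = segment.indexOf('=');
>             if (eq > 0) {
>                 spec.put(segment.substring(0, eq), segment.substring(eq + 1));
>             }
>         }
>         return spec;
>     }
> }
> {code}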
> Reporter: bigdataf
> Assignee: bigdataf
> Priority: Major
> Labels: stale-assigned
> Attachments: image-2021-06-08-19-55-59-174.png
>
>
> eg: Create a table based on the Flink filesystem connector:
> CREATE TABLE source_test (id STRING, name STRING, dayno STRING, `hour` STRING)
> PARTITIONED BY (dayno, `hour`)
> WITH ('connector' = 'filesystem', 'path' = 'xxxxx/data/')
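> A minimal reproduction sketch under these assumptions: batch mode, a partition
> filter that triggers partition pruning, and a leftover ".staging_*" directory
> from the filesystem sink still present under the path ('format' = 'csv' is my
> addition; the DDL above omits it). Running it produces the trace below.
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class PartitionPruneRepro {
>     public static void main(String[] args) {
>         TableEnvironment tEnv = TableEnvironment.create(
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>         tEnv.executeSql(
>                 "CREATE TABLE source_test (id STRING, name STRING, dayno STRING, `hour` STRING) "
>                         + "PARTITIONED BY (dayno, `hour`) "
>                         + "WITH ('connector' = 'filesystem', 'path' = 'xxxxx/data/', 'format' = 'csv')");
>         // The WHERE clause on partition columns makes the planner call
>         // getPartitions() and prune; the empty spec extracted from the
>         // staging path then fails with "key not found: hour".
>         tEnv.executeSql(
>                 "SELECT * FROM source_test WHERE dayno = '20210531' AND `hour` = '11'").print();
>     }
> }
> {code}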
> 1. Stack trace:
> {code:java}
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
> Caused by: java.util.NoSuchElementException: key not found: hour
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:59)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>     at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
>     at com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
>     at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110)
>     at com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48)
>     at com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13)
>     ... 5 more{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)