[
https://issues.apache.org/jira/browse/HIVE-17225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110187#comment-16110187
]
Sahil Takiar commented on HIVE-17225:
-------------------------------------
Here is another example of when this can happen. Say there are three tables:
pt1, pt2, and r1. pt1 and pt2 are partitioned; r1 is not. In terms of data
size, pt1 < r1 < pt2. If map-joins are enabled and all three tables are joined,
the following scenario may occur: pt1 and r1 are each scanned and written to a
hash table, and pt2 is treated as the big table in the map-join. The hash
tables for pt1 and r1 are generated in the same Spark job. If DPP is enabled,
the scan for r1 produces a pruning sink targeting the scan for pt1. This causes
the FileNotFoundException shown in the issue description, because the scans for
r1 and pt1 run in parallel, so the scan for pt1 can start before the pruning
sink has written its output.
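
For concreteness, the scenario could be set up roughly as follows. This is
only a sketch: the column names and join keys are assumptions (only the table
names come from the description above), the tables would still need to be
populated so that pt1 < r1 < pt2 in data size, and this exact query has not
been verified to reproduce the failure.

{code:sql}
SET hive.spark.dynamic.partition.pruning=true;
SET hive.auto.convert.join=true;
-- Hypothetical schemas; only the table names are taken from the scenario.
CREATE TABLE pt1 (col int) PARTITIONED BY (part_col int);
CREATE TABLE pt2 (col int) PARTITIONED BY (part_col int);
CREATE TABLE r1 (col int);
-- Joining r1 to pt1 on pt1's partition column is what would let DPP attach
-- a pruning sink to the r1 scan that targets the pt1 scan.
SELECT *
FROM pt2
JOIN r1 ON pt2.col = r1.col
JOIN pt1 ON pt1.part_col = r1.col;
{code}

If the planner picks pt2 as the big table, the hash table builds for pt1 and
r1 would land in the same Spark job, which is the condition described above.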
> HoS DPP pruning sink ops can target parallel work objects
> ---------------------------------------------------------
>
> Key: HIVE-17225
> URL: https://issues.apache.org/jira/browse/HIVE-17225
> Project: Hive
> Issue Type: Sub-task
> Components: Spark
> Affects Versions: 3.0.0
> Reporter: Sahil Takiar
> Assignee: Sahil Takiar
>
> Setup:
> {code:sql}
> SET hive.spark.dynamic.partition.pruning=true;
> SET hive.strict.checks.cartesian.product=false;
> SET hive.auto.convert.join=true;
> CREATE TABLE partitioned_table1 (col int) PARTITIONED BY (part_col int);
> CREATE TABLE regular_table1 (col1 int, col2 int);
> CREATE TABLE regular_table2 (col1 int, col2 int);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 1);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 2);
> ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 3);
> INSERT INTO table regular_table1 VALUES (0, 0), (1, 1), (2, 2);
> INSERT INTO table regular_table2 VALUES (0, 0), (1, 1), (2, 2);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 1) VALUES (1),
> (2), (3);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 2) VALUES (1),
> (2), (3);
> INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 3) VALUES (1),
> (2), (3);
> SELECT *
> FROM regular_table1,
> regular_table2,
> partitioned_table1
> WHERE partitioned_table1.part_col IN (SELECT regular_table1.col2
> FROM regular_table1
> WHERE regular_table1.col1 > 0)
> AND partitioned_table1.part_col IN (SELECT regular_table2.col2
> FROM regular_table2
> WHERE regular_table2.col1 > 1);
> {code}
> Exception:
> {code}
> 2017-08-01T13:27:47,483 ERROR [b0d354a8-4cdb-4ba9-acec-27d14926aaf4 main]
> ql.Driver: FAILED: Execution Error, return code 3 from
> org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException:
> org.apache.hadoop.hive.ql.metadata.HiveException:
> java.io.FileNotFoundException: File
> file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5
> does not exist
> at
> org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:408)
> at
> org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
> at
> org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:127)
> at
> org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:125)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at
> org.apache.spark.rdd.AsyncRDDActions.foreachAsync(AsyncRDDActions.scala:125)
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreachAsync(JavaRDDLike.scala:731)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachAsync(JavaRDDLike.scala:45)
> at
> org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:351)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358)
> at
> org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
> java.io.FileNotFoundException: File
> file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5
> does not exist
> at
> org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147)
> at
> org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76)
> at
> org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:406)
> ... 62 more
> Caused by: java.io.FileNotFoundException: File
> file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5
> does not exist
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
> at
> org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:119)
> ... 64 more
> {code}
> The explain plan for the query is:
> {code}
> STAGE DEPENDENCIES:
> Stage-2 is a root stage
> Stage-1 depends on stages: Stage-2
> Stage-0 depends on stages: Stage-1
> STAGE PLANS:
> Stage: Stage-2
> Spark
> Edges:
> Reducer 5 <- Map 4 (GROUP, 2)
> DagName: stakiar_20170801191847_f8b1a6fa-a4b6-47b7-977e-95ffb63265f4:39
> Vertices:
> Map 2
> Map Operator Tree:
> TableScan
> alias: regular_table2
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col1 (type: int), col2 (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 3 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0
> 1
> 2
> Local Work:
> Map Reduce Local Work
> Map 3
> Map Operator Tree:
> TableScan
> alias: partitioned_table1
> Statistics: Num rows: 9 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col (type: int), part_col (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 9 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0
> 1
> 2
> Local Work:
> Map Reduce Local Work
> Map 4
> Map Operator Tree:
> TableScan
> alias: regular_table1
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Filter Operator
> predicate: ((col1 > 0) and col2 is not null) (type:
> boolean)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Select Operator
> expressions: col2 (type: int)
> outputColumnNames: col2
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Group By Operator
> keys: col2 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Map 6
> Map Operator Tree:
> TableScan
> alias: regular_table2
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Filter Operator
> predicate: ((col1 > 1) and col2 is not null) (type:
> boolean)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Select Operator
> expressions: col2 (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Group By Operator
> keys: _col0 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Select Operator
> expressions: _col0 (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Group By Operator
> keys: _col0 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Spark Partition Pruning Sink Operator
> partition key expr: part_col
> Statistics: Num rows: 1 Data size: 3 Basic
> stats: COMPLETE Column stats: NONE
> target column name: part_col
> target work: Map 3
> Local Work:
> Map Reduce Local Work
> Reducer 5
> Local Work:
> Map Reduce Local Work
> Reduce Operator Tree:
> Group By Operator
> keys: KEY._col0 (type: int)
> mode: mergepartial
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE
> Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Select Operator
> expressions: _col0 (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE
> Column stats: NONE
> Group By Operator
> keys: _col0 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Spark Partition Pruning Sink Operator
> partition key expr: part_col
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> target column name: part_col
> target work: Map 3
> Stage: Stage-1
> Spark
> DagName: stakiar_20170801191847_f8b1a6fa-a4b6-47b7-977e-95ffb63265f4:38
> Vertices:
> Map 1
> Map Operator Tree:
> TableScan
> alias: regular_table1
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col1 (type: int), col2 (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 3 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Map Join Operator
> condition map:
> Inner Join 0 to 1
> Inner Join 0 to 2
> keys:
> 0
> 1
> 2
> outputColumnNames: _col0, _col1, _col2, _col3, _col4,
> _col5
> input vertices:
> 1 Map 2
> 2 Map 3
> Statistics: Num rows: 81 Data size: 648 Basic stats:
> COMPLETE Column stats: NONE
> Map Join Operator
> condition map:
> Inner Join 0 to 1
> Left Semi Join 0 to 2
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> outputColumnNames: _col0, _col1, _col2, _col3, _col4,
> _col5
> input vertices:
> 1 Reducer 5
> 2 Map 6
> Statistics: Num rows: 178 Data size: 1425 Basic
> stats: COMPLETE Column stats: NONE
> File Output Operator
> compressed: false
> Statistics: Num rows: 178 Data size: 1425 Basic
> stats: COMPLETE Column stats: NONE
> table:
> input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> serde:
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> Local Work:
> Map Reduce Local Work
> Stage: Stage-0
> Fetch Operator
> limit: -1
> Processor Tree:
> ListSink
> {code}
> The explain plan with DPP disabled is:
> {code}
> STAGE DEPENDENCIES:
> Stage-2 is a root stage
> Stage-1 depends on stages: Stage-2
> Stage-0 depends on stages: Stage-1
> STAGE PLANS:
> Stage: Stage-2
> Spark
> Edges:
> Reducer 5 <- Map 4 (GROUP, 2)
> #### A masked pattern was here ####
> Vertices:
> Map 2
> Map Operator Tree:
> TableScan
> alias: regular_table2
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col1 (type: int), col2 (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 3 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0
> 1
> 2
> Local Work:
> Map Reduce Local Work
> Map 3
> Map Operator Tree:
> TableScan
> alias: partitioned_table1
> Statistics: Num rows: 9 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col (type: int), part_col (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 9 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0
> 1
> 2
> Local Work:
> Map Reduce Local Work
> Map 4
> Map Operator Tree:
> TableScan
> alias: regular_table1
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Filter Operator
> predicate: ((col1 > 0) and col2 is not null) (type:
> boolean)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Select Operator
> expressions: col2 (type: int)
> outputColumnNames: col2
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Group By Operator
> keys: col2 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Map 6
> Map Operator Tree:
> TableScan
> alias: regular_table2
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Filter Operator
> predicate: ((col1 > 1) and col2 is not null) (type:
> boolean)
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Select Operator
> expressions: col2 (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Group By Operator
> keys: _col0 (type: int)
> mode: hash
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats:
> COMPLETE Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Local Work:
> Map Reduce Local Work
> Reducer 5
> Local Work:
> Map Reduce Local Work
> Reduce Operator Tree:
> Group By Operator
> keys: KEY._col0 (type: int)
> mode: mergepartial
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE
> Column stats: NONE
> Spark HashTable Sink Operator
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> Stage: Stage-1
> Spark
> #### A masked pattern was here ####
> Vertices:
> Map 1
> Map Operator Tree:
> TableScan
> alias: regular_table1
> Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE
> Column stats: NONE
> Select Operator
> expressions: col1 (type: int), col2 (type: int)
> outputColumnNames: _col0, _col1
> Statistics: Num rows: 3 Data size: 9 Basic stats:
> COMPLETE Column stats: NONE
> Map Join Operator
> condition map:
> Inner Join 0 to 1
> Inner Join 0 to 2
> keys:
> 0
> 1
> 2
> outputColumnNames: _col0, _col1, _col2, _col3, _col4,
> _col5
> input vertices:
> 1 Map 2
> 2 Map 3
> Statistics: Num rows: 81 Data size: 648 Basic stats:
> COMPLETE Column stats: NONE
> Map Join Operator
> condition map:
> Inner Join 0 to 1
> Left Semi Join 0 to 2
> keys:
> 0 _col5 (type: int)
> 1 _col0 (type: int)
> 2 _col0 (type: int)
> outputColumnNames: _col0, _col1, _col2, _col3, _col4,
> _col5
> input vertices:
> 1 Reducer 5
> 2 Map 6
> Statistics: Num rows: 178 Data size: 1425 Basic
> stats: COMPLETE Column stats: NONE
> File Output Operator
> compressed: false
> Statistics: Num rows: 178 Data size: 1425 Basic
> stats: COMPLETE Column stats: NONE
> table:
> input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> serde:
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> Local Work:
> Map Reduce Local Work
> Stage: Stage-0
> Fetch Operator
> limit: -1
> Processor Tree:
> ListSink
> {code}