[
https://issues.apache.org/jira/browse/SPARK-29906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974836#comment-16974836
]
koert kuipers commented on SPARK-29906:
---------------------------------------
i added a bit of debug logging:
{code:java}
$ git diff
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 375cec5971..7e5b7fb235 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -86,7 +86,7 @@ object CSVDataSource extends Logging {
}
}
-object TextInputCSVDataSource extends CSVDataSource {
+object TextInputCSVDataSource extends CSVDataSource with Logging {
override val isSplitable: Boolean = true
override def readFile(
@@ -110,9 +110,13 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType = {
+ logInfo(s"!!!!!!!!!! inputPaths ${inputPaths}")
val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv,
parsedOptions).take(1).headOption
- inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions)
+ logInfo(s"!!!!!!!!!! maybeFirstLine ${maybeFirstLine}")
+ val schema = inferFromDataset(sparkSession, csv, maybeFirstLine,
parsedOptions)
+ logInfo(s"!!!!!!!!!! schema ${schema}")
+ schema
}
{code}
and this shows when spark.sql.adaptive.enabled=true:
{code:java}
19/11/15 05:52:06 INFO csv.TextInputCSVDataSource: !!!!!!!!!! inputPaths
List(LocatedFileStatus{path=hdfs://ip-xx-xxx-x-xxx.ec2.internal:8020/user/hadoop/OP_DTL_GNRL_PGYR2013_P06282019.csv;
isDirectory=false; length=2242114396; replication=3; blocksize=134217728;
modification_time=1573794115499; access_time=1573794109887; owner=hadoop;
group=hadoop; permission=rw-r--r--; isSymlink=false})
19/11/15 05:52:10 INFO csv.TextInputCSVDataSource: !!!!!!!!!! maybeFirstLine
Some("UNCHANGED","Covered Recipient
Physician",,,,"195068","SCOTT","KEVIN","FORMAN",,"360 SAN MIGUEL DR","SUITE
701","NEWPORT BEACH","CA","92660-7853","United States",,,"Medical
Doctor","Allopathic & Osteopathic Physicians|Orthopaedic
Surgery","CA",,,,,"Wright Medical Technology, Inc.","100000011065","Wright
Medical Technology, Inc.","TN","United States",12.50,"08/20/2013","1","In-kind
items and services","Food and Beverage",,,,"No","No Third Party
Payment",,"No",,,"No","105165962","No","Covered","Foot and
Ankle",,,,,,,,,,,,,,,"2013","06/28/2019")
19/11/15 05:52:10 INFO csv.TextInputCSVDataSource: !!!!!!!!!! schema
StructType(StructField(UNCHANGED,StringType,true), StructField(Covered
Recipient Physician,StringType,true), StructField(_c2,StringType,true),
StructField(_c3,StringType,true), StructField(_c4,StringType,true),
StructField(195068,StringType,true), StructField(SCOTT,StringType,true),
StructField(KEVIN,StringType,true), StructField(FORMAN,StringType,true),
StructField(_c9,StringType,true), StructField(360 SAN MIGUEL
DR,StringType,true), StructField(SUITE 701,StringType,true),
StructField(NEWPORT BEACH,StringType,true), StructField(CA13,StringType,true),
StructField(92660-7853,StringType,true), StructField(United
States15,StringType,true), StructField(_c16,StringType,true),
StructField(_c17,StringType,true), StructField(Medical Doctor,StringType,true),
StructField(Allopathic & Osteopathic Physicians|Orthopaedic
Surgery,StringType,true), StructField(CA20,StringType,true),
StructField(_c21,StringType,true), StructField(_c22,StringType,true),
StructField(_c23,StringType,true), StructField(_c24,StringType,true),
StructField(Wright Medical Technology, Inc.25,StringType,true),
StructField(100000011065,StringType,true), StructField(Wright Medical
Technology, Inc.27,StringType,true), StructField(TN,StringType,true),
StructField(United States29,StringType,true),
StructField(12.50,StringType,true), StructField(08/20/2013,StringType,true),
StructField(1,StringType,true), StructField(In-kind items and
services,StringType,true), StructField(Food and Beverage,StringType,true),
StructField(_c35,StringType,true), StructField(_c36,StringType,true),
StructField(_c37,StringType,true), StructField(No38,StringType,true),
StructField(No Third Party Payment,StringType,true),
StructField(_c40,StringType,true), StructField(No41,StringType,true),
StructField(_c42,StringType,true), StructField(_c43,StringType,true),
StructField(No44,StringType,true), StructField(105165962,StringType,true),
StructField(No46,StringType,true), StructField(Covered,StringType,true),
StructField(Foot and Ankle,StringType,true), StructField(_c49,StringType,true),
StructField(_c50,StringType,true), StructField(_c51,StringType,true),
StructField(_c52,StringType,true), StructField(_c53,StringType,true),
StructField(_c54,StringType,true), StructField(_c55,StringType,true),
StructField(_c56,StringType,true), StructField(_c57,StringType,true),
StructField(_c58,StringType,true), StructField(_c59,StringType,true),
StructField(_c60,StringType,true), StructField(_c61,StringType,true),
StructField(_c62,StringType,true), StructField(2013,StringType,true),
StructField(06/28/2019,StringType,true))
{code}
so seems to me creating the Dataset[String] and taking first element to get
header is not working somehow. this is in:
{code:java}
val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv,
parsedOptions).take(1).headOption
{code}
comparing the logs i see when adaptive execution is enabled:
{code:java}
INFO scheduler.DAGScheduler: Final stage: ResultStage 5 (load at <console>:24)
19/11/15 05:52:06 INFO scheduler.DAGScheduler: Parents of final stage:
List(ShuffleMapStage 4)
19/11/15 05:52:06 INFO scheduler.DAGScheduler: Missing parents:
List(ShuffleMapStage 4)
19/11/15 05:52:06 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 4
(MapPartitionsRDD[26] at load at <console>:24), which has no missing parents
19/11/15 05:52:06
{code}
and when adaptive execution is disabled:
{code:java}
INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (load at <console>:24)
19/11/15 06:04:51 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/11/15 06:04:51 INFO scheduler.DAGScheduler: Missing parents: List()
19/11/15 06:04:51 INFO scheduler.DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[3] at load at <console>:24), which has no missing parents
19/11/15 06:04:51
{code}
could adaptive execution somehow be inserting a shuffle that messes up getting
the first line (the header) of the dataset?
> Reading of csv file fails with adaptive execution turned on
> -----------------------------------------------------------
>
> Key: SPARK-29906
> URL: https://issues.apache.org/jira/browse/SPARK-29906
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Environment: build from master today nov 14
> commit fca0a6c394990b86304a8f9a64bf4c7ec58abbd6 (HEAD -> master,
> upstream/master, upstream/HEAD)
> Author: Kevin Yu <[email protected]>
> Date: Thu Nov 14 14:58:32 2019 -0600
> build using:
> $ dev/make-distribution.sh --tgz -Phadoop-2.7 -Dhadoop.version=2.7.4 -Pyarn
> deployed on AWS EMR 5.28 with 10 m5.xlarge slaves
> in spark-env.sh:
> HADOOP_CONF_DIR=/etc/hadoop/conf
> in spark-defaults.conf:
> spark.master yarn
> spark.submit.deployMode client
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.hadoop.yarn.timeline-service.enabled false
> spark.driver.extraClassPath /usr/lib/hadoop-lzo/lib/hadoop-lzo.jar
> spark.driver.extraLibraryPath
> /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
> spark.executor.extraClassPath /usr/lib/hadoop-lzo/lib/hadoop-lzo.jar
> spark.executor.extraLibraryPath
> /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
> Reporter: koert kuipers
> Priority: Major
>
> we observed an issue where spark seems to confuse a data line (not the first
> line of the csv file) for the csv header when it creates the schema.
> {code}
> $ wget http://download.cms.gov/openpayments/PGYR13_P062819.ZIP
> $ unzip PGYR13_P062819.ZIP
> $ hadoop fs -put OP_DTL_GNRL_PGYR2013_P06282019.csv
> $ spark-3.0.0-SNAPSHOT-bin-2.7.4/bin/spark-shell --conf
> spark.sql.adaptive.enabled=true --num-executors 10
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> 19/11/15 00:26:47 WARN yarn.Client: Neither spark.yarn.jars nor
> spark.yarn.archive is set, falling back to uploading libraries under
> SPARK_HOME.
> Spark context Web UI available at http://ip-xx-xxx-x-xxx.ec2.internal:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1573772077642_0006).
> Spark session available as 'spark'.
> Welcome to
> ____ __
> / __/__ ___ _____/ /__
> _\ \/ _ \/ _ `/ __/ '_/
> /___/ .__/\_,_/_/ /_/\_\ version 3.0.0-SNAPSHOT
> /_/
>
> Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> spark.read.format("csv").option("header",
> true).option("enforceSchema",
> false).load("OP_DTL_GNRL_PGYR2013_P06282019.csv").show(1)
> 19/11/15 00:27:10 WARN util.package: Truncated the string representation of a
> plan since it was too large. This behavior can be adjusted by setting
> 'spark.sql.debug.maxToStringFields'.
> [Stage 2:> (0 + 10) /
> 17]19/11/15 00:27:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 2.0 (TID 35, ip-xx-xxx-x-xxx.ec2.internal, executor 1):
> java.lang.IllegalArgumentException: CSV header does not conform to the schema.
> Header: Change_Type, Covered_Recipient_Type, Teaching_Hospital_CCN,
> Teaching_Hospital_ID, Teaching_Hospital_Name, Physician_Profile_ID,
> Physician_First_Name, Physician_Middle_Name, Physician_Last_Name,
> Physician_Name_Suffix, Recipient_Primary_Business_Street_Address_Line1,
> Recipient_Primary_Business_Street_Address_Line2, Recipient_City,
> Recipient_State, Recipient_Zip_Code, Recipient_Country, Recipient_Province,
> Recipient_Postal_Code, Physician_Primary_Type, Physician_Specialty,
> Physician_License_State_code1, Physician_License_State_code2,
> Physician_License_State_code3, Physician_License_State_code4,
> Physician_License_State_code5,
> Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name,
> Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID,
> Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name,
> Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State,
> Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country,
> Total_Amount_of_Payment_USDollars, Date_of_Payment,
> Number_of_Payments_Included_in_Total_Amount,
> Form_of_Payment_or_Transfer_of_Value, Nature_of_Payment_or_Transfer_of_Value,
> City_of_Travel, State_of_Travel, Country_of_Travel,
> Physician_Ownership_Indicator, Third_Party_Payment_Recipient_Indicator,
> Name_of_Third_Party_Entity_Receiving_Payment_or_Transfer_of_Value,
> Charity_Indicator, Third_Party_Equals_Covered_Recipient_Indicator,
> Contextual_Information, Delay_in_Publication_Indicator, Record_ID,
> Dispute_Status_for_Publication, Product_Indicator,
> Name_of_Associated_Covered_Drug_or_Biological1,
> Name_of_Associated_Covered_Drug_or_Biological2,
> Name_of_Associated_Covered_Drug_or_Biological3,
> Name_of_Associated_Covered_Drug_or_Biological4,
> Name_of_Associated_Covered_Drug_or_Biological5,
> NDC_of_Associated_Covered_Drug_or_Biological1,
> NDC_of_Associated_Covered_Drug_or_Biological2,
> NDC_of_Associated_Covered_Drug_or_Biological3,
> NDC_of_Associated_Covered_Drug_or_Biological4,
> NDC_of_Associated_Covered_Drug_or_Biological5,
> Name_of_Associated_Covered_Device_or_Medical_Supply1,
> Name_of_Associated_Covered_Device_or_Medical_Supply2,
> Name_of_Associated_Covered_Device_or_Medical_Supply3,
> Name_of_Associated_Covered_Device_or_Medical_Supply4,
> Name_of_Associated_Covered_Device_or_Medical_Supply5, Program_Year,
> Payment_Publication_Date
> Schema: UNCHANGED, Covered Recipient Physician, _c2, _c3, _c4, 278352, JOHN,
> M, RAY, JR, 3625 CAPE CENTER DR, _c11, FAYETTEVILLE, NC13, 28304-4457, United
> States15, _c16, _c17, Medical Doctor, Allopathic & Osteopathic
> Physicians|Family Medicine, NC20, _c21, _c22, _c23, _c24, Par Pharmaceutical,
> Inc.25, 100000010989, Par Pharmaceutical, Inc.27, NY, United States29, 17.29,
> 10/23/2013, 1, In-kind items and services, Food and Beverage, _c35, _c36,
> _c37, No38, No Third Party Payment, _c40, _c41, _c42, _c43, No44, 104522962,
> No46, Covered, MEGACE ES MEGESTROL ACETATE, _c49, _c50, _c51, _c52,
> 4988409496, _c54, _c55, _c56, _c57, _c58, _c59, _c60, _c61, _c62, 2013,
> 06/28/2019
> Expected: UNCHANGED but found: Change_Type
> CSV file:
> hdfs://ip-xx-xxx-x-xxx.ec2.internal:8020/user/hadoop/OP_DTL_GNRL_PGYR2013_P06282019.csv
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.$anonfun$checkHeaderColumnNames$2(CSVHeaderChecker.scala:95)
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.$anonfun$checkHeaderColumnNames$2$adapted(CSVHeaderChecker.scala:91)
> at scala.Option.foreach(Option.scala:407)
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.checkHeaderColumnNames(CSVHeaderChecker.scala:91)
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.$anonfun$checkHeaderColumnNames$6(CSVHeaderChecker.scala:127)
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.$anonfun$checkHeaderColumnNames$6$adapted(CSVHeaderChecker.scala:126)
> at scala.Option.foreach(Option.scala:407)
> at
> org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.checkHeaderColumnNames(CSVHeaderChecker.scala:126)
> at
> org.apache.spark.sql.catalyst.csv.UnivocityParser$.parseIterator(UnivocityParser.scala:340)
> at
> org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:106)
> at
> org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory.buildReader(CSVPartitionReaderFactory.scala:68)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory.$anonfun$createReader$1(FilePartitionReaderFactory.scala:29)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.getNextReader(FilePartitionReader.scala:109)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:42)
> at
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
> at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> if i instead run:
> {code}
> spark-3.0.0-SNAPSHOT-bin-2.7.4/bin/spark-shell --conf
> spark.sql.adaptive.enabled=false --num-executors 10
> {code}
> everything runs fine.
> note that we first observed the issue on our inhouse cluster, not on EMR, and
> it wasn't with a simple .show command, but with job that was doing
> distributed reading and writing.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]