[ https://issues.apache.org/jira/browse/SPARK-13747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008526#comment-16008526 ]
Kagan Turgut edited comment on SPARK-13747 at 5/12/17 6:30 PM:
---------------------------------------------------------------
I am having the same exception.
I am creating a new data source that reads batch files asynchronously
into a temp folder and then returns them as a DataFrame.
Within the buildScan(): RDD[Row] method, I have a loop that saves the results
of each batch to a Parquet file:
{code}
val df = spark.sparkContext.parallelize(batchResult.records, 200).toDF()
df.write.mode(SaveMode.Overwrite).save(tempFile)
{code}
Then, once the temp files are all written, buildScan loads them all in
parallel and returns their union as an RDD, like this:
{code}
sqlContext.read
  .schema(schema)
  .load(files: _*)
  .queryExecution.executedPlan.execute().asInstanceOf[RDD[Row]]
{code}
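(As a side note, instead of casting the executed plan I could also use the public .rdd accessor, which returns an RDD[Row] without a cast; a small sketch, assuming the temp files are Parquet:)
{code}
sqlContext.read
  .schema(schema)
  .parquet(files: _*)
  .rdd
{code}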
I can see the concurrency issue: I am writing the temp files at the same
time as I am constructing the RDD to return.
Is there a better way of doing this?
As a workaround I can save the temp files as regular CSV, but I would prefer
to save them as Parquet files using the Spark API.
Would upgrading to 2.12 fix this issue in my case?
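For now I am experimenting with driving the batch writes from a plain fixed-size thread pool instead of the ForkJoinPool-backed global execution context, since the ticket points at ForkJoinPool specifically. A minimal sketch of that idea; the pool size, the Future wrapper, and the batchResults/tempFileFor names are my own placeholders, not anything from this ticket:
{code}
import java.util.concurrent.Executors
import org.apache.spark.sql.SaveMode
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A plain fixed pool: a thread parked in Await stays parked and never
// picks up another task the way a ForkJoinPool worker can.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

import spark.implicits._ // assumes an existing SparkSession named `spark`

val writes = batchResults.map { batchResult =>
  Future {
    val df = spark.sparkContext.parallelize(batchResult.records, 200).toDF()
    df.write.mode(SaveMode.Overwrite).save(tempFileFor(batchResult))
  }
}
writes.foreach(f => Await.ready(f, Duration.Inf))
{code}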
> Concurrent execution in SQL doesn't work with Scala ForkJoinPool
> ----------------------------------------------------------------
>
> Key: SPARK-13747
> URL: https://issues.apache.org/jira/browse/SPARK-13747
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
>
> Running the following code may fail:
> {code}
> (1 to 100).par.foreach { _ =>
>   println(sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count())
> }
> java.lang.IllegalArgumentException: spark.sql.execution.id is already set
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>   at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
>   at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
> {code}
> This is because SparkContext.runJob can be suspended when using a
> ForkJoinPool (e.g., scala.concurrent.ExecutionContext.Implicits.global), since it
> calls Await.ready (introduced by https://github.com/apache/spark/pull/9264).
> So while SparkContext.runJob is suspended, the ForkJoinPool may run another task
> on the same thread, and that task then sees the polluted local properties.
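> A minimal, Spark-free sketch of this failure mode (the ThreadLocal below
> stands in for SparkContext's local properties, all names are illustrative,
> and whether it actually throws depends on scheduling):
> {code}
> import scala.concurrent.{Await, Promise}
> import scala.concurrent.duration._
>
> // Stands in for the per-thread "spark.sql.execution.id" local property.
> val executionId = new ThreadLocal[String]
>
> def withNewExecutionId[T](id: String)(body: => T): T = {
>   if (executionId.get != null)
>     throw new IllegalArgumentException("spark.sql.execution.id is already set")
>   executionId.set(id)
>   try body finally executionId.remove()
> }
>
> // Parallel collections run on a ForkJoinPool. While one task is parked in
> // Await.ready (as SparkContext.runJob was), the pool may run another task
> // on that same thread, which then sees the stale thread-local and throws.
> (1 to 100).par.foreach { i =>
>   withNewExecutionId(i.toString) {
>     val done = Promise[Unit]()
>     new Thread(new Runnable {
>       def run(): Unit = { Thread.sleep(10); done.success(()) }
>     }).start()
>     Await.ready(done.future, 10.seconds) // worker thread suspends here
>   }
> }
> {code}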