[
https://issues.apache.org/jira/browse/SPARK-32813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vladimir Klyushnikov updated SPARK-32813:
-----------------------------------------
Description:
Reading a Parquet RDD in non-columnar mode (i.e. with list fields) fails if the Spark
session was created in one thread and the RDD is read in another, because the
InheritableThreadLocal holding the active session is not propagated to the second thread.
The code below worked perfectly in Spark 2.X but fails in Spark 3:
{code:scala}
import java.util.concurrent.Executors
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Main {
  final case class Data(list: List[Int])

  def main(args: Array[String]): Unit = {
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      // Create the session and the Dataset on executor1's thread.
      val ds = Await.result(Future {
        val session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        import session.implicits._
        session.createDataset(Data(1 :: Nil) :: Nil).write.parquet("test.parquet")
        session.read.parquet("test.parquet").as[Data]
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)
      // Read the RDD on executor2's thread, which never inherited the active session.
      Await.result(Future {
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }
}
{code}
This code fails with the following exception:
{noformat}
Exception in thread "main" java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
{noformat}
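The {{None.get}} is thrown from {{FileSourceScanExec.needsUnsafeRowConversion}}, which appears to resolve the current session via {{SparkSession.getActiveSession}}; since the active session lives in an InheritableThreadLocal, the thread backing {{executor2}} (created by the main thread, which has no active session) does not see it. A possible workaround, sketched below as an assumption and not verified against 3.0.0, is to register the Dataset's own session as the active one on the reading thread before materializing the RDD:
{code:scala}
// Workaround sketch (assumption, not verified against 3.0.0): make the session visible
// to this thread so SparkSession.getActiveSession is defined when the scan is executed.
Await.result(Future {
  SparkSession.setActiveSession(ds.sparkSession) // session captured when ds was created
  ds.rdd.collect().foreach(println(_))
}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
{code}
This only works around the missing propagation; the same program did not fail in Spark 2.X, which suggests this code path did not depend on the thread-local active session there.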
> Reading parquet rdd in non columnar mode fails in multithreaded environment
> ---------------------------------------------------------------------------
>
> Key: SPARK-32813
> URL: https://issues.apache.org/jira/browse/SPARK-32813
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Vladimir Klyushnikov
> Priority: Blocker
>