[
https://issues.apache.org/jira/browse/SPARK-34002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Hamilton updated SPARK-34002:
----------------------------------
Description:
UDFs can behave differently depending on whether a DataFrame is cached, even though the DataFrame is otherwise identical.
Repro:
{code:java}
case class Bar(a: Int)
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
def f1(bar: Bar): Option[Bar] = {
None
}
def f2(bar: Bar): Option[Bar] = {
Option(bar)
}
val udf1: UserDefinedFunction = udf(f1 _)
val udf2: UserDefinedFunction = udf(f2 _)
// Uncommenting the .cache() below makes this example work
val df = (1 to 10).map(i => Tuple1(Bar(1))).toDF("c0")//.cache()
val newDf = df
.withColumn("c1", udf1(col("c0")))
.withColumn("c2", udf2(col("c1")))
newDf.show()
{code}
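The stack trace below suggests what is happening: the uncached plan is a local relation, the ConvertToLocalRelation optimizer rule evaluates the projection (and therefore the UDFs) eagerly through InterpretedMutableProjection, and the encoder then hits a NullPointerException while trying to rebuild a Bar from the null struct that udf1 produced for c1. As a point of comparison, the following hedged sketch (not part of the original repro; guardedDf is an illustrative name) guards the second UDF so it is never evaluated on a null c1, which may sidestep the failure:
{code:java}
// Hedged workaround sketch: only evaluate udf2 when c1 is non-null, so the
// encoder never has to deserialize a null struct back into a Bar.
import org.apache.spark.sql.functions.when

val guardedDf = df
  .withColumn("c1", udf1(col("c0")))
  .withColumn("c2", when(col("c1").isNotNull, udf2(col("c1"))))
guardedDf.show()
{code}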
Error:
{code:java}
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/05 00:52:57 INFO SparkContext: Running Spark version 3.0.1
21/01/05 00:52:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/05 00:52:57 INFO ResourceUtils: ==============================================================
21/01/05 00:52:57 INFO ResourceUtils: Resources for spark.driver:
21/01/05 00:52:57 INFO ResourceUtils: ==============================================================
21/01/05 00:52:57 INFO SparkContext: Submitted application: JsonOutputParserSuite
21/01/05 00:52:57 INFO SparkContext: Spark configuration:
spark.app.name=JsonOutputParserSuite
spark.driver.maxResultSize=6g
spark.logConf=true
spark.master=local[*]
spark.sql.crossJoin.enabled=true
spark.sql.shuffle.partitions=20
spark.sql.warehouse.dir=file:/code/mmlspark/spark-warehouse
21/01/05 00:52:58 INFO SecurityManager: Changing view acls to: marhamil
21/01/05 00:52:58 INFO SecurityManager: Changing modify acls to: marhamil
21/01/05 00:52:58 INFO SecurityManager: Changing view acls groups to:
21/01/05 00:52:58 INFO SecurityManager: Changing modify acls groups to:
21/01/05 00:52:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(marhamil); groups with view permissions: Set(); users with modify permissions: Set(marhamil); groups with modify permissions: Set()
21/01/05 00:52:58 INFO Utils: Successfully started service 'sparkDriver' on port 52315.
21/01/05 00:52:58 INFO SparkEnv: Registering MapOutputTracker
21/01/05 00:52:58 INFO SparkEnv: Registering BlockManagerMaster
21/01/05 00:52:58 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/01/05 00:52:58 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/01/05 00:52:58 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/01/05 00:52:58 INFO DiskBlockManager: Created local directory at C:\Users\marhamil\AppData\Local\Temp\blockmgr-9a5c80ef-ade6-41ac-9933-a26f6c291719
21/01/05 00:52:58 INFO MemoryStore: MemoryStore started with capacity 4.0 GiB
21/01/05 00:52:59 INFO SparkEnv: Registering OutputCommitCoordinator
21/01/05 00:52:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/01/05 00:52:59 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://host.docker.internal:4040
21/01/05 00:52:59 INFO Executor: Starting executor ID driver on host host.docker.internal
21/01/05 00:52:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52359.
21/01/05 00:52:59 INFO NettyBlockTransferService: Server created on host.docker.internal:52359
21/01/05 00:52:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/01/05 00:52:59 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, host.docker.internal, 52359, None)
21/01/05 00:52:59 INFO BlockManagerMasterEndpoint: Registering block manager host.docker.internal:52359 with 4.0 GiB RAM, BlockManagerId(driver, host.docker.internal, 52359, None)
21/01/05 00:52:59 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, host.docker.internal, 52359, None)
21/01/05 00:52:59 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, host.docker.internal, 52359, None)
21/01/05 00:53:00 WARN SharedState: Not allowing to set spark.sql.warehouse.dir or hive.metastore.warehouse.dir in SparkSession's options, it should be set statically for cross-session usages
Failed to execute user defined function(JsonOutputParserSuite$$Lambda$574/51376124: (struct<a:int>) => struct<a:int>)
org.apache.spark.SparkException: Failed to execute user defined function(JsonOutputParserSuite$$Lambda$574/51376124: (struct<a:int>) => struct<a:int>) at
org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130) at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:83)
at
org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.$anonfun$applyOrElse$71(Optimizer.scala:1508)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941) at
scala.collection.Iterator.foreach$(Iterator.scala:941) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at
scala.collection.IterableLike.foreach(IterableLike.scala:74) at
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
scala.collection.TraversableLike.map(TraversableLike.scala:238) at
scala.collection.TraversableLike.map$(TraversableLike.scala:231) at
scala.collection.AbstractTraversable.map(Traversable.scala:108) at
org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.applyOrElse(Optimizer.scala:1508)
at
org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.applyOrElse(Optimizer.scala:1503)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
at
org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1503)
at
org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1502)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
at scala.collection.immutable.List.foreach(List.scala:392) at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
at
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197)
at
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at
org.apache.spark.sql.Dataset.head(Dataset.scala:2697) at
org.apache.spark.sql.Dataset.take(Dataset.scala:2904) at
org.apache.spark.sql.Dataset.getRows(Dataset.scala:300) at
org.apache.spark.sql.Dataset.showString(Dataset.scala:337) at
org.apache.spark.sql.Dataset.show(Dataset.scala:824) at
org.apache.spark.sql.Dataset.show(Dataset.scala:783) at
org.apache.spark.sql.Dataset.show(Dataset.scala:792) at
com.microsoft.ml.spark.io.split1.JsonOutputParserSuite.$anonfun$new$1(ParserSuite.scala:84)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at
org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at
org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at
org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at
org.scalatest.Transformer.apply(Transformer.scala:22) at
org.scalatest.Transformer.apply(Transformer.scala:20) at
org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at
org.scalatest.TestSuite.withFixture(TestSuite.scala:196) at
org.scalatest.TestSuite.withFixture$(TestSuite.scala:195) at
org.scalatest.FunSuite.withFixture(FunSuite.scala:1560) at
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184) at
org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196) at
org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) at
org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196) at
org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178) at
com.microsoft.ml.spark.core.test.base.TestBase.org$scalatest$BeforeAndAfterEachTestData$$super$runTest(TestBase.scala:74)
at
org.scalatest.BeforeAndAfterEachTestData.runTest(BeforeAndAfterEachTestData.scala:194)
at
org.scalatest.BeforeAndAfterEachTestData.runTest$(BeforeAndAfterEachTestData.scala:187)
at com.microsoft.ml.spark.core.test.base.TestBase.runTest(TestBase.scala:74)
at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229) at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396) at
scala.collection.immutable.List.foreach(List.scala:392) at
org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) at
org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379) at
org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461) at
org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229) at
org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228) at
org.scalatest.FunSuite.runTests(FunSuite.scala:1560) at
org.scalatest.Suite.run(Suite.scala:1147) at
org.scalatest.Suite.run$(Suite.scala:1129) at
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233) at
org.scalatest.SuperEngine.runImpl(Engine.scala:521) at
org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233) at
org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232) at
com.microsoft.ml.spark.core.test.base.TestBase.org$scalatest$BeforeAndAfterAll$$super$run(TestBase.scala:74)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at
com.microsoft.ml.spark.core.test.base.TestBase.run(TestBase.scala:74) at
org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
at
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
at scala.collection.immutable.List.foreach(List.scala:392) at
org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340) at
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
at
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
at
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
at
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850) at
org.scalatest.tools.Runner.run(Runner.scala) at
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:41)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
newInstance(class com.microsoft.ml.spark.io.split1.Bar)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$createToScalaConverter$1(ScalaUDF.scala:115)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:157)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1127)
	... 148 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
	... 151 more
{code}
Expected output (seen when the .cache() call is uncommented):
{code:java}
+---+----+----+
| c0| c1| c2|
+---+----+----+
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
|[1]|null|null|
+---+----+----+
{code}
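Since the ConvertToLocalRelation rule appears throughout the stack trace, another possible workaround (unverified for this repro) is to exclude that rule for the session via the standard spark.sql.optimizer.excludedRules setting, so the local relation is not collapsed and the UDFs run through the normal execution path:
{code:java}
// Unverified workaround sketch: exclude the optimizer rule implicated in the
// stack trace, then re-run the original repro.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")

newDf.show()
{code}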
was:
UDFs can behave differently depending on if a dataframe is cached, despite the
dataframe being identical
Repro:
{code:java}
case class Bar(a: Int)
import spark.implicits._
def f1(bar: Bar): Option[Bar] = {
None
}
def f2(bar: Bar): Option[Bar] = {
Option(bar)
}
val udf1: UserDefinedFunction = udf(f1 _)
val udf2: UserDefinedFunction = udf(f2 _)
// Commenting in the cache will make this example work
val df = (1 to 10).map(i => Tuple1(Bar(1))).toDF("c0")//.cache()
val newDf = df
.withColumn("c1", udf1(col("c0")))
.withColumn("c2", udf2(col("c1")))
newDf.show()
{code}
Environment:
Windows 10 Surface book 3
Local Spark
IntelliJ Idea
Summary: Broken UDF Encoding (was: Broken UDF behavior)
> Broken UDF Encoding
> -------------------
>
> Key: SPARK-34002
> URL: https://issues.apache.org/jira/browse/SPARK-34002
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.1
> Environment: Windows 10 Surface book 3
> Local Spark
> IntelliJ Idea
>
> Reporter: Mark Hamilton
> Priority: Critical
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]