[https://issues.apache.org/jira/browse/SPARK-25938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675020#comment-16675020]
Hyukjin Kwon commented on SPARK-25938:
--------------------------------------
I can't reproduce this in the current master:
{code}
scala> spark.sparkContext.setLogLevel("INFO")
scala> var df = Seq(100).toDF("count")
df: org.apache.spark.sql.DataFrame = [count: int]
scala> df.cache()
18/11/05 19:55:59 WARN CacheManager: Asked to cache already cached data.
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [count: int]
scala> df.count()
18/11/05 19:56:00 INFO SparkContext: Starting job: count at <console>:26
18/11/05 19:56:00 INFO DAGScheduler: Registering RDD 13 (count at <console>:26)
18/11/05 19:56:00 INFO DAGScheduler: Got job 1 (count at <console>:26) with 1 output partitions
18/11/05 19:56:00 INFO DAGScheduler: Final stage: ResultStage 3 (count at <console>:26)
18/11/05 19:56:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
18/11/05 19:56:00 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 2)
18/11/05 19:56:00 INFO DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[13] at count at <console>:26), which has no missing parents
18/11/05 19:56:00 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 17.9 KB, free 366.2 MB)
18/11/05 19:56:00 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 8.3 KB, free 366.2 MB)
18/11/05 19:56:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.104:54767 (size: 8.3 KB, free: 366.3 MB)
18/11/05 19:56:00 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1164
18/11/05 19:56:00 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[13] at count at <console>:26) (first 15 tasks are for partitions Vector(0))
18/11/05 19:56:00 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/11/05 19:56:00 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 8033 bytes)
18/11/05 19:56:00 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/11/05 19:56:00 INFO BlockManager: Found block rdd_2_0 locally
18/11/05 19:56:00 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 2065 bytes result sent to driver
18/11/05 19:56:00 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 23 ms on localhost (executor driver) (1/1)
18/11/05 19:56:00 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/11/05 19:56:00 INFO DAGScheduler: ShuffleMapStage 2 (count at <console>:26) finished in 0.032 s
18/11/05 19:56:00 INFO DAGScheduler: looking for newly runnable stages
18/11/05 19:56:00 INFO DAGScheduler: running: Set()
18/11/05 19:56:00 INFO DAGScheduler: waiting: Set(ResultStage 3)
18/11/05 19:56:00 INFO DAGScheduler: failed: Set()
18/11/05 19:56:00 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[16] at count at <console>:26), which has no missing parents
18/11/05 19:56:00 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 7.1 KB, free 366.2 MB)
18/11/05 19:56:00 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.8 KB, free 366.2 MB)
18/11/05 19:56:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.104:54767 (size: 3.8 KB, free: 366.3 MB)
18/11/05 19:56:00 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1164
18/11/05 19:56:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[16] at count at <console>:26) (first 15 tasks are for partitions Vector(0))
18/11/05 19:56:00 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
18/11/05 19:56:00 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, executor driver, partition 0, ANY, 7767 bytes)
18/11/05 19:56:00 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
18/11/05 19:56:00 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks
18/11/05 19:56:00 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/11/05 19:56:00 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1782 bytes result sent to driver
18/11/05 19:56:00 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 7 ms on localhost (executor driver) (1/1)
18/11/05 19:56:00 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
18/11/05 19:56:00 INFO DAGScheduler: ResultStage 3 (count at <console>:26) finished in 0.015 s
18/11/05 19:56:00 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
18/11/05 19:56:00 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
18/11/05 19:56:00 INFO DAGScheduler: Job 1 finished: count at <console>:26, took 0.057246 s
res4: Long = 1
{code}
Resolving this. If the problem still exists in Spark 2.3.1, we should find the JIRA that fixed it and backport the fix.
> Action on cached dataset causes WARN java.lang.AssertionError in log4j logs.
> ----------------------------------------------------------------------------
>
> Key: SPARK-25938
> URL: https://issues.apache.org/jira/browse/SPARK-25938
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 2.3.1
> Environment: We're running Apache Spark 2.3.1 on Databricks 4.2 while developing in Scala
> Reporter: Benedikt Beckermann
> Priority: Minor
>
> Caching and counting any Dataset causes a java.lang.AssertionError to be logged as a WARN in the log4j logs.
> The results are still correct, and nothing is written to stderr.
> Example code:
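> {code:scala}
> import spark.implicits._  // provides toDF on local Seqs (auto-imported in notebooks and spark-shell)
> val df = Seq(100).toDF("count")
> df.cache()  // marks the Dataset for caching; nothing is computed yet
> df.count()  // first action materializes the cache; the WARN appears in the driver log
> {code}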
> log4j Output:
> {code}
> 18/11/02 15:39:57 WARN ExecutionListenerManager: Error executing query execution listener
> java.lang.AssertionError: assertion failed: InMemoryRelation fields: output, useCompression, batchSize, storageLevel, child, tableName, _cachedColumnBuffers, rowCountStats, sizeInBytesStats, statsOfPlanToCache, outputOrdering, values: List(count#124), true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), LocalTableScan [count#47]
> , None, LocalTableScan [count#47]
> MapPartitionsRDD[2] at cache at command-1842411053765017:3, LongAccumulator(id: 0, name: None, value: 1), LongAccumulator(id: 1, name: None, value: 4), Statistics(sizeInBytes=12.0 B, hints=none)
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.sql.catalyst.trees.TreeNode.jsonFields(TreeNode.scala:638)
> at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1(TreeNode.scala:626)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1$1.apply(TreeNode.scala:628)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1$1.apply(TreeNode.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1(TreeNode.scala:628)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1$1.apply(TreeNode.scala:628)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1$1.apply(TreeNode.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$collectJsonValue$1(TreeNode.scala:628)
> at org.apache.spark.sql.catalyst.trees.TreeNode.jsonValue(TreeNode.scala:631)
> at org.apache.spark.sql.catalyst.trees.TreeNode.toJSON(TreeNode.scala:617)
> at com.databricks.backend.daemon.driver.SQLQueryPlanLogger$.getSQLQueryPlanBlob(SQLQueryPlanLogger.scala:70)
> at com.databricks.backend.daemon.driver.SQLQueryPlanLogger.onSuccess(SQLQueryPlanLogger.scala:42)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
> at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
> at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
> at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156)
> at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3330)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2804)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1842411053765017:4)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1842411053765017:51)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw$$iw$$iw$$iw.<init>(command-1842411053765017:53)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw$$iw$$iw.<init>(command-1842411053765017:55)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw$$iw.<init>(command-1842411053765017:57)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$$iw.<init>(command-1842411053765017:59)
> at line587cf41c5b6a4080a07e49150407fa2732.$read.<init>(command-1842411053765017:61)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$.<init>(command-1842411053765017:65)
> at line587cf41c5b6a4080a07e49150407fa2732.$read$.<clinit>(command-1842411053765017)
> at line587cf41c5b6a4080a07e49150407fa2732.$eval$.$print$lzycompute(<notebook>:7)
> at line587cf41c5b6a4080a07e49150407fa2732.$eval$.$print(<notebook>:6)
> at line587cf41c5b6a4080a07e49150407fa2732.$eval.$print(<notebook>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
> at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
> at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
> at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:199)
> at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:189)
> at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
> at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
> at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:493)
> at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:448)
> at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:189)
> at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:248)
> at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:228)
> at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:188)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:183)
> at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:40)
> at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:221)
> at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:40)
> at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:228)
> at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
> at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
> at scala.util.Try$.apply(Try.scala:192)
> at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:590)
> at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:474)
> at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:548)
> at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:380)
> at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:327)
> at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
> at java.lang.Thread.run(Thread.java:748)
> {code}
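The stack trace above points to a specific sequence. First, the AssertionError is raised in TreeNode.jsonFields while a Databricks listener (SQLQueryPlanLogger) serializes the plan with toJSON. Second, the error is caught inside ExecutionListenerManager's withErrorHandling and logged as a WARN. That sequence would explain why count() still returns a correct result and nothing reaches stderr.

The pure-Scala sketch below models this path so it can be reasoned about without a cluster. It is a simplified illustration under stated assumptions, not Spark or Databricks code:
- ListenerFailureModel, Listener, jsonFields, notifySuccess, and countWithListeners are hypothetical names.
- The dispatcher is assumed to catch NonFatal throwables. This is inferred from the WARN line, not taken from the Spark source.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical, simplified model of the failure path in the stack trace.
// None of these names are Spark or Databricks APIs.
object ListenerFailureModel {

  // Models the check behind the log message: each constructor field name must be
  // paired with exactly one value before the node can be serialized to JSON.
  // Predef.assert throws java.lang.AssertionError("assertion failed: " + message).
  def jsonFields(nodeName: String, names: Seq[String], values: Seq[Any]): Seq[(String, Any)] = {
    assert(names.length == values.length,
      s"$nodeName fields: ${names.mkString(", ")}, values: ${values.mkString(", ")}")
    names.zip(values)
  }

  // Hypothetical stand-in for a query execution listener; receives the action name.
  type Listener = String => Unit

  // Stand-in for log4j WARN output.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty

  // Assumed catch-and-log dispatch: a NonFatal throwable from one listener is
  // recorded as a warning, and the remaining listeners still run.
  def notifySuccess(listeners: Seq[Listener], funcName: String): Unit =
    listeners.foreach { listener =>
      try listener(funcName)
      catch {
        case NonFatal(e) => warnings += s"Error executing query execution listener: $e"
      }
    }

  // Models an action such as count(): compute the result first, then notify
  // listeners, then return the already-computed result to the caller.
  def countWithListeners(rows: Seq[Int], listeners: Seq[Listener]): Long = {
    val result = rows.size.toLong
    notifySuccess(listeners, "count")
    result
  }
}
```

Consider a listener that calls jsonFields with mismatched lists, passed to countWithListeners(Seq(100), ...). In that case the call still returns 1, and exactly one warning is recorded. The sketch relies on the fact that scala.util.control.NonFatal matches java.lang.AssertionError. NonFatal excludes only VirtualMachineError, ThreadDeath, InterruptedException, LinkageError, and ControlThrowable. As a result, a handler of this shape swallows a failed assert even though AssertionError extends java.lang.Error.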