[
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andres Fernandez updated SPARK-27613:
-------------------------------------
Component/s: (was: Spark Core)
PySpark
> Caching an RDD composed of Row Objects produces some kind of key recombination
> ------------------------------------------------------------------------------
>
> Key: SPARK-27613
> URL: https://issues.apache.org/jira/browse/SPARK-27613
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.0
> Reporter: Andres Fernandez
> Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives
> *table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_,
> _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will
> then go ahead and iterate over the table names to create dataframes filtering
> the RDDs by the first element and valid response.
> So far so good.
> A QueryResponse object (from the azure.loganalytics package) essentially
> contains a list with one "_table_", which in turn has a "_columns_" and a "_rows_" field.
> Every response (fifth element of the tuple, [4]) for the same table
> name (first element of the tuple, [0]) has exactly the same columns in the
> same order (the order only matters for referencing the row data within the
> same response anyway). The types are stored in *column_types*,
> taking the first response as the sample.
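> To make the shapes concrete, here is a small mock of what I understand the
> response structure to be (the class names below are just stand-ins, not the
> real azure.loganalytics.models types):
> {code:python}
> from collections import namedtuple
>
> # Stand-in classes approximating the QueryResponse shape described above;
> # the real objects come from azure.loganalytics.models.
> Column = namedtuple("Column", ["name", "type"])
> Table = namedtuple("Table", ["columns", "rows"])
> Response = namedtuple("Response", ["tables"])
>
> response = Response(tables=[Table(
>     columns=[Column("TimeGenerated", "datetime"), Column("record_count", "long")],
>     rows=[["2019-04-30T00:00:00Z", 42]],
> )])
>
> # column_types is taken from the first valid response, as described above
> column_types = {c.name: c.type for c in response.tables[0].columns}
> print(column_types)  # {'TimeGenerated': 'datetime', 'record_count': 'long'}
> {code}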
> Now to the tricky part.
> I call flatMap on *responses_rdd* with the function
> +tabularize_response_rdd+, which basically creates a Row object for every
> row (a _list_ of _str_) in the _QueryResponse_. I also create the schema, based
> on a *type_map* from Azure types to spark.sql.types, in order to pass it to
> the subsequent createDataFrame call. If the result of this flatMap,
> *table_tabular_rdd*, is not cached before creating the DataFrame from the
> Rows RDD, everything works smoothly. Nevertheless, if the result of the
> flatMap, *table_tabular_rdd*, is cached before creating the DataFrame, a
> mismatch appears between the keys and the actual values of the Row objects.
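> A stripped-down sketch of the same pattern, with dummy in-memory data standing
> in for the Azure responses (I have not verified that this minimal version
> reproduces the problem outside my pipeline):
> {code:python}
> from pyspark.sql import Row, SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, LongType
>
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
>
> # Dummy records standing in for the flattened Azure query results
> raw = sc.parallelize([
>     {"WorkspaceId": "ws-1", "record_count": 10, "status": "n/a"},
>     {"WorkspaceId": "ws-2", "record_count": 20, "status": "ok"},
> ])
>
> # Rows built from an unpacked dict, as in tabularize_response_rdd;
> # removing .cache() here is what makes my real pipeline work
> rows_rdd = raw.map(lambda d: Row(**d)).cache()
>
> # Schema field order taken from a sample row, as in the attached code
> fields = rows_rdd.take(1)[0].asDict().keys()
> types = {"WorkspaceId": StringType(), "record_count": LongType(), "status": StringType()}
> schema = StructType([StructField(f, types[f], True) for f in fields])
>
> df = spark.createDataFrame(rows_rdd, schema=schema)
> df.groupBy().sum("record_count").collect()
> {code}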
> It is worth pointing out that when a Row object is created from an unpacked
> dict, the code
> [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]
> sorts the keys; is this behaviour somehow overridden by caching?
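> For example, in 2.4 the sorting in those lines means the dict's own order is
> ignored when the Row is built, so the field order of my Rows and of my schema
> should both end up alphabetical, as far as I can tell:
> {code:python}
> from pyspark.sql import Row
>
> # Row built from an unpacked dict: the keys are sorted, so field order is
> # alphabetical regardless of the dict's own order (Spark 2.4 behaviour)
> r = Row(**{"record_count": 10, "WorkspaceId": "ws-1"})
> print(r)              # Row(WorkspaceId='ws-1', record_count=10)
> print(r.__fields__)   # ['WorkspaceId', 'record_count']
> {code}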
> Please let me know what I am doing wrong: is there a best practice or
> documented solution I am not following? I'm just a beginner when it comes to
> Spark and would happily accept any suggestion. I hope I was clear enough, and
> I am open to providing any additional details that might be helpful. Thank
> you! (Code and error attached as well.)
> The error looks as if it were related to casting, but it can be seen that
> the contents do not correspond to the key: the *record_count* key is actually a
> Long, but in the Row its value somehow got swapped with another key's value, in this
> case 'n/a'.
> {code:python}
> def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.rdd, verbose: bool = False) -> list:
>     ws_column_name = "WorkspaceId"
>
>     def tabularize_response_rdd(x: tuple):
>         import pyspark
>         tn, wsid, count, interval, response = x
>         ret = []
>         if response.tables[0].rows:
>             ret = [pyspark.sql.Row(**{ws_column_name: wsid,
>                                       **{fi.name: r[i] for i, fi in enumerate(response.tables[0].columns)}})
>                    for r in response.tables[0].rows]
>         return ret
>
>     data_frames = {}
>     for tn in table_names:
>         if verbose: print("Filtering RDD items for {}".format(tn))
>         table_response_rdd = responses_rdd.filter(lambda x: x[0] == tn and x[4] != None).cache()
>
>         data_frames[tn] = None
>         if verbose: print("Checking if RDD for {} has data".format(tn))
>         if not table_response_rdd.isEmpty():
>             if verbose: print("Getting column types for {} from azure response".format(tn))
>             column_types = {f.name: f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>             column_types[ws_column_name] = "string"
>             if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>             table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd)  # .cache()  # Error with cache, no error without!
>             if verbose: print("Getting sample row for {}".format(tn))
>             row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>             if verbose: print("Building schema for {} from sample row and column types".format(tn))
>             current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>             if verbose: print("Creating dataframe for {}".format(tn))
>             table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
>             if verbose: print("Calculating expected count for {}".format(tn))
>             expected_count = table_response_rdd.map(lambda x: (x[1], x[2])).distinct().map(lambda x: x[1]).sum()
>             real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>             table_response_rdd.unpersist()
>             # table_tabular_rdd.unpersist()
>             if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
>             data_frames[tn] = table_df
>         else:
>             if verbose: print("{} table was empty!".format(tn))
>     return data_frames
> {code}
> {noformat}
> Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in
> <module>() 1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1,
> sta, container, sta_key, tid, creds, 5000, True) 2 resrdds.cache()
> ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds,
> True) 4 resrdds.unpersist() <command-2824384765475774> in
> create_dataframes_from_azure_responses_rdd(table_names, responses_rdd,
> verbose) 37 if verbose: print("Calculating expected count for
> {}".format(tn)) 38 expected_count = table_response_rdd.map(lambda x:
> (x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39 real_count =
> table_df.select("record_count").groupBy().sum().collect()[0][0] 40
> table_response_rdd.unpersist() 41 #table_tabular_rdd.unpersist()
> /databricks/spark/python/pyspark/sql/dataframe.py in collect(self) 546 #
> Default path used in OSS Spark / for non-DF-ACL clusters: 547 with
> SCCallSiteSync(self._sc) as css: --> 548 sock_info =
> self._jdf.collectToPython() 549 return list(_load_from_socket(sock_info,
> BatchedSerializer(PickleSerializer()))) 550
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in
> __call__(self, *args) 1255 answer =
> self.gateway_client.send_command(command) 1256 return_value =
> get_return_value( -> 1257 answer, self.gateway_client,
> self.target_id, self.name)
> 1258 1259 for temp_arg in temp_args:
> /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def
> deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64
> except py4j.protocol.Py4JJavaError as e: 65 s =
> e.java_exception.toString()
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in
> get_return_value(answer, gateway_client, target_id, name) 326 raise
> Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". -->
> 328 format(target_id, ".", name), value)
> 329 else: 330 raise Py4JError( Py4JJavaError: An error occurred while
> calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent
> failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main
> process() File "/databricks/spark/python/pyspark/worker.py", line 398, in
> process serializer.dump_stream(func(split_index, iterator), outfile) File
> "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
> vs = list(itertools.islice(iterator, batch)) File
> "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return
> f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py",
> line 785, in prepare verify_func(obj) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line
> 1370, in verify_struct verifier(v) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line
> 1383, in verify_default verify_acceptable_types(obj) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1278, in
> verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field
> record_count: LongType can not accept object 'n/a' in type <class 'str'> at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
> at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source) at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
> at
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> at
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at
> org.apache.spark.scheduler.Task.run(Task.scala:112) at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
> at scala.Option.foreach(Option.scala:257) at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at
> org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
> at
> org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
> at
> org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
> at
> org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
> at
> org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
> at
> org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
> at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at
> org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at
> org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
> at
> org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
> at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
> at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at
> py4j.Gateway.invoke(Gateway.java:295) at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at
> py4j.commands.CallCommand.execute(CallCommand.java:79) at
> py4j.GatewayConnection.run(GatewayConnection.java:251) at
> java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main
> process() File "/databricks/spark/python/pyspark/worker.py", line 398, in
> process serializer.dump_stream(func(split_index, iterator), outfile) File
> "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
> vs = list(itertools.islice(iterator, batch)) File
> "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return
> f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py",
> line 785, in prepare verify_func(obj) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line
> 1370, in verify_struct verifier(v) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line
> 1383, in verify_default verify_acceptable_types(obj) File
> "/databricks/spark/python/pyspark/sql/types.py", line 1278, in
> verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field
> record_count: LongType can not accept object 'n/a' in type <class 'str'> at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
> at
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
> at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source) at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
> at
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> at
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at
> org.apache.spark.scheduler.Task.run(Task.scala:112) at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]