[
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15755742#comment-15755742
]
Nicholas Chammas commented on SPARK-18492:
------------------------------------------
I'm hitting this problem as well when I try to apply a bunch of nested SQL
functions to the fields in a struct column. That combination quickly blows up
the size of the generated code, since it repeats the nested functions for each
struct field. The strange thing is that despite spitting out an error and all
this generated Java code, Spark continues along and the program completes
successfully.
Here's a minimal repro that's very similar to what I'm actually doing in my
application:
{code}
from collections import namedtuple

import pyspark
from pyspark.sql import Column
from pyspark.sql.functions import (
    struct,
    regexp_replace,
    lower,
    trim,
    col,
    coalesce,
    lit,
)

Person = namedtuple(
    'Person', [
        'first_name',
        'last_name',
        'address_1',
        'address_2',
        'city',
        'state',
    ])


def normalize_udf(column: Column) -> Column:
    normalized_column = column
    normalized_column = coalesce(normalized_column, lit(''))
    normalized_column = (
        regexp_replace(
            normalized_column,
            pattern=r'[^\p{IsLatin}\d\s]+',
            replacement=' ',
        )
    )
    normalized_column = (
        regexp_replace(
            normalized_column,
            pattern=r'[\s]+',
            replacement=' ',
        )
    )
    normalized_column = lower(trim(normalized_column))
    return normalized_column


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    raw_df = spark.createDataFrame(
        [
            (1, Person(' Nick', ' Chammas ', '22 Two Drive ', '', '', '')),
            (2, Person('BOB', None, '', '', None, None)),
            (3, Person('Guido ', 'van Rossum', ' ', ' ', None, None)),
        ],
        ['id', 'person'],
    )
    normalized_df = (
        raw_df
        .select(
            'id',
            struct([
                normalize_udf('person.' + field).alias(field)
                for field in Person._fields
            ]).alias('person'))
        # Uncomment this persist() and the codegen error goes away.
        # However, one of our tests that exercises this code starts
        # failing in a strange way.
        # .persist()
        # The normalize_udf() calls below are repeated to trigger the
        # error. In a more realistic scenario, of course, you would have
        # other chained function calls.
        .select(
            'id',
            struct([
                normalize_udf('person.' + field).alias(field)
                for field in Person._fields
            ]).alias('person'))
        .select(
            'id',
            struct([
                normalize_udf('person.' + field).alias(field)
                for field in Person._fields
            ]).alias('person'))
    )
    normalized_df.show(truncate=False)
{code}
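As a rough back-of-the-envelope count of how quickly the repro above multiplies function calls: the sketch below assumes that Spark ends up evaluating the three chained selects together in one generated method. That is an assumption for illustration, not something verified against the generated code, and the variable names are made up for this example.

```python
# Back-of-the-envelope count of SQL function calls in the repro above.
# Assumption: the chained select() calls are evaluated together in one
# generated method rather than being compiled separately.

struct_fields = [
    'first_name', 'last_name', 'address_1', 'address_2', 'city', 'state',
]

# normalize_udf() wraps each field in five function calls:
# coalesce, regexp_replace, regexp_replace, trim, lower.
calls_per_normalize = 5

# The repro applies normalize_udf() to every field in three chained selects.
chained_selects = 3

total_calls = len(struct_fields) * calls_per_normalize * chained_selects
print(total_calls)  # 90
```

Adding one more struct field or one more chained select grows the count multiplicatively, which fits the observation that this combination quickly blows up the generated code.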
I suppose the workarounds for this type of problem are:
* Play code golf and try to compress what you're doing into fewer function
calls to stay under the 64KB limit.
** Disadvantage: It's hard to do and makes the code ugly and difficult to
maintain.
* Use {{persist()}} in strategic locations to force Spark to break up the
generated code into smaller chunks.
** Disadvantage: It's difficult to track and unpersist these intermediate RDDs
that get persisted.
** Disadvantage: One of my tests mysteriously fails when I implement this
approach. (The failing test was a join that started to fail because the join
key types didn't match. Really weird.)
* Rewrite parts of the code to use a non-Spark UDF implementation (i.e. in my
case, pure Python code).
** Disadvantage: You lose the advantages of codegen and gain the overhead of
running stuff in pure Python.
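To make the last workaround concrete, here is a minimal sketch of what a pure Python version of the normalization might look like. It is not the code from my application: the names {{normalize}} and {{_is_latin}} are hypothetical, and the Latin check based on Unicode character names is only an approximation of Java's {{\p{IsLatin}}} script class. The whitespace and digit sets are ASCII-only on the assumption that Java's default {{\s}} and {{\d}} behave that way.

```python
import re
import unicodedata

# Assumption: Java's default \s and \d only match these ASCII characters.
_JAVA_WHITESPACE = ' \t\n\x0b\f\r'
_ASCII_DIGITS = '0123456789'


def _is_latin(ch):
    # Approximation of \p{IsLatin}: the Unicode name starts with "LATIN".
    # This misses a few Latin-script characters whose names differ.
    return unicodedata.name(ch, '').startswith('LATIN')


def normalize(value):
    # Mirrors coalesce(column, lit('')).
    if value is None:
        return ''
    # Replace everything that is not Latin, a digit, or whitespace with a
    # space (the first regexp_replace in the repro).
    kept = ''.join(
        ch if (_is_latin(ch) or ch in _ASCII_DIGITS or ch in _JAVA_WHITESPACE)
        else ' '
        for ch in value
    )
    # Collapse runs of whitespace into one space (the second regexp_replace).
    collapsed = re.sub(r'[ \t\n\x0b\f\r]+', ' ', kept)
    # Mirrors lower(trim(...)).
    return collapsed.strip(' ').lower()
```

A function like this could then be wrapped with {{pyspark.sql.functions.udf}} and a string return type and applied to each struct field, at the cost already noted above: each value gets shipped out to Python instead of going through codegen.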
I'm seeing this on 2.0.2 and on master at
{{1ac6567bdb03d7cc5c5f3473827a102280cb1030}} which is from 2 days ago.
[~marmbrus] / [~davies]: What are your thoughts on this?
> GeneratedIterator grows beyond 64 KB
> ------------------------------------
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
> Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile:
> org.codehaus.janino.JaninoRuntimeException: Code of method
> "(I[Lscala/collection/Iterator;)V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator"
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF
> project_scalaUDF1;
> /* 037 */ private scala.Function1 project_catalystConverter1;
> /* 038 */ private scala.Function1 project_converter1;
> /* 039 */ private scala.Function1 project_converter2;
> /* 040 */ private scala.Function2 project_udf1;
> .... (many omitted lines) ...
> /* 6089 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF
> project_scalaUDF1454;
> /* 6090 */ private scala.Function1 project_catalystConverter1454;
> /* 6091 */ private scala.Function1 project_converter1695;
> /* 6092 */ private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of
> which has totally repetitive sequences of statements pertaining to each of
> the sequences of variables declared in the class. For example:
> /* 6101 */ public void init(int index, scala.collection.Iterator inputs[]) {
> The reason that the 64KB JVM limit for code for a method is exceeded is
> because the code generator is using an incredibly naive strategy. It emits a
> sequence like the one shown below for each of the 1,454 groups of variables
> shown above, in
> /* 6132 */ this.project_udf =
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */ this.project_scalaUDF1 =
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */ this.project_catalystConverter1 =
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */ this.project_converter1 =
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */ this.project_converter2 =
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
> /* 7282 */ this.project_udf230 =
> (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */ this.project_scalaUDF231 =
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */ this.project_catalystConverter231 =
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
> .... many omitted lines ...
> Example of repetitive code sequences emitted for processNext method:
> /* 12253 */ boolean project_isNull247 = project_result244 == null;
> /* 12254 */ MapData project_value247 = null;
> /* 12255 */ if (!project_isNull247) {
> /* 12256 */ project_value247 = project_result244;
> /* 12257 */ }
> /* 12258 */ Object project_arg = sort_isNull5 ? null :
> project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */ ArrayData project_result249 = null;
> /* 12261 */ try {
> /* 12262 */ project_result249 =
> (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg));
> /* 12263 */ } catch (Exception e) {
> /* 12264 */ throw new
> org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e);
> /* 12265 */ }
> /* 12266 */
> /* 12267 */ boolean project_isNull252 = project_result249 == null;
> /* 12268 */ ArrayData project_value252 = null;
> /* 12269 */ if (!project_isNull252) {
> /* 12270 */ project_value252 = project_result249;
> /* 12271 */ }
> /* 12272 */ Object project_arg1 = project_isNull252 ? null :
> project_converter488.apply(project_value252);
> /* 12273 */
> /* 12274 */ ArrayData project_result248 = null;
> /* 12275 */ try {
> /* 12276 */ project_result248 =
> (ArrayData)project_catalystConverter247.apply(project_udf247.apply(project_arg1));
> /* 12277 */ } catch (Exception e) {
> /* 12278 */ throw new
> org.apache.spark.SparkException(project_scalaUDF247.udfErrorMessage(), e);
> /* 12279 */ }
> /* 12280 */
> /* 12281 */ boolean project_isNull251 = project_result248 == null;
> /* 12282 */ ArrayData project_value251 = null;
> /* 12283 */ if (!project_isNull251) {
> /* 12284 */ project_value251 = project_result248;
> /* 12285 */ }
> /* 12286 */ Object project_arg2 = project_isNull251 ? null :
> project_converter487.apply(project_value251);
> /* 12287 */
> /* 12288 */ InternalRow project_result247 = null;
> /* 12289 */ try {
> /* 12290 */ project_result247 =
> (InternalRow)project_catalystConverter246.apply(project_udf246.apply(project_arg2));
> /* 12291 */ } catch (Exception e) {
> /* 12292 */ throw new
> org.apache.spark.SparkException(project_scalaUDF246.udfErrorMessage(), e);
> /* 12293 */ }
> /* 12294 */
> /* 12295 */ boolean project_isNull250 = project_result247 == null;
> /* 12296 */ InternalRow project_value250 = null;
> /* 12297 */ if (!project_isNull250) {
> /* 12298 */ project_value250 = project_result247;
> /* 12299 */ }
> /* 12300 */ Object project_arg3 = project_isNull250 ? null :
> project_converter486.apply(project_value250);
> /* 12301 */
> /* 12302 */ InternalRow project_result246 = null;
> /* 12303 */ try {
> /* 12304 */ project_result246 =
> (InternalRow)project_catalystConverter245.apply(project_udf245.apply(project_arg3));
> /* 12305 */ } catch (Exception e) {
> /* 12306 */ throw new
> org.apache.spark.SparkException(project_scalaUDF245.udfErrorMessage(), e);
> /* 12307 */ }
> /* 12308 */
> It is pretty clear that the code generation strategy is naive. The code
> generator should use arrays and loops instead of emitting all these
> repetitive code sequences which only differ by a few numerical digits used to
> generate the name of the variables.