[
https://issues.apache.org/jira/browse/SPARK-21828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141138#comment-16141138
]
Otis Smart commented on SPARK-21828:
------------------------------------
Hi KI: I thank you for the expedient reply!
* Here (below text) is example code that generates the error in PySpark 2.1.
* Please forgive me...I initially inadvertently applied this code on a Spark
2.1 (rather than Spark 2.2) cluster; but I moments ago began a test on a Spark
2.2 cluster (definitely this time). Nonetheless, a troubleshoot +
investigation of the aforementioned error may aid others on Spark 2.1 if my
ongoing test yields no error in Pyspark 2.2.
Gratefully + Best,
OS
# OTIS SMART: 24.08.2017 (https://issues.apache.org/jira/browse/SPARK-21828)
#
----------------------------------------------------------------------------------------------------------------------
#
#
----------------------------------------------------------------------------------------------------------------------
#
from pyspark import SparkConf, SparkContext
#
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, lit
#
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#
from pyspark.mllib.random import RandomRDDs as randrdd
#
----------------------------------------------------------------------------------------------------------------------
#
#
----------------------------------------------------------------------------------------------------------------------
#
def computeDT(df):
#
namevAll = df.columns
namevP = namevAll[:-1]
namevC = namevAll[-1]
#
training_data, testing_data = df.randomSplit([0.70, 0.30])
print "done: randomSplit"
#
variableoutStringIndexer = StringIndexer(inputCol=namevC,
outputCol='variableout')
print "done: StringIndexer"
#
variablesinAssembler = VectorAssembler(inputCols=namevP,
outputCol='variablesin')
print "done: VectorAssembler"
#
dTree = DecisionTreeClassifier(labelCol='variableout',
featuresCol='variablesin',
impurity='gini',
maxBins=3)
print "done: DecisionTreeClassifier"
#
pipeline = Pipeline(stages=[variableoutStringIndexer, variablesinAssembler,
dTree])
print "done: Pipeline"
#
paramgrid = ParamGridBuilder().addGrid(dTree.maxDepth, [3, 7, 8]).build()
print "done: ParamGridBuilder"
#
evaluator = MulticlassClassificationEvaluator(labelCol='variableout',
predictionCol='prediction',
metricName="f1")
print "done: MulticlassClassificationEvaluator"
#
CrossValidator(estimator=pipeline,
estimatorParamMaps=paramgrid,
evaluator=evaluator,
numFolds=10).fit(training_data)
print "done: CrossValidatorModel"
#
return True
#
----------------------------------------------------------------------------------------------------------------------
#
#
----------------------------------------------------------------------------------------------------------------------
#
def main():
#
numcols = 1234
numrows = 23456
#
x = randrdd.normalVectorRDD(hiveContext, numrows, numcols, seed=1)
y = randrdd.normalVectorRDD(hiveContext, numrows, numcols, seed=2)
#
dfx = HiveContext.createDataFrame(hiveContext,
x.map(lambda v: tuple([float(ii) for ii in v])).collect(),
["v{0:0>4}".format(jj) for jj in range(0, numcols)])
#
dfy = HiveContext.createDataFrame(hiveContext,
y.map(lambda v: tuple([float(ii) for ii in v])).collect(),
["v{0:0>4}".format(jj) for jj in range(0, numcols)])
#
dfx = dfx.withColumn("v{0:0>4}".format(numcols),
dfx["v{0:0>4}".format(numcols - 1)] + 5).withColumn("V", lit('a'))
dfy = dfy.withColumn("v{0:0>4}".format(numcols),
dfy["v{0:0>4}".format(numcols - 1)] - 5).withColumn("V", lit('b'))
#
df = dfx.union(dfy)
#
df.cache()
#
df.printSchema()
df.show(n=10, truncate=False)
#
computeDT(df)
#
df.unpersist()
#
return True
#
----------------------------------------------------------------------------------------------------------------------
# CONFIGURE SPARK CONTEXT THEN RUN 'MAIN' ANALYSIS
#
----------------------------------------------------------------------------------------------------------------------
#
if __name__ == "__main__":
#
conf =
SparkConf().setAppName("TroubleshootPyspark.ASFJira21828.Otis+Kazuaki"). \
set("spark.sql.tungsten.enabled", "false")
#
sc = SparkContext(conf=conf)
#
hiveContext = HiveContext(sc)
#
main()
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
> grows beyond 64 KB...again
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-21828
> URL: https://issues.apache.org/jira/browse/SPARK-21828
> Project: Spark
> Issue Type: Bug
> Components: ML, SQL
> Affects Versions: 2.1.0
> Reporter: Otis Smart
> Priority: Critical
>
> Hello!
> 1. I encounter a similar issue (see below text) on Pyspark 2.2 (e.g.,
> dataframe with ~50000 rows x 1100+ columns as input to ".fit()" method of
> CrossValidator() that includes Pipeline() that includes StringIndexer(),
> VectorAssembler() and DecisionTreeClassifier()).
> 2. Was the aforementioned patch (aka
> fix(https://github.com/apache/spark/pull/15480) not included in the latest
> release; what are the reason and (source) of and solution to this persistent
> issue please?
> py4j.protocol.Py4JJavaError: An error occurred while calling o9396.fit.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 38
> in stage 18.0 failed 4 times, most recent failure: Lost task 38.3 in stage
> 18.0 (TID 1996, ip-10-0-14-83.ec2.internal, executor 4):
> java.util.concurrent.ExecutionException: java.lang.Exception: failed to
> compile: org.codehaus.janino.JaninoRuntimeException: Code of method
> "compare(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I"
> of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
> grows beyond 64 KB
> /* 001 */ public SpecificOrdering generate(Object[] references)
> { /* 002 */ return new SpecificOrdering(references); /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificOrdering extends
> org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
> /* 006 */
> /* 007 */ private Object[] references;
> /* 008 */
> /* 009 */
> /* 010 */ public SpecificOrdering(Object[] references)
> { /* 011 */ this.references = references; /* 012 */ /* 013 */ }
> /* 014 */
> /* 015 */
> /* 016 */
> /* 017 */ public int compare(InternalRow a, InternalRow b) {
> /* 018 */ InternalRow i = null; // Holds current row being evaluated.
> /* 019 */
> /* 020 */ i = a;
> /* 021 */ boolean isNullA;
> /* 022 */ double primitiveA;
> /* 023 */
> { /* 024 */ /* 025 */ double value = i.getDouble(0); /* 026 */ isNullA =
> false; /* 027 */ primitiveA = value; /* 028 */ }
> /* 029 */ i = b;
> /* 030 */ boolean isNullB;
> /* 031 */ double primitiveB;
> /* 032 */
> { /* 033 */ /* 034 */ double value = i.getDouble(0); /* 035 */ isNullB =
> false; /* 036 */ primitiveB = value; /* 037 */ }
> /* 038 */ if (isNullA && isNullB)
> { /* 039 */ // Nothing /* 040 */ }
> else if (isNullA)
> { /* 041 */ return -1; /* 042 */ }
> else if (isNullB)
> { /* 043 */ return 1; /* 044 */ }
> else {
> /* 045 */ int comp =
> org.apache.spark.util.Utils.nanSafeCompareDoubles(primitiveA, primitiveB);
> /* 046 */ if (comp != 0)
> { /* 047 */ return comp; /* 048 */ }
> /* 049 */ }
> /* 050 */
> /* 051 */
> ...
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]