The OOM happens in the driver; you may also need more memory for the driver.

On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
> You are using lots of tiny executors (128 executors with only 2G
> memory); could you try bigger executors (for example 16G x 16)?
>
> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>
>> So I wrote some code to reproduce the problem.
>>
>> I assume here that a pipeline should be able to transform a categorical
>> feature with a few million levels. So I create a dataframe with the
>> categorical feature ('id'), apply StringIndexer and OneHotEncoder
>> transformers, and run a loop in which I increase the number of levels.
>> It breaks at 1,276,000 levels.
>>
>> Shall I report this as a ticket in JIRA?
>>
>> ____________
>>
>> from pyspark.sql.functions import rand
>> from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
>> from pyspark.ml import Pipeline
>>
>> start_id = 100000
>> n = 5000000
>> step = (n - start_id) / 25
>>
>> for i in xrange(start_id, start_id + n, step):
>>     print "#########\n {}".format(i)
>>     dfr = (sqlContext
>>            .range(start_id, start_id + i)
>>            .withColumn('label', rand(seed=10))
>>            .withColumn('feat2', rand(seed=101))
>>            # .withColumn('normal', randn(seed=27))
>>            ).repartition(32).cache()
>>     # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
>>     dfr.show(1)
>>     print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())
>>
>>     categorical_feature = ['id']
>>     stages = []
>>
>>     for c in categorical_feature:
>>         stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>         stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>
>>     columns = ["{}OHE".format(x) for x in categorical_feature]
>>     columns.append('feat2')
>>
>>     assembler = VectorAssembler(
>>         inputCols=columns,
>>         outputCol="features")
>>     stages.append(assembler)
>>
>>     df2 = dfr
>>
>>     pipeline = Pipeline(stages=stages)
>>     pipeline_fitted = pipeline.fit(df2)
>>     df3 = pipeline_fitted.transform(df2)
>>     df3.show(1)
>>     dfr.unpersist()
>>
>> ____________
>>
>> Output:
>>
>> #########
>> 100000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |183601|0.38693226548356197|0.04485291680169634|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
>> +------+-------------------+-------------------+-------+--------------------+--------------------+
>> |    id|              label|              feat2|idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+-------+--------------------+--------------------+
>> |183601|0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
>> +------+-------------------+-------------------+-------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 296000
>> +------+------------------+-------------------+
>> |    id|             label|              feat2|
>> +------+------------------+-------------------+
>> |137008|0.2996020619810592|0.38693226548356197|
>> +------+------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> |    id|             label|              feat2|idIndex|               idOHE|            features|
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> |137008|0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 492000
>> +------+------------------+-------------------+
>> |    id|             label|              feat2|
>> +------+------------------+-------------------+
>> |534351|0.9450641392552516|0.23472935141246665|
>> +------+------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> |    id|             label|              feat2|idIndex|               idOHE|            features|
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> |534351|0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
>> +------+------------------+-------------------+-------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 688000
>> +------+------------------+------------------+
>> |    id|             label|             feat2|
>> +------+------------------+------------------+
>> |573008|0.3059347083549171|0.4846147657830415|
>> +------+------------------+------------------+
>> only showing top 1 row
>>
>> This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
>> +------+------------------+------------------+--------+--------------------+--------------------+
>> |    id|             label|             feat2| idIndex|               idOHE|            features|
>> +------+------------------+------------------+--------+--------------------+--------------------+
>> |573008|0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
>> +------+------------------+------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 884000
>> +------+-------------------+------------------+
>> |    id|              label|             feat2|
>> +------+-------------------+------------------+
>> |970195|0.34345290476989165|0.9843176058907069|
>> +------+-------------------+------------------+
>> only showing top 1 row
>>
>> This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
>> +------+-------------------+------------------+--------+--------------------+--------------------+
>> |    id|              label|             feat2| idIndex|               idOHE|            features|
>> +------+-------------------+------------------+--------+--------------------+--------------------+
>> |970195|0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
>> +------+-------------------+------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 1080000
>> +------+------------------+-----------------+
>> |    id|             label|            feat2|
>> +------+------------------+-----------------+
>> |403758|0.6333344187975314|0.774327685753309|
>> +------+------------------+-----------------+
>> only showing top 1 row
>>
>> This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
>> +------+------------------+-----------------+--------+--------------------+--------------------+
>> |    id|             label|            feat2| idIndex|               idOHE|            features|
>> +------+------------------+-----------------+--------+--------------------+--------------------+
>> |403758|0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
>> +------+------------------+-----------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 1276000
>> +------+------------------+------------------+
>> |    id|             label|             feat2|
>> +------+------------------+------------------+
>> |508726|0.2513814327408137|0.8480577183702391|
>> +------+------------------+------------------+
>> only showing top 1 row
>>
>> This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)
>>
>> ---------------------------------------------------------------------------
>> Py4JJavaError                             Traceback (most recent call last)
>> <ipython-input-2-f5c9fe263872> in <module>()
>>      38 pipeline = Pipeline(stages=stages)
>>      39 pipeline_fitted = pipeline.fit(df2)
>> ---> 40 df3 = pipeline_fitted.transform(df2)
>>      41 df3.show(1)
>>      42 dfr.unpersist()
>>
>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>     103                 return self.copy(params)._transform(dataset)
>>     104             else:
>> --> 105                 return self._transform(dataset)
>>     106         else:
>>     107             raise ValueError("Params must be a param map but got %s." % type(params))
>>
>> /opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
>>     196     def _transform(self, dataset):
>>     197         for t in self.stages:
>> --> 198             dataset = t.transform(dataset)
>>     199         return dataset
>>     200
>>
>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>     103                 return self.copy(params)._transform(dataset)
>>     104             else:
>> --> 105                 return self._transform(dataset)
>>     106         else:
>>     107             raise ValueError("Params must be a param map but got %s."
>> % type(params))
>>
>> /opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
>>     227     def _transform(self, dataset):
>>     228         self._transfer_params_to_java()
>> --> 229         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
>>     230
>>     231
>>
>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>     931         answer = self.gateway_client.send_command(command)
>>     932         return_value = get_return_value(
>> --> 933             answer, self.gateway_client, self.target_id, self.name)
>>     934
>>     935         for temp_arg in temp_args:
>>
>> /opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
>>      61     def deco(*a, **kw):
>>      62         try:
>> ---> 63             return f(*a, **kw)
>>      64         except py4j.protocol.Py4JJavaError as e:
>>      65             s = e.java_exception.toString()
>>
>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>     310                 raise Py4JJavaError(
>>     311                     "An error occurred while calling {0}{1}{2}.\n".
>> --> 312                     format(target_id, ".", name), value)
>>     313             else:
>>     314                 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o408.transform.
>> : java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at scala.collection.immutable.Stream$.from(Stream.scala:1262)
>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
>> at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
>> at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
>> at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
>> at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
>> at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
>> at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
>> at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
>> at scala.collection.SeqLike$class.size(SeqLike.scala:106)
>> at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
>> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
>> at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>> at scala.Option.map(Option.scala:146)
>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
>> at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
>> at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
>> at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at py4j.Gateway.invoke(Gateway.java:280)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>
>>
>> Spark Properties
>> Name                                Value
>> spark.app.name                      pyspark-shell
>> spark.driver.cores                  1
>> spark.driver.extraJavaOptions       -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>> spark.driver.memory                 2g
>> spark.dynamicAllocation.enabled     FALSE
>> spark.eventLog.dir                  hdfs:///spark/history
>> spark.eventLog.enabled              TRUE
>> spark.executor.cores                1
>> spark.executor.extraJavaOptions     -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>> spark.executor.id                   driver
>> spark.executor.instances            128
>> spark.executor.memory               2g
>> spark.history.fs.logDirectory       hdfs:///spark/history
>> spark.master                        yarn-client
>> spark.memory.fraction               0.7
>> spark.memory.storageFraction        0.5
>> spark.rdd.compress                  TRUE
>> spark.scheduler.mode                FIFO
>> spark.serializer.objectStreamReset  100
>> spark.shuffle.service.enabled       FALSE
>> spark.speculation                   TRUE
>> spark.submit.deployMode             client
>> spark.task.maxFailures              10
>> spark.yarn.executor.memoryOverhead  2048
>> spark.yarn.isPython                 TRUE
>>
>>
>> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>> Ok, interesting. Would be interested to see how it compares.
>>
>> By the way, the feature size you select for the hasher should be a power of
>> 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
>> are evenly distributed (see the section on HashingTF under
>> http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
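[Editor's note: the collision/size trade-off Nick describes can be illustrated outside Spark. A minimal plain-Python sketch, in which `hash_index` stands in for what a feature hasher does (md5 replaces HashingTF's internal hash; all names here are illustrative, not Spark API): with `num_features` a power of 2, the modulo simply keeps the hash's low bits, so a well-mixed hash spreads indexes evenly, and collisions drop as the table grows past the number of distinct values.]

```python
import hashlib

def hash_index(value, num_features):
    # Map a categorical value to a column index in [0, num_features).
    # md5 is used as a stable hash; Python's builtin hash() is salted per process.
    h = int(hashlib.md5(str(value).encode()).hexdigest(), 16)
    # With num_features a power of 2, this modulo keeps only the low bits.
    return h % num_features

def collision_rate(values, num_features):
    # Fraction of distinct input values that do not get a unique index.
    indexes = {hash_index(v, num_features) for v in values}
    return 1.0 - len(indexes) / float(len(values))

ids = range(100000, 200000)        # 100k distinct ids
print(collision_rate(ids, 2 ** 17))  # table barely larger than the data: many collisions
print(collision_rate(ids, 2 ** 24))  # 16M slots: collisions become rare
```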
>>
>> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>
>>> Thanks Nick, I played around with the hashing trick. When I set numFeatures
>>> to the number of distinct values of the largest sparse feature, I ended up
>>> with half of them colliding. When I raised numFeatures to get fewer
>>> collisions, I soon ran into the same memory problems as before. To be
>>> honest, I didn't test the impact of having more or fewer collisions on the
>>> quality of the predictions, but tunnel-visioned into getting it to work
>>> with the full sparsity.
>>>
>>> Before, I worked in RDD land: zipWithIndex on an RDD of distinct values (plus
>>> one entry 'missing' for values unseen at predict time), collectAsMap, broadcast
>>> the map, a udf generating sparse vectors, and assembling the vectors manually.
>>> To move into dataframe land, I wrote:
>>>
>>> def getMappings(mode):
>>>     mappings = defaultdict(dict)
>>>     max_index = 0
>>>     for c in cat_int[:]:  # for every categorical variable
>>>         logging.info("starting with {}".format(c))
>>>         if mode == 'train':
>>>             grouped = (df2
>>>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>>>                 .selectExpr("*", "1 as n")  # prepare for a window function summing up the 1s before the current row, creating a RANK
>>>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>>>                 .drop('n')  # drop the column with static 1 values used for the cumulative sum
>>>                 )
>>>             logging.info("Got {} rows.".format(grouped.count()))
>>>             grouped.show()
>>>             logging.info('getting max')
>>>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts from it
>>>             logging.info("max_index has become: {}".format(max_index))
>>>             logging.info('adding missing value, so we also train on this and prediction data missing it.')
>>>             schema = grouped.schema
>>>             logging.info(schema)
>>>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add an index for 'missing', for values during predict that are unseen during training
>>>             max_index += 1
>>>             saveto = "{}/{}".format(path, c)
>>>             logging.info("Writing to: {}".format(saveto))
>>>             grouped.write.parquet(saveto, mode='overwrite')
>>>         elif mode == 'predict':
>>>             loadfrom = "{}/{}".format(path, c)
>>>             logging.info("Reading from: {}".format(loadfrom))
>>>             grouped = spark.read.parquet(loadfrom)
>>>
>>>         logging.info("Adding to dictionary")
>>>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # build up a dictionary to be broadcast later on, used for creating sparse vectors
>>>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>
>>>     logging.info("Sanity check for indexes:")
>>>     for c in cat_int[:]:
>>>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))  # some logging to confirm the indexes
>>>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>>>     return max_index, mappings
>>>
>>> I'd love to see the StringIndexer + OneHotEncoder transformers cope with
>>> missing values during prediction; for now I'll work with the hacked stuff
>>> above :).
>>> (.. and I should compare the performance with using the hashing trick.)
>>>
>>> Ben
>>
>>
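[Editor's note: the train/predict mapping Ben describes boils down to a frequency-ranked value-to-index dictionary with one reserved index for unseen values. A minimal plain-Python sketch of that idea, without Spark; `build_mapping` and `to_index` are illustrative names, not part of any API:]

```python
from collections import Counter

def build_mapping(values, start_index=1):
    # Rank distinct values by frequency (most frequent first) and assign
    # consecutive indexes, mirroring the window-function trick in the mail.
    counts = Counter(values)
    ranked = [v for v, _ in counts.most_common()]
    mapping = {v: i for i, v in enumerate(ranked, start=start_index)}
    # Reserve one extra slot for values unseen during training,
    # using the same 'missing' convention as in the mail.
    mapping['missing'] = start_index + len(ranked)
    return mapping

def to_index(mapping, value):
    # At predict time, values never seen during training fall back to 'missing'.
    return mapping.get(value, mapping['missing'])

m = build_mapping(['a', 'b', 'a', 'c', 'a', 'b'])
# 'a' is the most frequent value, so it receives the first index.
```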