Thanks Nick, I played around with the hashing trick. When I set numFeatures to 
the number of distinct values of the largest sparse feature, about half of them 
ended up colliding. When I raised numFeatures to reduce the collisions, I soon 
ran into the same memory problems as before. To be honest, I didn’t test the 
impact of having more or fewer collisions on the quality of the predictions, 
but tunnel-visioned on getting it to work with the full sparsity.
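
For reference, a quick way to gauge the collision count (just a sketch, assuming 
a dataframe df that holds the big categorical column ‘bigfeature’ and using your 
feature_name=feature_value trick; the names and numFeatures are illustrative):

from pyspark.ml.feature import HashingTF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType

to_array = udf(lambda s: ["bigfeature={}".format(s)], ArrayType(StringType()))
distinct_vals = (df.select("bigfeature").distinct()
                   .withColumn("features", to_array("bigfeature")))

# numFeatures is arbitrary here, just for illustration
hasher = HashingTF(numFeatures=2**26, inputCol="features", outputCol="hashed")
hashed = hasher.transform(distinct_vals)

# every row has exactly one non-zero entry; its index is the hash bucket
bucket = udf(lambda v: int(v.indices[0]), IntegerType())
n_values  = hashed.count()
n_buckets = hashed.select(bucket("hashed").alias("b")).distinct().count()
print("colliding values (beyond the first per bucket): {}".format(n_values - n_buckets))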

Before, I worked in RDD land (zipWithIndex on an RDD of the distinct values plus 
one extra ‘missing’ entry for values unseen at predict time, collectAsMap, 
broadcast the map, a udf generating the sparse vectors, assembling the vectors 
manually; a rough sketch of that broadcast + udf part follows after the function 
below). To move into dataframe land, I wrote:

from collections import defaultdict
import logging

def getMappings(mode):
    # cat_int, df2, spark and path come from the surrounding script
    mappings = defaultdict(dict)
    max_index = 0
    for c in cat_int:    # for every categorical variable
        logging.info("starting with {}".format(c))
        if mode == 'train':
            grouped = (df2
                # get counts, ordered from largest to smallest
                .groupBy(c).count().orderBy('count', ascending=False)
                # static 1s to feed the window function below
                .selectExpr("*", "1 as n")
                # running sum of the 1s up to and including the current row = a 1-based rank,
                # offset by the max index of the previous feature
                .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + {} AS index".format(max_index))
                # drop the column with static 1 values used for the cumulative sum
                .drop('n')
                )
            logging.info("Got {} rows.".format(grouped.count()))
            grouped.show()
            logging.info('getting max')
            # update the max index so the next categorical feature starts after it
            max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
            logging.info("max_index has become: {}".format(max_index))
            logging.info("adding a 'missing' value, so we also train on it and can map values unseen during training to it at predict time")
            schema = grouped.schema
            logging.info(schema)
            grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))
            max_index += 1
            saveto = "{}/{}".format(path, c)
            logging.info("Writing to: {}".format(saveto))
            grouped.write.parquet(saveto, mode='overwrite')

        elif mode == 'predict':
            loadfrom = "{}/{}".format(path, c)
            logging.info("Reading from: {}".format(loadfrom))
            grouped = spark.read.parquet(loadfrom)

        logging.info("Adding to dictionary")
        # dictionary to be broadcast later on, used for creating the sparse vectors
        mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
        max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()

    logging.info("Sanity check for indexes:")
    for c in cat_int:
        # some logging to confirm the index ranges per feature
        logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
        logging.info("Missing value = {}".format(mappings[c]['missing']))
    return max_index, mappings
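
And for completeness, a rough sketch of the ‘broadcast the map + udf generating 
sparse vector’ part mentioned above, i.e. how the mappings get used afterwards 
(the names b_mappings, vec_size, to_sparse and df3 are illustrative, not the 
exact code I run):

from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf, struct

max_index, mappings = getMappings('train')
b_mappings = spark.sparkContext.broadcast(mappings)
vec_size = max_index + 1   # indexes above start at 1, so slot 0 stays unused

def to_sparse(row):
    # look up every categorical value, falling back to the 'missing' index
    d = row.asDict()
    idxs = sorted({b_mappings.value[c].get(d[c], b_mappings.value[c]['missing'])
                   for c in cat_int})
    return SparseVector(vec_size, idxs, [1.0] * len(idxs))

sparse_udf = udf(to_sparse, VectorUDT())
df3 = df2.withColumn("features", sparse_udf(struct(*cat_int)))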

I’d love to see the StringIndexer + OneHotEncoder transformers cope with values 
unseen during training at prediction time; for now I’ll work with the 
hacked-together stuff above :).
(.. and I should still compare the prediction performance against using the 
hashing trick.)

Ben

> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> 
> Sure, I understand there are some issues with handling this missing value 
> situation in StringIndexer currently. Your workaround is not ideal but I see 
> that it is probably the only mechanism available currently to avoid the 
> problem.
> 
> But the OOM issues seem to be more about the feature cardinality (so the size 
> of the hashmap to store the feature <-> index mappings).
> 
> A nice property of feature hashing is that it implicitly handles unseen 
> category labels by setting the coefficient value to 0 (in the absence of a 
> hash collision) - basically option 2 from H2O.
> 
> Why is that? Well once you've trained your model you have a (sparse) 
> N-dimensional weight vector that will be definition have 0s for unseen 
> indexes. At test time, any feature that only appears in your test set or new 
> data will be hashed to an index in the weight vector that has value 0.
> 
> So it could be useful for both of your problems.
> 
> On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote:
> Hi Nick, 
> 
> Thanks for the suggestion. Reducing the dimensionality is an option, thanks, 
> but let’s say I really want to do this :).
> 
> The reason why it’s so big is that I’m unifying my training and test data, 
> and I don’t want to drop rows in the test data just because one of the 
> features was missing in the training data. I wouldn’t need this  workaround, 
> if I had a better strategy in Spark for dealing with missing levels. How 
> Spark can deal with it:
> 
> "Additionally, there are two strategies regarding how StringIndexer will 
> handle unseen labels when you have fit a StringIndexer on one dataset and then 
> use it to transform another:
>       • throw an exception (which is the default)
>       • skip the row containing the unseen label entirely"
> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer
> 
> I like how H2O handles this; 
> 
> "What happens during prediction if the new sample has categorical levels not 
> seen in training? The value will be filled with either special missing level 
> (if trained with missing values and missing_value_handling was set to 
> MeanImputation) or 0.”
> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
> 
> So assuming I need to unify the data, make it huge, and trying out more in 
> scala, I see these kinds of errors:
> _____________
> 
> scala> feedBack(s"Applying string indexers: fitting")
> 2016-08-04 10:13:20() | Applying string indexers: fitting
> 
> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a
> 
> scala> val dfFitted = pipelined.fit(df)
> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a
> 
> scala> feedBack(s"Applying string indexers: transforming")
> 2016-08-04 10:17:29() | Applying string indexers: transforming
> 
> scala> var df2 = dfFitted.transform(df)
> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16 
> more fields]
> 
> scala>
> 
> scala> feedBack(s"Applying OHE: fitting")
> 2016-08-04 10:18:07() | Applying OHE: fitting
> 
> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322
> 
> scala> val dfFitted2 = pipelined2.fit(df2)
> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took 85735ms 
> (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR 
> downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010]
> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor exception 
>  for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
> java.io.IOException: Bad response ERROR for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from 
> datanode 10.10.95.11:50010
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in 
> pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: 
> bad datanode 10.10.95.11:50010
> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322
> 
> scala> feedBack(s"Applying OHE: transforming")
> 2016-08-04 10:29:12() | Applying OHE: transforming
> 
> scala> df2 = dfFitted2.transform(df2).cache()
> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor exception 
>  for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
> java.io.EOFException: Premature EOF: no length prefix available
>         at 
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
>         at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in 
> pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: 
> bad datanode 10.10.66.13:50010
> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took 74146ms 
> (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS status: 
> ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010]
> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor exception 
>  for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
> java.io.IOException: Bad response ERROR for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from 
> datanode 10.10.95.29:50010
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block 
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in 
> pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: 
> bad datanode 10.10.95.29:50010
> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took 60891ms 
> (threshold=30000ms); ack: seqno: -2 status:
> 
> ____________________
> 
> After 40 minutes or so, with no activity in the application master, it dies.
> 
> Ben
> 
>> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>> 
>> Hi Ben
>> 
>> Perhaps with this size cardinality it is worth looking at feature hashing 
>> for your problem. Spark has the HashingTF transformer that works on a column 
>> of "sentences" (i.e. [string]).
>> 
>> For categorical features you can hack it a little by converting your feature 
>> value into a ["feature_name=feature_value"] representation. Then HashingTF 
>> can be used as is. Note you can also just do ["feature_value"], but the 
>> former would allow you, with a bit of munging, to hash all your feature 
>> columns at the same time.
>> 
>> The advantage is speed and bounded memory footprint. The disadvantages 
>> include (i) no way to reverse the mapping from feature_index -> 
>> feature_name; (ii) potential for hash collisions (can be helped a bit by 
>> increasing your feature vector size).
>> 
>> Here is a minimal example:
>> 
>> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, 
>> HashingTF
>> In [2]: from pyspark.sql.types import StringType, ArrayType
>> In [3]: from pyspark.sql.functions import udf
>> 
>> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3, 
>> "baz")], ["id", "feature"])
>> 
>> In [5]: to_array = udf(lambda s: ["feature=%s" % s], ArrayType(StringType()))
>> 
>> In [6]: df = df.withColumn("features", to_array("feature"))
>> 
>> In [7]: df.show()
>> +---+-------+-------------+
>> | id|feature|     features|
>> +---+-------+-------------+
>> |  0|    foo|[feature=foo]|
>> |  1|    bar|[feature=bar]|
>> |  2|    foo|[feature=foo]|
>> |  3|    baz|[feature=baz]|
>> +---+-------+-------------+
>> 
>> In [8]: indexer = StringIndexer(inputCol="feature", 
>> outputCol="feature_index")
>> 
>> In [9]: indexed = indexer.fit(df).transform(df)
>> 
>> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index", 
>> outputCol="feature_vector")
>> 
>> In [11]: encoded = encoder.transform(indexed)
>> 
>> In [12]: encoded.show()
>> +---+-------+-------------+-------------+--------------+
>> | id|feature|     features|feature_index|feature_vector|
>> +---+-------+-------------+-------------+--------------+
>> |  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
>> |  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
>> +---+-------+-------------+-------------+--------------+
>> 
>> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features", 
>> outputCol="features_vector")
>> 
>> In [23]: hashed = hasher.transform(df)
>> 
>> In [24]: hashed.show()
>> +---+-------+-------------+-----------------+
>> | id|feature|     features|  features_vector|
>> +---+-------+-------------+-----------------+
>> |  0|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  1|    bar|[feature=bar]|(256,[219],[1.0])|
>> |  2|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  3|    baz|[feature=baz]| (256,[38],[1.0])|
>> +---+-------+-------------+-----------------+
>> 
>> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:
>> I raised driver memory to 30G and maxresultsize to 25G, this time in 
>> pyspark. 
>> 
>> Code run:
>> 
>> cat_int  = ['bigfeature']
>> 
>> stagesIndex = []
>> stagesOhe   = []
>> for c in cat_int:
>>   stagesIndex.append(StringIndexer(inputCol=c, 
>> outputCol="{}Index".format(c)))
>>   stagesOhe.append(OneHotEncoder(dropLast= False, inputCol = 
>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>> 
>> df2 = df
>> 
>> for i in range(len(stagesIndex)):
>>   logging.info("Starting with {}".format(cat_int[i]))
>>   stagesIndex[i].fit(df2)
>>   logging.info("Fitted. Now transforming:")
>>   df2 = stagesIndex[i].fit(df2).transform(df2)
>>   logging.info("Transformed. Now showing transformed:")
>>   df2.show()
>>   logging.info("OHE")
>>   df2 = stagesOhe[i].transform(df2)
>>   logging.info("Fitted. Now showing OHE:")
>>   df2.show()
>> 
>> Now I get error:
>> 
>> 2016-08-04 08:53:44,839 INFO       Starting with bigfeature
>> ukStringIndexer_442b8e11e3294de9b83a
>> 2016-08-04 09:06:18,147 INFO       Fitted. Now transforming:
>> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - 
>> Cannot receive any reply in 120 seconds. This timeout is controlled by 
>> spark.rpc.askTimeout
>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 
>> seconds. This timeout is controlled by spark.rpc.askTimeout
>>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>         at 
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>         at 
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>         at 
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>>         at scala.util.Try$.apply(Try.scala:192)
>>         at scala.util.Failure.recover(Try.scala:216)
>>         at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>         at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>         at 
>> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>>         at 
>> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>>         at 
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>         at 
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>         at scala.concurrent.Promise$class.complete(Promise.scala:55)
>>         at 
>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>         at 
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>>         at 
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>>         at 
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>         at 
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>         at 
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>         at 
>> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>>         at 
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>         at 
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>>         at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>         at 
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>         at 
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>         at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>>         at 
>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>>         at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
>>         at 
>> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
>>         at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply 
>> in 120 seconds
>>         ... 8 more
>> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC 
>> 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is not outstanding
>> 2016-08-04 09:12:07,016 INFO       Transformed. Now showing transformed:
>> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took 
>> 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR 
>> downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, 10.10.91.9:50010]
>> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor 
>> exception  for block 
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
>> java.io.IOException: Bad response ERROR for block 
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from 
>> datanode 10.10.10.12:50010
>>         at 
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block 
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in 
>> pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: 
>> bad datanode 10.10.10.12:50010
>> Traceback (most recent call last):
>>   File "<stdin>", line 7, in <module>
>>   File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in show
>>     print(self._jdf.showString(n, truncate))
>>   File 
>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 
>> 933, in __call__
>>   File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
>>     return f(*a, **kw)
>>   File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", 
>> line 312, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
>> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>>         at java.util.Arrays.copyOf(Arrays.java:2271)
>>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>         at 
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>         at 
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>         at 
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>         at 
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>         at 
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>         at 
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>         at 
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>>         at 
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>>         at 
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>>         at 
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>         at 
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>         at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
>>         at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
>>         at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>         at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
>>         at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>>         at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>>         at 
>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
>>         at 
>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>>         at 
>> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
>>         at 
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>> 
>> Ben
>> 
>>> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I want to one hot encode a column containing 56 million distinct values. My 
>>> dataset is 800m rows + 17 columns.
>>> I first apply a StringIndexer, but it already breaks there giving a OOM 
>>> java heap space error.
>>> 
>>> I launch my app on YARN with:
>>> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 
>>> --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
>>> 
>>> After grabbing the data, I run:
>>> 
>>> val catInts = Array(“bigfeature”)
>>> 
>>> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
>>> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
>>> for (c <- catInts) {
>>>   println(s"starting with $c")
>>>   val i = new StringIndexer()
>>>     .setInputCol(c)
>>>     .setOutputCol(s"${c}Index")
>>>   stagesIndex += i
>>> 
>>>   val o = new OneHotEncoder()
>>>     .setDropLast(false)
>>>     .setInputCol(s"${c}Index")
>>>     .setOutputCol(s"${c}OHE")
>>>   stagesOhe += o
>>> }
>>> 
>>> println(s"Applying string indexers: fitting")
>>> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
>>> val dfFitted = pipelined.fit(df)
>>> 
>>> 
>>> Then, the application master shows a "countByValue at StringIndexer.scala” 
>>> taking 1.8 minutes (so very fast). 
>>> Afterwards, the shell console hangs for a while. What is it doing now? 
>>> After some time, it shows:
>>> 
>>> scala> val dfFitted = pipelined.fit(df)
>>>                                                                       
>>> java.lang.OutOfMemoryError: Java heap space
>>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>>>   at 
>>> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>>   at 
>>> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>>>   at 
>>> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>>>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>>>   ... 16 elided
>>> 
>>> 
>>> 
>> 
> 
