Thanks Nick, I played around with the hashing trick. When I set numFeatures to the number of distinct values of the largest sparse feature, about half of the values ended up colliding. When I raised numFeatures to get fewer collisions, I soon ran into the same memory problems as before. To be honest, I didn’t test how more or fewer collisions affect the quality of the predictions; I got tunnel vision about making it work with the full sparsity.
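As a rough illustration of why collisions stay common until numFeatures is well above the number of distinct values: under an idealized uniform hash (an assumption; this does not model Spark's actual hash function), a given value shares its bucket with at least one of the other n - 1 values with probability 1 - (1 - 1/m)^(n-1), which is about 1 - e^(-n/m) for large n. The pure-Python sketch below, with hypothetical function names, evaluates that expression and checks it with a small simulation.

```python
import math
import random


def expected_collision_fraction(n_values, n_buckets):
    """Expected fraction of distinct values that share a bucket with at least one
    other value, assuming every value hashes independently and uniformly."""
    return 1.0 - (1.0 - 1.0 / n_buckets) ** (n_values - 1)


def approx_collision_fraction(n_values, n_buckets):
    """Large-n approximation of the same quantity: 1 - exp(-n/m)."""
    return 1.0 - math.exp(-n_values / n_buckets)


def simulated_collision_fraction(n_values, n_buckets, seed=0):
    """Monte Carlo check: draw a random bucket per value and count the values
    whose bucket is shared with another value."""
    rng = random.Random(seed)
    buckets = [rng.randrange(n_buckets) for _ in range(n_values)]
    per_bucket = {}
    for b in buckets:
        per_bucket[b] = per_bucket.get(b, 0) + 1
    return sum(1 for b in buckets if per_bucket[b] > 1) / n_values


if __name__ == "__main__":
    n = 100_000  # number of distinct values (kept small so the simulation is quick)
    for ratio in (1, 2, 10, 100):  # numFeatures as a multiple of n
        m = n * ratio
        print(ratio,
              round(expected_collision_fraction(n, m), 4),
              round(approx_collision_fraction(n, m), 4),
              round(simulated_collision_fraction(n, m), 4))
```

With numFeatures equal to the number of distinct values this gives roughly 63% (1 - 1/e) of values colliding, and even with ten times as many buckets about 9.5% still collide, so substantially cutting collisions means growing the vector well beyond the cardinality.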
Before, I worked in RDD land (zipWithIndex on an RDD with the distinct values plus one ‘missing’ entry for values missing during predict, collectAsMap, broadcast the map, a UDF generating the sparse vector, and assembling the vectors manually). To move into DataFrame land, I wrote:

from collections import defaultdict
import logging

# Uses spark, df2, cat_int and path from the surrounding session. The 'missing'
# row below only fits the schema if each column in cat_int is a string column.
def getMappings(mode):
    mappings = defaultdict(dict)
    max_index = 0
    for c in cat_int[:]:  # for every categorical variable
        logging.info("starting with {}".format(c))
        if mode == 'train':
            grouped = (df2
                .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
                .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
                .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
                .drop('n')  # drop the column with static 1 values used for the cumulative sum
            )
            logging.info("Got {} rows.".format(grouped.count()))
            grouped.show()
            logging.info('getting max')
            max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts after it
            logging.info("max_index has become: {}".format(max_index))
            logging.info('adding missing value, so we also train on this and prediction data missing it. ')
            schema = grouped.schema
            logging.info(schema)
            grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # index for values seen during predict but unseen during training
            max_index += 1
            saveto = "{}/{}".format(path, c)
            logging.info("Writing to: {}".format(saveto))
            grouped.write.parquet(saveto, mode='overwrite')
        elif mode == 'predict':
            loadfrom = "{}/{}".format(path, c)
            logging.info("Reading from: {}".format(loadfrom))
            grouped = spark.read.parquet(loadfrom)
        logging.info("Adding to dictionary")
        # build up the dictionary to be broadcast later on, used for creating sparse vectors
        mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
        max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()

    logging.info("Sanity check for indexes:")
    for c in cat_int[:]:  # some logging to confirm the indexes
        logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
        logging.info("Missing value = {}".format(mappings[c]['missing']))
    return max_index, mappings

I’d love to see the StringIndexer + OneHotEncoder transformers cope with missing values during prediction; for now I’ll work with the hacked stuff above :). (... and I should compare the performance with the hashing trick.)

Ben

> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
> Sure, I understand there are some issues with handling this missing value
> situation in StringIndexer currently. Your workaround is not ideal, but I see
> that it is probably the only mechanism currently available to avoid the
> problem.
>
> But the OOM issues seem to be more about the feature cardinality (i.e. the size
> of the hashmap that stores the feature <-> index mappings).
>
> A nice property of feature hashing is that it implicitly handles unseen
> category labels by setting the coefficient value to 0 (in the absence of a
> hash collision) - basically option 2 from H2O.
>
> Why is that? Well, once you've trained your model you have a (sparse)
> N-dimensional weight vector that will by definition have 0s for unseen
> indexes.
> At test time, any feature that only appears in your test set or new
> data will be hashed to an index in the weight vector that has value 0.
>
> So it could be useful for both of your problems.
>
> On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote:
> Hi Nick,
>
> Thanks for the suggestion. Reducing the dimensionality is an option, thanks,
> but let’s say I really want to do this :).
>
> The reason why it’s so big is that I’m unifying my training and test data,
> and I don’t want to drop rows in the test data just because one of the
> feature values was not present in the training data. I wouldn’t need this
> workaround if I had a better strategy in Spark for dealing with missing
> levels. How Spark can deal with it:
>
> "Additionally, there are two strategies regarding how StringIndexer will
> handle unseen labels when you have fit a StringIndexer on one dataset and then
> use it to transform another:
> • throw an exception (which is the default)
> • skip the row containing the unseen label entirely"
> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer
>
> I like how H2O handles this:
>
> "What happens during prediction if the new sample has categorical levels not
> seen in training?
> The value will be filled with either special missing level
> (if trained with missing values and missing_value_handling was set to
> MeanImputation) or 0."
> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
>
> So assuming I need to unify the data, make it huge, and try out more in
> Scala, I see these kinds of errors:
> _____________
>
> scala> feedBack(s"Applying string indexers: fitting")
> 2016-08-04 10:13:20() | Applying string indexers: fitting
>
> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a
>
> scala> val dfFitted = pipelined.fit(df)
> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a
>
> scala> feedBack(s"Applying string indexers: transforming")
> 2016-08-04 10:17:29() | Applying string indexers: transforming
>
> scala> var df2 = dfFitted.transform(df)
> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16 more fields]
>
> scala>
>
> scala> feedBack(s"Applying OHE: fitting")
> 2016-08-04 10:18:07() | Applying OHE: fitting
>
> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322
>
> scala> val dfFitted2 = pipelined2.fit(df2)
> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took 85735ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010]
> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
> java.io.IOException: Bad response ERROR for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from datanode 10.10.95.11:50010
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad datanode 10.10.95.11:50010
> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322
>
> scala> feedBack(s"Applying OHE: transforming")
> 2016-08-04 10:29:12() | Applying OHE: transforming
>
> scala> df2 = dfFitted2.transform(df2).cache()
> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
> java.io.EOFException: Premature EOF: no length prefix available
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad datanode 10.10.66.13:50010
> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took 74146ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010]
> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
> java.io.IOException: Bad response ERROR for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from datanode 10.10.95.29:50010
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad datanode 10.10.95.29:50010
> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took 60891ms (threshold=30000ms); ack: seqno: -2 status:
>
> ____________________
>
> After about 40 minutes with no activity in the application master, it dies.
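The unseen-label options discussed here (throw, skip the row, or map the label to a reserved extra index, as H2O's special missing level and the ‘missing’ entry in getMappings do) can be compared in a small pure-Python sketch. The helper names and the 'error'/'skip'/'keep' strategy names are hypothetical, not Spark's API, and ties between equally frequent labels are broken alphabetically here, which may differ from what StringIndexer does.

```python
from collections import Counter


def fit_index(values):
    """Assign indexes by descending frequency (ties broken alphabetically) and
    reserve one extra index, len(labels), for labels unseen at fit time."""
    counts = Counter(values)
    labels = sorted(counts, key=lambda v: (-counts[v], v))
    mapping = {label: i for i, label in enumerate(labels)}
    return mapping, len(labels)


def transform_index(values, mapping, unseen_index, strategy="error"):
    """Encode values; `strategy` decides what happens to unseen labels:
    'error' raises, 'skip' drops the row, 'keep' uses the reserved index."""
    encoded = []
    for v in values:
        if v in mapping:
            encoded.append(mapping[v])
        elif strategy == "error":
            raise ValueError("unseen label: {!r}".format(v))
        elif strategy == "skip":
            continue
        elif strategy == "keep":
            encoded.append(unseen_index)
        else:
            raise ValueError("unknown strategy: {!r}".format(strategy))
    return encoded


def one_hot(index, size):
    """Sparse one-hot vector as (size, [index], [1.0]), the form Spark prints."""
    return (size, [index], [1.0])


if __name__ == "__main__":
    mapping, unseen = fit_index(["foo", "bar", "foo", "baz"])
    codes = transform_index(["foo", "qux"], mapping, unseen, strategy="keep")
    print([one_hot(i, unseen + 1) for i in codes])
```

The 'keep' branch is what makes the one-hot width one larger than the number of training labels: every unseen label shares the reserved slot instead of failing or disappearing.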
>
> Ben
>
>> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>> Hi Ben
>>
>> Perhaps with this size cardinality it is worth looking at feature hashing
>> for your problem. Spark has the HashingTF transformer that works on a column
>> of "sentences" (i.e. [string]).
>>
>> For categorical features you can hack it a little by converting your feature
>> value into a ["feature_name=feature_value"] representation. Then HashingTF
>> can be used as is. Note you can also just do ["feature_value"], but the
>> former would allow you, with a bit of munging, to hash all your feature
>> columns at the same time.
>>
>> The advantage is speed and bounded memory footprint. The disadvantages
>> include (i) no way to reverse the mapping from feature_index ->
>> feature_name; (ii) potential for hash collisions (can be helped a bit by
>> increasing your feature vector size).
>>
>> Here is a minimal example:
>>
>> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, HashingTF
>> In [2]: from pyspark.sql.types import StringType, ArrayType
>> In [3]: from pyspark.sql.functions import udf
>>
>> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3, "baz")], ["id", "feature"])
>>
>> In [5]: to_array = udf(lambda s: ["feature=%s" % s], ArrayType(StringType()))
>>
>> In [6]: df = df.withColumn("features", to_array("feature"))
>>
>> In [7]: df.show()
>> +---+-------+-------------+
>> | id|feature|     features|
>> +---+-------+-------------+
>> |  0|    foo|[feature=foo]|
>> |  1|    bar|[feature=bar]|
>> |  2|    foo|[feature=foo]|
>> |  3|    baz|[feature=baz]|
>> +---+-------+-------------+
>>
>> In [8]: indexer = StringIndexer(inputCol="feature", outputCol="feature_index")
>>
>> In [9]: indexed = indexer.fit(df).transform(df)
>>
>> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index", outputCol="feature_vector")
>>
>> In [11]: encoded = encoder.transform(indexed)
>>
>> In [12]: encoded.show()
>> +---+-------+-------------+-------------+--------------+
>> | id|feature|     features|feature_index|feature_vector|
>> +---+-------+-------------+-------------+--------------+
>> |  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
>> |  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
>> +---+-------+-------------+-------------+--------------+
>>
>> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features", outputCol="features_vector")
>>
>> In [23]: hashed = hasher.transform(df)
>>
>> In [24]: hashed.show()
>> +---+-------+-------------+-----------------+
>> | id|feature|     features|  features_vector|
>> +---+-------+-------------+-----------------+
>> |  0|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  1|    bar|[feature=bar]|(256,[219],[1.0])|
>> |  2|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  3|    baz|[feature=baz]| (256,[38],[1.0])|
>> +---+-------+-------------+-----------------+
>>
>> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:
>> I raised driver memory to 30G and spark.driver.maxResultSize to 25G, this
>> time in pyspark.
>>
>> Code run:
>>
>> import logging
>> from pyspark.ml.feature import StringIndexer, OneHotEncoder
>>
>> cat_int = ['bigfeature']
>>
>> stagesIndex = []
>> stagesOhe = []
>> for c in cat_int:
>>     stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>     stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>
>> df2 = df
>>
>> for i in range(len(stagesIndex)):
>>     logging.info("Starting with {}".format(cat_int[i]))
>>     stagesIndex[i].fit(df2)
>>     logging.info("Fitted. Now transforming:")
>>     # note: this fits the indexer a second time; the model from the fit above could be reused
>>     df2 = stagesIndex[i].fit(df2).transform(df2)
>>     logging.info("Transformed. Now showing transformed:")
>>     df2.show()
>>     logging.info("OHE")
>>     df2 = stagesOhe[i].transform(df2)
>>     logging.info("Fitted. Now showing OHE:")
>>     df2.show()
>>
>> Now I get this error:
>>
>> 2016-08-04 08:53:44,839 INFO Starting with bigfeature
>> StringIndexer_442b8e11e3294de9b83a
>> 2016-08-04 09:06:18,147 INFO Fitted. Now transforming:
>> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
>>     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>>     at scala.util.Try$.apply(Try.scala:192)
>>     at scala.util.Failure.recover(Try.scala:216)
>>     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>     at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>>     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>     at scala.concurrent.Promise$class.complete(Promise.scala:55)
>>     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>     at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>>     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>>     at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
>>     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
>>     ... 8 more
>> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is not outstanding
>> 2016-08-04 09:12:07,016 INFO Transformed. Now showing transformed:
>> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, 10.10.91.9:50010]
>> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
>> java.io.IOException: Bad response ERROR for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from datanode 10.10.10.12:50010
>>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad datanode 10.10.10.12:50010
>> Traceback (most recent call last):
>>   File "<stdin>", line 7, in <module>
>>   File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in show
>>     print(self._jdf.showString(n, truncate))
>>   File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>   File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
>>     return f(*a, **kw)
>>   File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
>> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>>     at java.util.Arrays.copyOf(Arrays.java:2271)
>>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>     at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
>>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>>     at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>>     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
>>     at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>>     at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
>>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>>
>> Ben
>>
>>> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I want to one-hot encode a column containing 56 million distinct values. My
>>> dataset is 800m rows + 17 columns.
>>> I first apply a StringIndexer, but it already breaks there, giving an OOM
>>> Java heap space error.
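For a column with tens of millions of distinct values, the "feature_name=feature_value" hashing trick from Nick's reply keeps memory bounded because each token is reduced to hash(token) mod numFeatures, with no label-to-index map to store. Below is a pure-Python sketch of hashing all columns of a row at once. It uses zlib.crc32 purely as an illustrative stand-in (Spark's HashingTF uses its own hash function, so the indexes will not match the HashingTF output shown in the thread), and the function names, column names and weights are hypothetical.

```python
import zlib


def hash_row(row, num_features=2 ** 8):
    """Hash every (column, value) pair of a row into one sparse count vector.

    Each pair becomes a "column=value" token, so the same value in two columns
    usually lands in different buckets. zlib.crc32 is an illustrative stand-in
    for the hash function; the result is (size, indices, values).
    """
    counts = {}
    for column, value in sorted(row.items()):
        token = "{}={}".format(column, value)
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] = counts.get(index, 0.0) + 1.0  # colliding tokens add up
    indices = sorted(counts)
    return (num_features, indices, [counts[i] for i in indices])


def dot(weights, vector):
    """Dot product of a sparse {index: weight} model with a hashed vector.
    Buckets that received no weight during training contribute 0."""
    _, indices, values = vector
    return sum(weights.get(i, 0.0) * v for i, v in zip(indices, values))


if __name__ == "__main__":
    train = hash_row({"feature": "foo", "bigfeature": "id_123"})
    weights = {i: 0.5 for i in train[1]}  # hypothetical trained weights
    new = hash_row({"feature": "foo", "bigfeature": "never_seen"})
    print(train, new, dot(weights, new))
```

An unseen value still gets a bucket at predict time; unless it collides with a bucket used in training, it has no weight and contributes 0 to the score, which is the behaviour Nick describes.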
>>>
>>> I launch my app on YARN with:
>>> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
>>>
>>> After grabbing the data, I run:
>>>
>>> import org.apache.spark.ml.Pipeline
>>> import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
>>>
>>> val catInts = Array("bigfeature")
>>>
>>> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
>>> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
>>> for (c <- catInts) {
>>>   println(s"starting with $c")
>>>   val i = new StringIndexer()
>>>     .setInputCol(c)
>>>     .setOutputCol(s"${c}Index")
>>>   stagesIndex += i
>>>
>>>   val o = new OneHotEncoder()
>>>     .setDropLast(false)
>>>     .setInputCol(s"${c}Index")
>>>     .setOutputCol(s"${c}OHE")
>>>   stagesOhe += o
>>> }
>>>
>>> println(s"Applying string indexers: fitting")
>>> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
>>> val dfFitted = pipelined.fit(df)
>>>
>>> Then the application master shows a "countByValue at StringIndexer.scala"
>>> stage taking 1.8 minutes (so very fast).
>>> Afterwards, the shell console hangs for a while. What is it doing now?
>>> After some time, it shows:
>>>
>>> scala> val dfFitted = pipelined.fit(df)
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>>>     at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>>>     at org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>>>     at org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>>>     at org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>>>     at org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>>>     at org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>>>     at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>>>     at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>>>     at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>>>     at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>>     at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>>>     at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>>>     at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>>>     ... 16 elided
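The trace shows the driver failing while StringIndexerModel builds its label-to-index OpenHashMap and rehashes it. Below is a back-of-the-envelope sketch of the slot-array sizes involved. It assumes (from memory of Spark's OpenHashSet, not verified against the 2.0.0 source here) a power-of-two capacity starting at 64 that doubles once the key count exceeds 0.7 * capacity, with 8-byte key references and 8-byte Double values; the label strings themselves, the occupancy bitset and array headers are not counted, and the function names are hypothetical.

```python
def open_hash_capacity(n_keys, load_factor=0.7, initial_capacity=64):
    """Capacity reached when a power-of-two table starting at initial_capacity
    doubles whenever the key count would exceed load_factor * capacity
    (assumed growth rule)."""
    capacity = initial_capacity
    while n_keys > load_factor * capacity:
        capacity *= 2
    return capacity


def rehash_peak_bytes(n_keys, key_ref_bytes=8, value_bytes=8, load_factor=0.7):
    """Rough size of the key and value slot arrays during the last rehash, when
    the old (half-size) and new arrays are alive at the same time. Excludes the
    key objects themselves, the occupancy bitset and JVM array headers."""
    new_capacity = open_hash_capacity(n_keys, load_factor)
    old_capacity = new_capacity // 2
    return (old_capacity + new_capacity) * (key_ref_bytes + value_bytes)


if __name__ == "__main__":
    n = 56_000_000  # distinct values in bigfeature
    print(open_hash_capacity(n), rehash_peak_bytes(n) / 2 ** 30, "GiB")
```

Under these assumptions, 56 million labels need a capacity of 2^27 slots, and the last rehash briefly holds about 3 GiB of slot arrays, on top of the 56 million label strings the driver already holds.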