withColumn on nested schema
Hi,

I'm trying to replace values in a nested column of a JSON-based DataFrame using withColumn(). This syntax works for select(), filter(), etc., giving only the nested "country" column:

df.select('body.payload.country')

but if I do this, it creates a new top-level column literally named "body.payload.country" instead:

df.withColumn('body.payload.country', lit(None))

I also tried 'body["payload"]["country"]', etc., but no luck. Is it possible to do this somehow?

Best,
Zsolt
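For what it's worth, withColumn() can only add or replace top-level columns, so the usual workaround is to rebuild the enclosing struct with a struct(...) expression that lists the old fields plus lit(None), and overwrite the whole "body" column (much newer Spark releases add a Column.withField helper for exactly this). A pure-Python sketch of the rebuild-every-level idea, using the field names from the question plus a made-up sibling field "city":

```python
# Pure-Python illustration of the workaround: you cannot assign into
# 'body.payload.country' in place, so you rebuild each level of the
# enclosing structure with the one leaf replaced (in PySpark this is a
# struct(...) expression over the old fields plus lit(None)).
def replace_nested(record, path, value):
    """Return a copy of `record` with the field at dotted `path` replaced."""
    head, _, rest = path.partition(".")
    rebuilt = dict(record)  # rebuild this level of the "struct"
    rebuilt[head] = replace_nested(record[head], rest, value) if rest else value
    return rebuilt

row = {"body": {"payload": {"country": "HU", "city": "Budapest"}}}
out = replace_nested(row, "body.payload.country", None)
# out["body"]["payload"] == {"country": None, "city": "Budapest"}
```

Note that the original record is left untouched; only the rebuilt copy carries the replacement, which mirrors how the Spark expression produces a new column rather than mutating the old one.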
Map and MapPartitions with partition-local variable
Any comment on this one?

On 16 Nov 2016 at 12:59 PM, "Zsolt Tóth" <toth.zsolt@gmail.com> wrote:

> Hi,
>
> I need to run a map() and a mapPartitions() on my input DF. As a
> side-effect of the map(), a partition-local variable should be updated,
> that is used in the mapPartitions() afterwards.
> I can't use Broadcast variable, because it's shared between partitions on
> the same executor.
>
> Where can I define this variable?
> I could run a single mapPartitions() that defines the variable, iterates
> over the input (just as the map() would do), collect the result into an
> ArrayList, and then use the list's iterator (and the updated
> partition-local variable) as the input of the transformation that the
> original mapPartitions() did.
>
> It feels however, that this is not as optimal as running
> map()+mapPartitions() because I need to store the ArrayList (which is
> basically the whole data in the partition) in memory.
>
> Thanks,
> Zsolt
Map and MapPartitions with partition-local variable
Hi,

I need to run a map() and a mapPartitions() on my input DF. As a side effect of the map(), a partition-local variable should be updated, which is then used in the mapPartitions() afterwards. I can't use a Broadcast variable, because it's shared between partitions on the same executor.

Where can I define this variable?

I could run a single mapPartitions() that defines the variable, iterates over the input (just as the map() would do), collects the result into an ArrayList, and then uses the list's iterator (and the updated partition-local variable) as the input of the transformation that the original mapPartitions() did. It feels, however, that this is not as optimal as running map() + mapPartitions(), because I need to store the ArrayList (which is basically the whole data of the partition) in memory.

Thanks,
Zsolt
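One way to fuse the two steps without materializing an ArrayList is to express the map() step as a lazy iterator inside a single mapPartitions(): the partition-local variable is updated as elements stream through, so it holds its final value exactly when the downstream logic has consumed the input. A pure-Python sketch of the pattern (function and variable names are made up, and the doubling/summing logic is only a stand-in for the real map/mapPartitions bodies):

```python
# Sketch: fuse map() + mapPartitions() into one partition function.
# `state` is the partition-local variable; `mapped()` plays the role of
# the separate map() step and updates `state` lazily, element by element,
# so the whole partition is never collected into a list.
def fused_partition_fn(partition_iter):
    state = {"count": 0}              # partition-local variable

    def mapped():                     # what the separate map() did
        for x in partition_iter:
            state["count"] += 1       # the side effect of map()
            yield x * 2

    # The original mapPartitions() logic consumes the stream lazily.
    # Caveat: `state` is only fully updated once the iterator is exhausted.
    total = sum(mapped())
    yield (total, state["count"])

result = list(fused_partition_fn(iter([1, 2, 3])))
# result == [(12, 3)]
```

The caveat in the comment is the one design constraint: the downstream logic must read the partition-local variable only after it has pulled all the elements it needs, because the "map" side effects happen on demand rather than up front.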
Re: Delegation Token renewal in yarn-cluster
I checked the logs of my tests, and found that Spark schedules the token refresh based on the renew-interval property, not the max-lifetime. The settings in my tests:

dfs.namenode.delegation.key.update-interval=52
dfs.namenode.delegation.token.max-lifetime=102
dfs.namenode.delegation.token.renew-interval=52

During the job submission, spark.yarn.token.renewal.interval is set:

2016-11-04 09:12:25 INFO Client:59 - Renewal Interval set to 520036

Then, it takes ~0.75*spark.yarn.token.renewal.interval to schedule the token refresh.

2016-11-04 09:12:37 INFO ExecutorDelegationTokenUpdater:59 - Scheduling token refresh from HDFS in 404251 millis.
...
2016-11-04 09:19:21 INFO ExecutorDelegationTokenUpdater:59 - Reading new delegation tokens from ...
...
2016-11-04 09:19:21 INFO ExecutorDelegationTokenUpdater:59 - Scheduling token refresh from HDFS in 390064 millis.
...
2016-11-04 09:25:52 INFO ExecutorDelegationTokenUpdater:59 - Reading new delegation tokens from ...
...
2016-11-04 09:25:52 INFO ExecutorDelegationTokenUpdater:59 - Scheduling token refresh from HDFS in 390022 millis.

This was what confused me in the first place. Why does Spark ask for new tokens based on the renew-interval instead of the max-lifetime?

2016-11-04 2:37 GMT+01:00 Marcelo Vanzin <van...@cloudera.com>:

> On Thu, Nov 3, 2016 at 3:47 PM, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
> > What is the purpose of the delegation token renewal (the one that is done
> > automatically by Hadoop libraries, after 1 day by default)? It seems that it
> > always happens (every day) until the token expires, no matter what. I'd
> > probably find an answer to that in a basic Hadoop security description.
>
> I'm not sure and I never really got a good answer to that (I had the
> same question in the past). My best guess is to limit how long an
> attacker can do bad things if he gets hold of a delegation token. But
> IMO if an attacker gets a delegation token, that's pretty bad
> regardless of how long he can use it...
>
> > I have a feeling that giving the keytab to Spark bypasses the concept behind
> > delegation tokens. As I understand, the NN basically says that "your
> > application can access hdfs with this delegation token, but only for 7
> > days".
>
> I'm not sure why there's a 7 day limit either, but let's assume
> there's a good reason. Basically the app, at that point, needs to
> prove to the NN it has a valid kerberos credential. Whether that's
> from someone typing their password into a terminal, or code using a
> keytab, it doesn't really matter. If someone was worried about that
> user being malicious they'd disable the user's login in the KDC.
>
> This feature is needed because there are apps that need to keep
> running, unattended, for longer than HDFS's max lifetime setting.
>
> --
> Marcelo
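The ~0.75 factor can be checked directly against the numbers in the log excerpts above: dividing each scheduled delay by the 520036 ms renewal interval gives roughly 0.75. (The first delay comes out slightly larger; my assumption is that it is measured relative to when the token was issued rather than when the updater thread started.)

```python
# Check the "~0.75 * spark.yarn.token.renewal.interval" observation
# against the values quoted from the logs above.
renewal_interval_ms = 520036            # "Renewal Interval set to 520036"
scheduled_delays_ms = [404251, 390064, 390022]

ratios = [round(d / renewal_interval_ms, 3) for d in scheduled_delays_ms]
# ratios == [0.777, 0.75, 0.75]
```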
Re: Delegation Token renewal in yarn-cluster
Thank you for the clarification Marcelo, makes sense. I'm thinking about 2 questions here, somewhat unrelated to the original problem.

What is the purpose of the delegation token renewal (the one that is done automatically by Hadoop libraries, after 1 day by default)? It seems that it always happens (every day) until the token expires, no matter what. I'd probably find an answer to that in a basic Hadoop security description.

I have a feeling that giving the keytab to Spark bypasses the concept behind delegation tokens. As I understand, the NN basically says that "your application can access hdfs with this delegation token, but only for 7 days". After 7 days, the NN should *ideally* ask me like "this app runs for a week now, do you want to continue that?" - then I'd need to login with my keytab and give the new delegation token to the application. I know that this would be really difficult to handle, but now Spark just "ignores" the whole token expiration mechanism and relogins every time it is needed. Am I missing something?

2016-11-03 22:42 GMT+01:00 Marcelo Vanzin <van...@cloudera.com>:

> I think you're a little confused about what "renewal" means here, and
> this might be the fault of the documentation (I haven't read it in a
> while).
>
> The existing delegation tokens will always be "renewed", in the sense
> that Spark (actually Hadoop code invisible to Spark) will talk to the
> NN to extend its lifetime. The feature you're talking about is for
> creating *new* delegation tokens after the old ones expire and cannot
> be renewed anymore (i.e. the max-lifetime configuration).
>
> On Thu, Nov 3, 2016 at 2:02 PM, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
> > Yes, I did change dfs.namenode.delegation.key.update-interval and
> > dfs.namenode.delegation.token.renew-interval to 15 min, the max-lifetime to
> > 30min. In this case the application (without Spark having the keytab) did
> > not fail after 15 min, only after 30 min. Is it possible that the resource
> > manager somehow automatically renews the delegation tokens for my
> > application?
> >
> > 2016-11-03 21:34 GMT+01:00 Marcelo Vanzin <van...@cloudera.com>:
> >>
> >> Sounds like your test was set up incorrectly. The default TTL for
> >> tokens is 7 days. Did you change that in the HDFS config?
> >>
> >> The issue definitely exists and people definitely have run into it. So
> >> if you're not hitting it, it's most definitely an issue with your test
> >> configuration.
> >>
> >> On Thu, Nov 3, 2016 at 7:22 AM, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > I ran some tests regarding Spark's Delegation Token renewal mechanism. As I
> >> > see, the concept here is simple: if I give my keytab file and client
> >> > principal to Spark, it starts a token renewal thread, and renews the
> >> > namenode delegation tokens after some time. This works fine.
> >> >
> >> > Then I tried to run a long application (with HDFS operation in the end)
> >> > without providing the keytab/principal to Spark, and I expected it to fail
> >> > after the token expires. It turned out that this is not the case, the
> >> > application finishes successfully without a delegation token renewal by
> >> > Spark.
> >> >
> >> > My question is: how is that possible? Shouldn't a saveAsTextfile() fail
> >> > after the namenode delegation token expired?
> >> >
> >> > Regards,
> >> > Zsolt
> >>
> >> --
> >> Marcelo
>
> --
> Marcelo
Re: Delegation Token renewal in yarn-cluster
Yes, I did change dfs.namenode.delegation.key.update-interval and dfs.namenode.delegation.token.renew-interval to 15 min, the max-lifetime to 30min. In this case the application (without Spark having the keytab) did not fail after 15 min, only after 30 min. Is it possible that the resource manager somehow automatically renews the delegation tokens for my application?

2016-11-03 21:34 GMT+01:00 Marcelo Vanzin <van...@cloudera.com>:

> Sounds like your test was set up incorrectly. The default TTL for
> tokens is 7 days. Did you change that in the HDFS config?
>
> The issue definitely exists and people definitely have run into it. So
> if you're not hitting it, it's most definitely an issue with your test
> configuration.
>
> On Thu, Nov 3, 2016 at 7:22 AM, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
> > Hi,
> >
> > I ran some tests regarding Spark's Delegation Token renewal mechanism. As I
> > see, the concept here is simple: if I give my keytab file and client
> > principal to Spark, it starts a token renewal thread, and renews the
> > namenode delegation tokens after some time. This works fine.
> >
> > Then I tried to run a long application (with HDFS operation in the end)
> > without providing the keytab/principal to Spark, and I expected it to fail
> > after the token expires. It turned out that this is not the case, the
> > application finishes successfully without a delegation token renewal by
> > Spark.
> >
> > My question is: how is that possible? Shouldn't a saveAsTextfile() fail
> > after the namenode delegation token expired?
> >
> > Regards,
> > Zsolt
>
> --
> Marcelo
Re: Delegation Token renewal in yarn-cluster
Any ideas about this one? Am I missing something here?

2016-11-03 15:22 GMT+01:00 Zsolt Tóth <toth.zsolt@gmail.com>:

> Hi,
>
> I ran some tests regarding Spark's Delegation Token renewal mechanism. As
> I see, the concept here is simple: if I give my keytab file and client
> principal to Spark, it starts a token renewal thread, and renews the
> namenode delegation tokens after some time. This works fine.
>
> Then I tried to run a long application (with HDFS operation in the end)
> without providing the keytab/principal to Spark, and I expected it to fail
> after the token expires. It turned out that this is not the case, the
> application finishes successfully without a delegation token renewal by
> Spark.
>
> My question is: how is that possible? Shouldn't a saveAsTextfile() fail
> after the namenode delegation token expired?
>
> Regards,
> Zsolt
Delegation Token renewal in yarn-cluster
Hi,

I ran some tests regarding Spark's Delegation Token renewal mechanism. As I see, the concept here is simple: if I give my keytab file and client principal to Spark, it starts a token renewal thread, and renews the namenode delegation tokens after some time. This works fine.

Then I tried to run a long application (with an HDFS operation at the end) without providing the keytab/principal to Spark, and I expected it to fail after the token expires. It turned out that this is not the case: the application finishes successfully without a delegation token renewal by Spark.

My question is: how is that possible? Shouldn't a saveAsTextFile() fail after the namenode delegation token expired?

Regards,
Zsolt
Re: Inserting column to DataFrame
Sure. I ran the same job with fewer columns, the exception:

java.lang.IllegalArgumentException: requirement failed: DataFrame must have the same schema as the relation to which is inserted.
DataFrame schema: StructType(StructField(pixel0,ByteType,true), StructField(pixel1,ByteType,true), StructField(pixel10,ByteType,true), StructField(pixel100,ShortType,true), StructField(pixel101,ShortType,true), StructField(pixel102,ShortType,true), StructField(pixel103,ShortType,true), StructField(pixel105,ShortType,true), StructField(pixel106,ShortType,true), StructField(id,DoubleType,true), StructField(label,ByteType,true), StructField(predict,DoubleType,true))
Relation schema: StructType(StructField(pixel0,ByteType,true), StructField(pixel1,ByteType,true), StructField(pixel10,ByteType,true), StructField(pixel100,ShortType,true), StructField(pixel101,ShortType,true), StructField(pixel102,ShortType,true), StructField(pixel103,ShortType,true), StructField(pixel105,ShortType,true), StructField(pixel106,ShortType,true), StructField(id,DoubleType,true), StructField(label,ByteType,true), StructField(predict,DoubleType,true))
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:113)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)

Regards,
Zsolt

2016-02-12 13:11 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Can you pastebin the full error with all column types ?
>
> There should be a difference between some column(s).
>
> Cheers
>
> > On Feb 11, 2016, at 2:12 AM, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
> >
> > Hi,
> >
> > I'd like to append a column of a dataframe to another DF (using Spark 1.5.2):
> >
> > DataFrame outputDF = unlabelledDF.withColumn("predicted_label", predictedDF.col("predicted"));
> >
> > I get the following exception:
> >
> > java.lang.IllegalArgumentException: requirement failed: DataFrame must have the same schema as the relation to which is inserted.
> > DataFrame schema: StructType(StructField(predicted_label,DoubleType,true), ... <numerical (ByteType/ShortType) columns>
> > Relation schema: StructType(StructField(predicted_label,DoubleType,true), ... <columns>
> >
> > The interesting part is that the two schemas in the exception are exactly the same.
> > The same code with other input data (with fewer, both numerical and non-numerical column) succeeds.
> > Any idea why this happens?
Re: Inserting column to DataFrame
Hi, thanks for the answers. If joining the DataFrames is the solution, then why does the simple withColumn() succeed for some datasets and fail for others?

2016-02-11 11:53 GMT+01:00 Michał Zieliński <zielinski.mich...@gmail.com>:

> I think a good idea would be to do a join:
>
> outputDF = unlabelledDF.join(predictedDF.select(“id”,”predicted”),”id”)
>
> On 11 February 2016 at 10:12, Zsolt Tóth <toth.zsolt@gmail.com> wrote:
>
>> Hi,
>>
>> I'd like to append a column of a dataframe to another DF (using Spark
>> 1.5.2):
>>
>> DataFrame outputDF = unlabelledDF.withColumn("predicted_label",
>> predictedDF.col("predicted"));
>>
>> I get the following exception:
>>
>> java.lang.IllegalArgumentException: requirement failed: DataFrame must
>> have the same schema as the relation to which is inserted.
>> DataFrame schema: StructType(StructField(predicted_label,DoubleType,true), ... <numerical (ByteType/ShortType) columns>
>> Relation schema: StructType(StructField(predicted_label,DoubleType,true), ...
>>
>> The interesting part is that the two schemas in the exception are exactly
>> the same.
>> The same code with other input data (with fewer, both numerical and
>> non-numerical column) succeeds.
>> Any idea why this happens?
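As to what the suggested join buys: a column taken from a *different* DataFrame cannot reliably be attached by withColumn(), because after independent transformations the row order of the two DataFrames is not guaranteed to line up; a key-based join matches rows explicitly. A pure-Python sketch of the same idea (the sample values are made up, the "id" and "predicted" names mirror the thread):

```python
# Pure-Python sketch of the key-based join: the "predicted" column from
# predictedDF is attached to unlabelledDF by matching on "id" rather than
# by relying on row order.
unlabelled = [{"id": 1, "f0": 0.2}, {"id": 2, "f0": 0.9}]
predicted = [{"id": 1, "predicted": 0.0}, {"id": 2, "predicted": 1.0}]

pred_by_id = {r["id"]: r["predicted"] for r in predicted}
output = [dict(row, predicted=pred_by_id[row["id"]]) for row in unlabelled]
# output[0] == {"id": 1, "f0": 0.2, "predicted": 0.0}
```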
Inserting column to DataFrame
Hi,

I'd like to append a column of a dataframe to another DF (using Spark 1.5.2):

DataFrame outputDF = unlabelledDF.withColumn("predicted_label", predictedDF.col("predicted"));

I get the following exception:

java.lang.IllegalArgumentException: requirement failed: DataFrame must have the same schema as the relation to which is inserted.
DataFrame schema: StructType(StructField(predicted_label,DoubleType,true), ...
Relation schema: StructType(StructField(predicted_label,DoubleType,true), ...

The interesting part is that the two schemas in the exception are exactly the same. The same code with other input data (with fewer, both numerical and non-numerical, columns) succeeds. Any idea why this happens?
create DataFrame from RDD
Hi,

I have a Spark job with many transformations (a sequence of maps and mapPartitions) and only one action at the end (DataFrame.write()). The transformations return an RDD, so I need to create a DataFrame. To be able to use sqlContext.createDataFrame() I need to know the schema of the Row, but for that I need to trigger the transformations (the schema can differ based on the user arguments, so I can't hardcode it). This results in 2 actions, which seems like a waste of resources. Is there a way to solve this with only one action?

Regards,
Zsolt
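One option worth considering: the second "action" does not have to run the whole pipeline. Deriving the schema from a single sample row keeps it cheap (in Spark, rdd.first() runs a job that normally scans only the first partition), and the full computation then runs once for the write. A pure-Python sketch of just the schema-building step, with an illustrative and deliberately incomplete mapping from Python types to Spark SQL type names:

```python
# Sketch: derive a schema from one sample row. In Spark the sample would
# come from rdd.first(); the type mapping below is illustrative only and
# far from the real Python-to-Spark-SQL type inference.
PY_TO_SPARK = {int: "LongType", float: "DoubleType", str: "StringType"}

def schema_from_sample(field_names, sample_row):
    """Pair each field name with the Spark type name of the sampled value."""
    return [(name, PY_TO_SPARK[type(value)])
            for name, value in zip(field_names, sample_row)]

schema = schema_from_sample(["id", "score", "label"], (1, 0.5, "a"))
# schema == [("id", "LongType"), ("score", "DoubleType"), ("label", "StringType")]
```

The caveat is that a single row cannot tell you about nullability or fields that are None in the sample, so this only works when one row is representative of the whole dataset.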
Re: Re: driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor
Hi, this is exactly the same as my issue, seems to be a bug in 1.5.x. (see my thread for details)

2015-11-19 11:20 GMT+01:00 Jeff Zhang:

> Seems your jdbc url is not correct. Should be jdbc:mysql://192.168.41.229:3306
>
> On Thu, Nov 19, 2015 at 6:03 PM, wrote:
>
>> hi guy,
>>
>> I also found --driver-class-path and spark.driver.extraClassPath
>> is not working when I'm accessing mysql driver in my spark APP.
>>
>> the attache is my config for my APP.
>>
>> here are my command and the logs of the failure i encountted.
>>
>> [root@h170 spark]# bin/spark-shell --master spark://h170:7077 --driver-class-path lib/mysql-connector-java-5.1.32-bin.jar --jars lib/mysql-connector-java-5.1.32-bin.jar
>>
>> log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>> Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
>> To adjust logging level use sc.setLogLevel("INFO")
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>>       /_/
>>
>> Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> 15/11/19 17:51:33 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
>> Spark context available as sc.
>> 15/11/19 17:51:35 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>> 15/11/19 17:51:35 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>> 15/11/19 17:51:48 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
>> 15/11/19 17:51:48 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
>> 15/11/19 17:51:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> 15/11/19 17:51:50 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>> 15/11/19 17:51:50 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
>> SQL context available as sqlContext.
>>
>> scala> val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "mysql://192.168.41.229:3306", "dbtable" -> "sqoop.SQOOP_ROOT")).load()
>>
>> java.sql.SQLException: No suitable driver found for mysql://192.168.41.229:3306
>> at java.sql.DriverManager.getConnection(DriverManager.java:596)
>> at java.sql.DriverManager.getConnection(DriverManager.java:187)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:188)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:181)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:121)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
>> at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
>> at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
>> at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
>> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
>> at $iwC$$iwC$$iwC.<init>(<console>:32)
>> at $iwC$$iwC.<init>(<console>:34)
>> at $iwC.<init>(<console>:36)
>> at <init>(<console>:38)
>> at .<init>(<console>:42)
>> at .<clinit>(<console>)
>> at .<init>(<console>:7)
>> at .<clinit>(<console>)
>> at $print(<console>)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>> at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
>> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>
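As Jeff's reply points out, the immediate failure in the shell session is independent of the classpath problem: java.sql.DriverManager only matches a registered driver against URLs of the form jdbc:<subprotocol>://..., so the scheme-less "mysql://..." URL finds no suitable driver. A minimal check of the corrected options map:

```python
# The failing URL from the shell session above, and the corrected form.
# DriverManager.getConnection() matches drivers on the "jdbc:" scheme,
# hence "No suitable driver found" for the scheme-less URL.
bad_url = "mysql://192.168.41.229:3306"
good_url = "jdbc:" + bad_url

options = {"url": good_url, "dbtable": "sqoop.SQOOP_ROOT"}
# options["url"] == "jdbc:mysql://192.168.41.229:3306"
```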
ClassNotFound for exception class in Spark 1.5.x
Hi,

I try to throw an exception of my own exception class (MyException extends SparkException) on one of the executors. This works fine on Spark 1.3.x and 1.4.x, but throws a deserialization/ClassNotFound exception on Spark 1.5.x. This happens only when I throw it on an executor; on the driver it succeeds. I'm using Spark in yarn-cluster mode.

Is this a known issue? Is there any workaround for it?

StackTrace:

15/11/18 15:00:17 WARN spark.ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.example.MyException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/11/18 15:00:17 ERROR scheduler.TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@7578da02
15/11/18 15:00:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, hadoop2.local.dmlab.hu): UnknownReason

Regards,
Zsolt
Re: ClassNotFound for exception class in Spark 1.5.x
Hi Tamás, the exception class is in the application jar, I'm using the spark-submit script. 2015-11-19 11:54 GMT+01:00 Tamas Szuromi <tamas.szur...@odigeo.com>: > Hi Zsolt, > > How you load the jar and how you prepend it to the classpath? > > Tamas > > > > > On 19 November 2015 at 11:02, Zsolt Tóth <toth.zsolt@gmail.com> wrote: > >> Hi, >> >> I try to throw an exception of my own exception class (MyException >> extends SparkException) on one of the executors. This works fine on Spark >> 1.3.x, 1.4.x but throws a deserialization/ClassNotFound exception on Spark >> 1.5.x. This happens only when I throw it on an executor, on the driver it >> succeeds. I'm using Spark in yarn-cluster mode. >> >> Is this a known issue? Is there any workaround for it? >> >> StackTrace: >> >> 15/11/18 15:00:17 WARN spark.ThrowableSerializationWrapper: Task exception >> could not be deserialized >> java.lang.ClassNotFoundException: org.example.MyException >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) >> at java.security.AccessController.doPrivileged(Native Method) >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) >> at java.lang.Class.forName0(Native Method) >> at java.lang.Class.forName(Class.java:270) >> at >> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) >> at >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) >> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) >> at >> 
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:606) >> at >> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) >> at >> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72) >> at >> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98) >> at >> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108) >> at >> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105) >> at >> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105) >> at 
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) >> at >> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> 15/11/18 15:00:17 ERROR scheduler.TaskResultGetter: Could not deserialize >> TaskEndReason: ClassNotFound with classloader >> org.apache.spark.util.MutableURLClassLoader@7578da02 >> 15/11/18 15:00:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 >> (TID 30, hadoop2.local.dmlab.hu): UnknownReason >> >> Regards, >> Zsolt >> > >
Re: OutOfMemory error with Spark ML 1.5 logreg example
Hi,

I ran your example on Spark 1.4.1 and 1.5.0-rc3. It succeeds on 1.4.1 but throws the OOM on 1.5.0. Do any of you know which PR introduced this issue?

Zsolt

2015-09-07 16:33 GMT+02:00 Zoltán Zvara:

> Hey, I'd try to debug and profile ResolvedDataSource. As far as I know, your write will be performed by the JVM.
>
> On Mon, Sep 7, 2015 at 4:11 PM Tóth Zoltán wrote:
>
>> Unfortunately I'm getting the same error. The other interesting things are:
>> - the parquet files actually got written to HDFS (also with .write.parquet())
>> - the application gets stuck in the RUNNING state for good, even after the error is thrown
>>
>> 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 19
>> 15/09/07 10:01:10 INFO spark.ContextCleaner: Cleaned accumulator 5
>> 15/09/07 10:01:12 INFO spark.ContextCleaner: Cleaned accumulator 20
>> Exception in thread "Thread-7"
>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-7"
>> Exception in thread "org.apache.hadoop.hdfs.PeerCache@4070d501"
>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@4070d501"
>> Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
>> Exception in thread "Reporter"
>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Reporter"
>> Exception in thread "qtp2134582502-46"
>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp2134582502-46"
>>
>> On Mon, Sep 7, 2015 at 3:48 PM, boci wrote:
>>
>>> Hi,
>>>
>>> Can you try using the save method instead of write?
>>>
>>> ex: out_df.save("path","parquet")
>>>
>>> b0c1
>>>
>>> --
>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>
>>> On Mon, Sep 7, 2015 at 3:35 PM, Zoltán Tóth wrote:

Aaand, the error! :)

Exception in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf"
Exception in thread "Thread-7"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-7"
Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
Exception in thread "Reporter"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Reporter"
Exception in thread "qtp2115718813-47"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp2115718813-47"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-scheduler-1"

Log Type: stdout
Log Upload Time: Mon Sep 07 09:03:01 -0400 2015
Log Length: 986

Traceback (most recent call last):
  File "spark-ml.py", line 33, in
    out_df.write.parquet("/tmp/logparquet")
  File "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/readwriter.py", line 422, in parquet
  File "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError

On Mon, Sep 7, 2015 at 3:27 PM, Zoltán Tóth wrote:

> Hi,
>
> When I execute the Spark ML Logistic Regression example in pyspark, I run into an OutOfMemory exception. I'm wondering if any of you have experienced the same or have a hint about how to fix this.
>
> The interesting bit is that I only get the exception when I try to write the result DataFrame into a file. If I only "print" any of the
Spark-1.2.2-bin-hadoop2.4.tgz missing
Hi all,

It looks like the 1.2.2 pre-built version for hadoop2.4 is not available on the mirror sites. Am I missing something?

Regards,
Zsolt
Re: RDD collect hangs on large input data
Thanks for your answer, Imran. I haven't tried your suggestions yet, but setting spark.shuffle.blockTransferService=nio solved my issue. There is a JIRA for this: https://issues.apache.org/jira/browse/SPARK-6962.

Zsolt

2015-04-14 21:57 GMT+02:00 Imran Rashid iras...@cloudera.com:

Is it possible that when you switch to the bigger data set, your data is skewed, so that some tasks generate far more data? reduceByKey could result in a huge amount of data going to a small number of tasks. I'd suggest (a) seeing what happens if you don't collect() -- e.g. instead try writing to hdfs with saveAsObjectFile; (b) taking a look at what is happening on the executors with the long-running tasks. You can get thread dumps via the UI (or you can log in to the boxes and use jstack). This might point to some of your code that is taking a long time, or it might point to Spark internals.

On Wed, Apr 8, 2015 at 3:45 AM, Zsolt Tóth toth.zsolt@gmail.com wrote:

I use EMR 3.3.1, which comes with Java 7. Do you think that this may cause the issue? Did you test it with Java 8?
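For reference, the workaround mentioned above is an ordinary Spark configuration property. A sketch of how it could be set (as far as I know, the nio transfer service only exists in the Spark 1.2-1.5 line and was removed later, with netty remaining the default):

```properties
# spark-defaults.conf -- or pass at submit time with
#   spark-submit --conf spark.shuffle.blockTransferService=nio ...
spark.shuffle.blockTransferService  nio
```

The linked JIRA (SPARK-6962) tracks the underlying issue with the default netty block transfer service.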
Re: RDD collect hangs on large input data
I use EMR 3.3.1, which comes with Java 7. Do you think that this may cause the issue? Did you test it with Java 8?
Re: Using ORC input for mllib algorithms
Thanks for your answer! Unfortunately I can't use Spark SQL for some reason. If anyone has experience in using ORC as a hadoopFile, I'd be happy to read some hints/thoughts about my issues.

Zsolt

2015-03-27 19:07 GMT+01:00 Xiangrui Meng men...@gmail.com:

There is a PR in review to support ORC via the SQL data source API: https://github.com/apache/spark/pull/3753. You can try pulling that PR and help test it. -Xiangrui

On Wed, Mar 25, 2015 at 5:03 AM, Zsolt Tóth toth.zsolt@gmail.com wrote:

Hi,

I use sc.hadoopFile(directory, OrcInputFormat.class, NullWritable.class, OrcStruct.class) to use data in ORC format as an RDD. I did some benchmarking of ORC input vs. Text input for MLlib and ran into a few issues with ORC.

Setup: yarn-cluster mode, 11 executors, 4 cores, 9g executor memory, 2g executor memoryOverhead, 1g driver memory. The cluster nodes have sufficient resources for this setup.

Logistic regression: when using 1GB ORC input (stored in 4 blocks on hdfs), only one block (25%) is cached and only one executor is used; however, the whole RDD could be cached even as a Textfile (that's around 5.5GB). Is it possible to make Spark use the available resources?

Decision tree: using 8GB ORC input, the job fails every time with the "Size exceeds INTEGER.MAX_VALUE" error. In addition, I see errors from the JVM in the logs that the container is running beyond physical memory limits. Is it possible to avoid this when using the ORC input format? I tried to set min.split.size/max.split.size or dfs.blocksize, but that didn't help. Again, none of these happen when using Text input.

Cheers,
Zsolt

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD collect hangs on large input data
) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

15/03/30 10:53:51 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49033
java.io.IOException: Connection timed out
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

2015-03-29 17:01 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:

Don't call .collect if your data size is huge; you can simply do a count() to trigger the execution. Can you paste your exception stack trace so that we'll know what's happening?
Thanks
Best Regards

On Fri, Mar 27, 2015 at 9:18 PM, Zsolt Tóth toth.zsolt@gmail.com wrote:

Hi,

I have a simple Spark application: it creates an input rdd with sc.textfile, and it calls flatMapToPair, reduceByKey and map on it. The output rdd is small, a few MB's. Then I call collect() on the output.

If the textfile is ~50GB, it finishes in a few minutes. However, if it's larger (~100GB), the execution hangs at the end of the collect() stage. The UI shows one active job (collect), one completed (flatMapToPair) and one active stage (collect). The collect stage has 880/892 tasks succeeded, so I think the issue happens when the whole job is about to finish (every task on the UI is either in SUCCESS or in RUNNING state). The driver and the containers don't log anything for 15 mins, then I get a connection timeout.

I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and Hadoop 2.4.0. This happens every time I run the process with larger input data, so I think this isn't just a connection issue or something like that. Is this a Spark bug, or is something wrong with my setup?

Zsolt
RDD collect hangs on large input data
Hi,

I have a simple Spark application: it creates an input rdd with sc.textfile, and it calls flatMapToPair, reduceByKey and map on it. The output rdd is small, a few MB's. Then I call collect() on the output.

If the textfile is ~50GB, it finishes in a few minutes. However, if it's larger (~100GB), the execution hangs at the end of the collect() stage. The UI shows one active job (collect), one completed (flatMapToPair) and one active stage (collect). The collect stage has 880/892 tasks succeeded, so I think the issue happens when the whole job is about to finish (every task on the UI is either in SUCCESS or in RUNNING state). The driver and the containers don't log anything for 15 mins, then I get a connection timeout.

I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and Hadoop 2.4.0. This happens every time I run the process with larger input data, so I think this isn't just a connection issue or something like that. Is this a Spark bug, or is something wrong with my setup?

Zsolt
Using ORC input for mllib algorithms
Hi,

I use sc.hadoopFile(directory, OrcInputFormat.class, NullWritable.class, OrcStruct.class) to use data in ORC format as an RDD. I did some benchmarking of ORC input vs. Text input for MLlib and ran into a few issues with ORC.

Setup: yarn-cluster mode, 11 executors, 4 cores, 9g executor memory, 2g executor memoryOverhead, 1g driver memory. The cluster nodes have sufficient resources for this setup.

Logistic regression: when using 1GB ORC input (stored in 4 blocks on hdfs), only one block (25%) is cached and only one executor is used; however, the whole RDD could be cached even as a Textfile (that's around 5.5GB). Is it possible to make Spark use the available resources?

Decision tree: using 8GB ORC input, the job fails every time with the "Size exceeds INTEGER.MAX_VALUE" error. In addition, I see errors from the JVM in the logs that the container is running beyond physical memory limits. Is it possible to avoid this when using the ORC input format? I tried to set min.split.size/max.split.size or dfs.blocksize, but that didn't help. Again, none of these happen when using Text input.

Cheers,
Zsolt
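A likely explanation for the "Size exceeds INTEGER.MAX_VALUE" failure above: in Spark 1.x a single cached or shuffled block is backed by a byte array/ByteBuffer indexed with a Java int, so no partition may grow past ~2 GB. If the ORC reader produces fewer (and therefore larger) splits than the Text reader, that alone can trigger the error. A back-of-the-envelope sketch (the split counts below are illustrative assumptions, not measured values):

```python
# A single Spark 1.x block must fit in a structure indexed by a Java int,
# i.e. stay below Integer.MAX_VALUE (~2 GB).
INT_MAX = 2**31 - 1  # Java Integer.MAX_VALUE

def partition_fits(total_bytes, num_partitions):
    """Return True if an evenly-split partition stays under the ~2 GB block limit."""
    return total_bytes / num_partitions < INT_MAX

eight_gb = 8 * 1024**3
# With only 3 splits, each partition is ~2.7 GB and exceeds the limit;
# with 8 splits, each partition is 1 GB and fits.
print(partition_fits(eight_gb, 3))  # False
print(partition_fits(eight_gb, 8))  # True
```

This is why forcing smaller input splits (or repartitioning right after the load) is the usual workaround; here the min.split.size/max.split.size settings apparently weren't picked up by the ORC input format.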
Using 1.3.0 client jars with 1.2.1 assembly in yarn-cluster mode
Hi,

I submit Spark jobs in yarn-cluster mode remotely from Java code by calling Client.submitApplication(). For some reason I want to use the 1.3.0 jars on the client side (e.g. spark-yarn_2.10-1.3.0.jar), but I have spark-assembly-1.2.1* on the cluster. The problem is that the ApplicationMaster can't find the user application jar (specified with the --jar option). I think this is because of changes in the classpath population in the Client class. Is it possible to make this setup work without changing the codebase or the jars?

Cheers,
Zsolt
Re: Resource allocation in yarn-cluster mode
One more question: is there a reason why Spark throws an error when requesting too much memory, instead of capping it to the maximum value (as YARN would do by default)?

Thanks!

2015-02-10 17:32 GMT+01:00 Zsolt Tóth toth.zsolt@gmail.com:

Hi,

I'm using Spark in yarn-cluster mode and submit the jobs programmatically from the client in Java. I ran into a few issues when I tried to set the resource allocation properties.

1. It looks like setting spark.executor.memory, spark.executor.cores and spark.executor.instances has no effect, because ClientArguments checks only for the command line arguments (--num-executors, --executor-cores, etc.). Is it possible to use the properties in yarn-cluster mode instead of the command line arguments?

2. My nodes have 5GB memory, but when I set --executor-memory to 4g (overhead 384m), I get the exception that the required executor memory is above the max threshold of this cluster. It looks like this threshold is the value of the yarn.scheduler.maximum-allocation-mb property. Is that correct?

Thanks,
Zsolt
Resource allocation in yarn-cluster mode
Hi,

I'm using Spark in yarn-cluster mode and submit the jobs programmatically from the client in Java. I ran into a few issues when I tried to set the resource allocation properties.

1. It looks like setting spark.executor.memory, spark.executor.cores and spark.executor.instances has no effect, because ClientArguments checks only for the command line arguments (--num-executors, --executor-cores, etc.). Is it possible to use the properties in yarn-cluster mode instead of the command line arguments?

2. My nodes have 5GB memory, but when I set --executor-memory to 4g (overhead 384m), I get the exception that the required executor memory is above the max threshold of this cluster. It looks like this threshold is the value of the yarn.scheduler.maximum-allocation-mb property. Is that correct?

Thanks,
Zsolt
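The threshold check in point 2 comes down to simple arithmetic: Spark's YARN client requires the executor memory plus the memory overhead to fit into a single YARN container, i.e. at most yarn.scheduler.maximum-allocation-mb. A minimal sketch of that check (the 4096 MB maximum-allocation value is an assumed cluster setting; 384 MB is the default overhead mentioned above):

```python
# Sketch of the check the YARN Client performs before submitting:
# executor memory + overhead must fit into one YARN container.
def executor_memory_ok(executor_memory_mb, overhead_mb, yarn_max_allocation_mb):
    """Mimics the 'required executor memory is above the max threshold' check."""
    return executor_memory_mb + overhead_mb <= yarn_max_allocation_mb

# --executor-memory 4g with the default 384m overhead needs 4480 MB,
# which a 4096 MB yarn.scheduler.maximum-allocation-mb cannot satisfy:
print(executor_memory_ok(4096, 384, 4096))  # False
# Dropping to 3g leaves room for the overhead:
print(executor_memory_ok(3072, 384, 4096))  # True
```

So with 5GB nodes and a 4g request, the overhead pushes the total past the allocation limit unless yarn.scheduler.maximum-allocation-mb is raised or the executor memory is reduced.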
[mllib] Decision Tree - prediction probabilites of label classes
Hi,

I use DecisionTree for multi-class classification. I can get the probability of the predicted label for every node in the decision tree from node.predict().prob(). Is it possible to retrieve or count the probability of every possible label class in the node?

To be more clear: say in Node A there are 4 of label 0.0, 2 of label 1.0 and 3 of label 2.0. If I'm correct, predict.prob() is 4/9 in this case. I need the values 2/9 and 3/9 for the 2 other labels. It would be great to retrieve the exact count of label classes ([4,2,3] in the example), but I don't think that's possible now. Is something like this planned for a future release?

Thanks!
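For illustration, the per-class probabilities asked about above are just the node's label counts normalized by their sum. A minimal sketch in plain Python (this is not an MLlib API — the counts [4, 2, 3] are the example from the question):

```python
from fractions import Fraction

def class_distribution(label_counts):
    """Turn per-class label counts at a tree node into class probabilities."""
    total = sum(label_counts)
    return [Fraction(c, total) for c in label_counts]

# Node A from the example: 4x label 0.0, 2x label 1.0, 3x label 2.0
probs = class_distribution([4, 2, 3])
print(probs)       # 4/9, 2/9 and 3/9 (the last reduces to 1/3)
print(max(probs))  # 4/9 -- the value node.predict().prob() reports
```

If the raw per-class counts were exposed on the node, this one-liner would recover the full distribution; with only the top class's probability available, the other classes' shares cannot be reconstructed.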