Re: LIVY VS Spark Job Server

2016-09-15 Thread Vadim Semenov
I have experience with both Livy & spark-jobserver. spark-jobserver gives you a better API, particularly if you want to work within a single Spark context. Livy supports submitting Python & R code, while spark-jobserver doesn't. The spark-jobserver code is more complex; it actively uses

Re: using SparkILoop.run

2016-09-26 Thread Vadim Semenov
Add "-Dspark.master=local[*]" to the VM properties of your test run. On Mon, Sep 26, 2016 at 2:25 PM, Mohit Jaggi wrote: > I want to use the following API SparkILoop.run(...). I am writing a test > case as that passes some scala code to spark interpreter and receives >

Dynamically change executors settings

2016-08-26 Thread Vadim Semenov
Hi spark users, I wonder if it's possible to change executors settings on-the-fly. I have the following use-case: I have a lot of non-splittable skewed files in a custom format that I read using a custom Hadoop RecordReader. These files can be small & huge and I'd like to use only one-two cores

Re: Restful WS for Spark

2016-09-30 Thread Vadim Semenov
There are two REST job servers that work with Spark: https://github.com/spark-jobserver/spark-jobserver https://github.com/cloudera/livy On Fri, Sep 30, 2016 at 2:07 PM, ABHISHEK wrote: > Hello all, > Have you tried accessing Spark application using Restful web-services? >

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Vadim Semenov
Can you post the whole exception stack trace? What are your executor memory settings? Right now I assume that it happens in UnsafeExternalRowSorter -> UnsafeExternalSorter:insertRecord Running more executors with lower `spark.executor.memory` should help. On Fri, Sep 30, 2016 at 12:57 PM,

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Vadim Semenov
ad.run(Thread.java:745) > > I'm running spark in local mode so there is only one executor, the driver > and spark.driver.memory is set to 64g. Changing the driver's memory doesn't > help. > > *Babak Alipour ,* > *University of Florida* > > On Fri, Sep 30, 2016 at 2:05 P

Re: get different results when debugging and running scala program

2016-10-01 Thread Vadim Semenov
The question has no connection to Spark. In the future, when you use the Apache mailing lists, use external services to add screenshots and make sure that your code is formatted so other members are able to read it. On Fri, Sep 30, 2016 at 11:25 AM, chen yong wrote: > Hello All, > >

Re: Spark on yarn enviroment var

2016-10-01 Thread Vadim Semenov
The question should be addressed to the Oozie community. As far as I remember, a Spark action doesn't support environment variables. On Fri, Sep 30, 2016 at 8:11 PM, Saurabh Malviya (samalviy) < samal...@cisco.com> wrote: > Hi, > > > > I am running spark on yarn using oozie. > > > > When submit

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Vadim Semenov
oh, and try to run even smaller executors, i.e. with `spark.executor.memory` <= 16GiB. I wonder what result you're going to get. On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > > Do you mean running a multi-JVM 'cluster' on the single machine

Re: Restful WS for Spark

2016-10-01 Thread Vadim Semenov
, will > job run in Hadoop cluster ? > How stable is this API as we will need to implement it in production env. > Livy looks more promising but still need not matured. > Have you tested any of them ? > > Thanks, > Abhishek > Abhishek > > > On Fri, Sep 30, 2016 at 11:39

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Vadim Semenov
> long[]​. > Is it possible to force this specific operation to go off-heap so that it > can possibly use a bigger page size? > > > > ​>Babak​ > > > *Babak Alipour ,* > *University of Florida* > > On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov < > vadim.seme...@d

Re: Live data visualisations with Spark

2016-11-08 Thread Vadim Semenov
Take a look at https://zeppelin.apache.org On Tue, Nov 8, 2016 at 11:13 AM, Andrew Holway < andrew.hol...@otternetworks.de> wrote: > Hello, > > A colleague and I are trying to work out the best way to provide live data > visualisations based on Spark. Is it possible to explore a dataset in spark

Re: java.lang.OutOfMemoryError: unable to create new native thread

2016-10-31 Thread Vadim Semenov
Have you tried to get the number of threads in a running process using `cat /proc/<pid>/status`? On Sun, Oct 30, 2016 at 11:04 PM, kant kodali wrote: > yes I did run ps -ef | grep "app_name" and it is root. > > > > On Sun, Oct 30, 2016 at 8:00 PM, Chan Chor Pang

Re: How to avoid unnecessary spark starkups on every request?

2016-11-02 Thread Vadim Semenov
Take a look at https://github.com/spark-jobserver/spark-jobserver or https://github.com/cloudera/livy. You can launch a persistent Spark context and then submit your jobs using an already running context. On Wed, Nov 2, 2016 at 3:34 AM, Fanjin Zeng wrote: > Hi, > > I

Re: Any Dynamic Compilation of Scala Query

2016-10-26 Thread Vadim Semenov
You can use Cloudera Livy for that https://github.com/cloudera/livy take a look at this example https://github.com/cloudera/livy#spark-example On Wed, Oct 26, 2016 at 4:35 AM, Mahender Sarangam < mahender.bigd...@outlook.com> wrote: > Hi, > > Is there any way to dynamically execute a string

Re: Livy with Spark

2016-12-05 Thread Vadim Semenov
You mean share a single spark context across multiple jobs? https://github.com/spark-jobserver/spark-jobserver does the same On Mon, Dec 5, 2016 at 9:33 AM, Mich Talebzadeh wrote: > Hi, > > Has there been any experience using Livy with Spark to share multiple > Spark

Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Vadim Semenov
to enable kryo serializer you just need to pass `spark.serializer=org.apache.spark.serializer.KryoSerializer` the `spark.kryo.registrationRequired` controls the following behavior: Whether to require registration with Kryo. If set to 'true', Kryo will > throw an exception if an unregistered
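For illustration, a minimal sketch of how those two settings fit together; the registered class name `com.example.MyRecord` is a placeholder, not something from the original thread:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable Kryo and require explicit registration.
val spark = SparkSession.builder()
  .appName("kryo-example")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrationRequired", "true") // unregistered classes now throw
  .config("spark.kryo.classesToRegister", "com.example.MyRecord") // placeholder class
  .getOrCreate()
```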

Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Vadim Semenov
Check the source code for SparkLauncher: https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java#L541 a separate process will be started using `spark-submit` and if it uses `yarn-cluster` mode, a driver may be launched on another NodeManager

Re: NoClassDefFoundError

2016-12-21 Thread Vadim Semenov
You better ask folks in the spark-jobserver gitter channel: https://github.com/spark-jobserver/spark-jobserver On Wed, Dec 21, 2016 at 8:02 AM, Reza zade wrote: > Hello > > I've extended the JavaSparkJob (job-server-0.6.2) and created an object > of SQLContext class. my

Re: Looking at EMR Logs

2017-03-31 Thread Vadim Semenov
You can provide your own log directory where the Spark event log will be saved, and which you can replay afterwards. Set this in your job: `spark.eventLog.dir=s3://bucket/some/directory` and run it. Note! The path `s3://bucket/some/directory` must exist before you run your job; it won't be created
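A hedged sketch of setting this in code, using the placeholder bucket path from the message above:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the S3 path must already exist before the job starts.
val spark = SparkSession.builder()
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "s3://bucket/some/directory")
  .getOrCreate()
```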

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550 On Wed, Aug 2, 2017 at 8:46 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > I'm not sure that "checkpointed" m

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it just saves data to some destination. `cache/persist` allow you to cache data and keep the DAG in case an executor that holds the data goes down, so Spark would still be able to recalculate missing partitions

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
ed RDD is persisted in > memory, otherwise saving it on a file will require recomputation."* > > > To me that means checkpoint will not prevent the recomputation that i was > hoping for > -- > *From:* Vadim Semenov <vadim.seme...@datadogh

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
to delete it somehow else. BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to local disk, freeing up space in the JVM and letting you avoid HDFS. On Wednesday, August 2, 2017, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > `saveAsObjectFile` doesn't save the DAG, i
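A small sketch of that `DISK_ONLY` suggestion, where `myrdd` stands in for the RDD being reused:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: cache to local disk instead of checkpointing to HDFS.
val cached = myrdd.persist(StorageLevel.DISK_ONLY)
cached.count()     // materializes the on-disk cache once
// ... run the downstream jobs that reuse `cached` ...
cached.unpersist() // frees the on-disk copies when you're done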

Re: [Spark Core] Is it possible to insert a function directly into the Logical Plan?

2017-08-14 Thread Vadim Semenov
Something like this, maybe? import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.catalyst.encoders.RowEncoder val df: DataFrame = ??? val spark = df.sparkSession val

Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it. ``` scala>spark.sparkContext.setCheckpointDir(".") scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20))) df: org.apache.spark.sql.DataFrame = [_1: string, _2: int] scala>
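A short sketch continuing the example above, showing that the checkpoint only materializes once an action runs (the local "." checkpoint dir is the one from the message):

```scala
spark.sparkContext.setCheckpointDir(".")
val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
val rdd = df.rdd
rdd.checkpoint()
rdd.isCheckpointed // false: nothing has been executed yet
rdd.count()        // the action triggers the actual checkpoint
rdd.isCheckpointed // true
```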

Re: How to insert a dataframe as a static partition to a partitioned table

2017-07-20 Thread Vadim Semenov
This should work: ``` ALTER TABLE `table` ADD PARTITION (partcol=1) LOCATION '/path/to/your/dataset' ``` On Wed, Jul 19, 2017 at 6:13 PM, ctang wrote: > I wonder if there are any easy ways (or APIs) to insert a dataframe (or > DataSet), which does not contain the partition

Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-05 Thread Vadim Semenov
Are you sure that you use S3A? Because EMR says that they do not support S3A https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/ > Amazon EMR does not currently support use of the Apache Hadoop S3A file system. I think that the HEAD requests come from the

Re: How can i remove the need for calling cache

2017-08-01 Thread Vadim Semenov
You can use `.checkpoint()`: ``` val sc: SparkContext sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory") myrdd.checkpoint() val result1 = myrdd.map(op1(_)) result1.count() // Will save `myrdd` to HDFS and do map(op1… val result2 = myrdd.map(op2(_)) result2.count() // Will load `myrdd` from
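A hedged completion of the truncated snippet above; `op1`/`op2` are placeholder transformations, and the checkpoint directory is the one from the message:

```scala
val sc = spark.sparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")

def op1(x: Int): Int = x + 1 // placeholder transformations
def op2(x: Int): Int = x * 2

val myrdd = sc.parallelize(1 to 1000000)
myrdd.checkpoint()

val result1 = myrdd.map(op1)
result1.count() // first action computes `myrdd`, writes it to HDFS, and truncates its lineage

val result2 = myrdd.map(op2)
result2.count() // reads `myrdd` back from the checkpoint instead of recomputing it
```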

Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges >= Int.MaxValue https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89 You can create two RDDs and union them: scala> val rdd = sc.parallelize(1L to Int.MaxValue.toLong).union(sc.parallelize(1L to
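A sketch mirroring the truncated snippet above (`sc` is an active SparkContext); each parallelized range stays within the limit, while the union exceeds it:

```scala
// Each range stays under the Int.MaxValue limit; the union exceeds it.
val rdd = sc.parallelize(1L to Int.MaxValue.toLong)
  .union(sc.parallelize(1L to Int.MaxValue.toLong))
rdd.count() // roughly 4.29 billion; count() returns a Long, so this is fine
```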

Re: Spark <--> S3 flakiness

2017-05-11 Thread Vadim Semenov
Use the official mailing list archive http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3ccajyeq0gh1fbhbajb9gghognhqouogydba28lnn262hfzzgf...@mail.gmail.com%3e On Thu, May 11, 2017 at 2:50 PM, lucas.g...@gmail.com wrote: > Also, and this is unrelated to the

Re: [How-To] Custom file format as source

2017-06-12 Thread Vadim Semenov
It should be easy to start with a custom Hadoop InputFormat that reads the file and creates an `RDD[Row]`; since you know the record size, it should be pretty easy to make the InputFormat produce splits, so then you could read the file in parallel. On Mon, Jun 12, 2017 at 6:01 AM, OBones

Re: How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vadim Semenov
This is the code that chooses the partition for a key: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88 it's basically `math.abs(key.hashCode % numberOfPartitions)` On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
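A small illustration of the logic at that link, written as a simplified stand-in for Spark's non-negative-modulo helper:

```scala
// Simplified version of how HashPartitioner maps a key to a partition.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod // keep the result non-negative
}

partitionFor("some-key", 8) // always the same partition id in [0, 8) for the same key
```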

Re: "Sharing" dataframes...

2017-06-20 Thread Vadim Semenov
You can launch one permanent spark context and then execute your jobs within the context. And since they'll be running in the same context, they can share data easily. These two projects provide the functionality that you need:

Re: Bizarre UI Behavior after migration

2017-05-22 Thread Vadim Semenov
I believe it shows only the tasks that have actually been executed; if there were tasks with no data, they don't get reported. I might be mistaken; if somebody has a good explanation, I'd also like to hear it. On Fri, May 19, 2017 at 5:45 PM, Miles Crawford wrote: > Hey

Re: [Spark Core] Does spark support read from remote Hive server via JDBC

2017-06-08 Thread Vadim Semenov
Have you tried running a query? something like: ``` test.select("*").limit(10).show() ``` On Thu, Jun 8, 2017 at 4:16 AM, Даша Ковальчук wrote: > Hi guys, > > I need to execute hive queries on remote hive server from spark, but for > some reasons i receive only

Re: Configuration for unit testing and sql.shuffle.partitions

2017-09-18 Thread Vadim Semenov
You can create a superclass "FunSuiteWithSparkContext" that's going to create a Spark session, Spark context, and SQLContext with all the desired properties. Then you add the class to all the relevant test suites, and that's pretty much it. The other option is to pass it as a VM
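A sketch of such a base class, assuming ScalaTest; the class name comes from the message, the master and properties below are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Sketch: every suite extending this gets a shared local SparkSession.
abstract class FunSuiteWithSparkContext extends FunSuite with BeforeAndAfterAll {
  @transient var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local[2]")
      .appName("unit-tests")
      .config("spark.sql.shuffle.partitions", "4") // keep test shuffles tiny
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    if (spark != null) spark.stop()
    super.afterAll()
  }
}
```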

Re: SVD computation limit

2017-09-19 Thread Vadim Semenov
This may also be related to https://issues.apache.org/jira/browse/SPARK-22033 On Tue, Sep 19, 2017 at 3:40 PM, Mark Bittmann wrote: > I've run into this before. The EigenValueDecomposition creates a Java > Array with 2*k*n elements. The Java Array is indexed with a native

Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Vadim Semenov
1. 40s is pretty negligible unless you run your job very frequently, there can be many factors that influence that. 2. Try to compare the CPU time instead of the wall-clock time 3. Check the stages that got slower and compare the DAGs 4. Test with dynamic allocation disabled On Fri, Sep 22,

Re: how do you deal with datetime in Spark?

2017-10-03 Thread Vadim Semenov
I usually check the list of Hive UDFs as Spark has implemented almost all of them https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions Or/and check `org.apache.spark.sql.functions` directly:

Re: Unable to run Spark Jobs in yarn cluster mode

2017-10-10 Thread Vadim Semenov
Try increasing the `spark.yarn.am.waitTime` parameter; it's set to 100s by default, which might not be enough in certain cases. On Tue, Oct 10, 2017 at 7:02 AM, Debabrata Ghosh wrote: > Hi All, > I am constantly hitting an error : "ApplicationMaster: >

Re: EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-10 Thread Vadim Semenov
That should probably be directed to AWS support. On Sun, Oct 8, 2017 at 9:54 PM, Tushar Sudake wrote: > Hello everyone, > > I'm using 'r4.8xlarge' instances on EMR for my Spark Application. > To each node, I'm attaching one 512 GB EBS volume. > > By logging in into

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job; here you can see what it does internally: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828 On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala

Re: Bizarre UI Behavior after migration

2017-09-10 Thread Vadim Semenov
rozen, active state. > > > On Mon, May 22, 2017 at 12:50 PM, Vadim Semenov < > vadim.seme...@datadoghq.com> wrote: > >> I believe it shows only the tasks that have actually being executed, if >> there were tasks with no data, they don't get reported. >>

Re: More instances = slower Spark job

2017-09-28 Thread Vadim Semenov
Instead of having one job, you can try processing each file in a separate job, but run multiple jobs in parallel within one SparkContext. Something like this should work for you, it'll submit N jobs from the driver, the jobs will run independently, but executors will dynamically work on different
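A hedged sketch of submitting independent jobs in parallel from the driver; `spark` is an active SparkSession, and the paths and per-file logic are placeholders:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each Future triggers its own Spark job; the scheduler shares executors between them.
val paths = Seq("s3://bucket/file-a", "s3://bucket/file-b")
val jobs = paths.map { path =>
  Future {
    spark.read.text(path).count() // placeholder per-file processing
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)
```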

Re: Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Vadim Semenov
Looks like there's slowness in sending shuffle files; maybe one executor gets overwhelmed with all the other executors trying to pull data? Try lifting `spark.network.timeout` further; we ourselves had to push it to 600s from the default 120s. On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov

Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
as an alternative ``` spark-submit --files ``` the files will be put on each executor in the working directory, so you can then load them alongside your `map` function. Behind the scenes it uses the `SparkContext.addFile` method, which you can use too
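A sketch of that pattern, assuming a file named `model.bin` shipped via `--files` or `addFile`; `sc`, `rdd`, and the bucket path are placeholders:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.SparkFiles

// Ship the file to every executor's working directory.
sc.addFile("s3://bucket/model.bin") // or: spark-submit --files model.bin

object Model {
  // lazy val: loaded at most once per executor JVM, on first use
  lazy val bytes: Array[Byte] =
    Files.readAllBytes(Paths.get(SparkFiles.get("model.bin")))
}

rdd.map { record =>
  Model.bytes.length // placeholder: use the already-loaded model to process `record`
}
```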

Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
allelize(…).map(test => { Model.get().use(…) }) } } ``` On Thu, Sep 28, 2017 at 3:49 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > as an alternative > ``` > spark-submit --files > ``` > > the files will be put on each executor in the working directory, so you &

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As alternative: checkpoint the dataframe, collect days, and then delete corresponding directories using hadoop FileUtils, then write the dataframe On Fri, Sep 29, 2017 at 10:31 AM, peay wrote: > Hello, > > I am trying to use

Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files do you produce? I believe it spends a lot of time on renaming the files because of the output committer. Also, instead of 5x c3.2xlarge try using 2x c3.8xlarge, because they have 10GbE and you can get good throughput to S3. On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <

Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
ya...@gmail.com> > wrote: > > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov > > <vadim.seme...@datadoghq.com> wrote: > >> Instead of having one job, you can try processing each file in a > separate > >> job, but run multiple jobs in parallel within one Spark

Re: Multiple filters vs multiple conditions

2017-10-03 Thread Vadim Semenov
Since you're using the Dataset API or the RDD API, the filters won't be fused together by the Catalyst optimizer unless you use the DataFrame API. Two filters will get executed within one stage, and there'll be very small overhead from having two separate filters vs having only one. On Tue, Oct 3, 2017 at 8:14 AM, Ahmed

Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Vadim Semenov
I didn't tailor it to your needs, but this is what I can offer you, the idea should be pretty clear import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.{collect_list, struct} val spark: SparkSession import spark.implicits._ case class Input( a: Int, b: Long, c:
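A hedged reconstruction of the truncated example; the field names and values are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

case class Input(a: Int, b: Long, c: String)

val ds = Seq(Input(1, 10L, "x"), Input(1, 20L, "y"), Input(2, 30L, "z")).toDS()
// One row per key, with all (b, c) pairs collected into a single array column.
val grouped = ds.groupBy($"a").agg(collect_list(struct($"b", $"c")).as("values"))
grouped.show(truncate = false)
```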

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Vadim Semenov
When you create a EMR cluster you can specify a S3 path where logs will be saved after cluster, something like this: s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
n MDC way with spark or something other than to achieve this? > > > > Alex > > > > *From: *Vadim Semenov <vadim.seme...@datadoghq.com> > *Date: *Monday, August 28, 2017 at 5:18 PM > *To: *"Mikhailau, Alex" <alex.mikhai...@mlb.com> > *Cc: *"us

Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Vadim Semenov
The error message seems self-explanatory, try to figure out what's the disk quota you have for your user. On Wed, Nov 22, 2017 at 8:23 AM, Chetan Khatri wrote: > Anybody reply on this ? > > On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri < >

Re: JDK1.8 for spark workers

2017-11-29 Thread Vadim Semenov
You can pass `JAVA_HOME` environment variable `spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0` On Wed, Nov 29, 2017 at 10:54 AM, KhajaAsmath Mohammed < mdkhajaasm...@gmail.com> wrote: > Hi, > > I am running cloudera version of spark2.1 and our cluster is on JDK1.7. > For some of the

Re: NullPointerException while reading a column from the row

2017-12-19 Thread Vadim Semenov
getAs is defined as: def getAs[T](i: Int): T = get(i).asInstanceOf[T] and when you do toString you call Object.toString, which doesn't depend on the type, so asInstanceOf[T] gets dropped by the compiler, i.e. row.getAs[Int](0).toString -> row.get(0).toString we can confirm that by writing a simple
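A tiny illustration of the behavior described above, using a hypothetical row whose first column is null:

```scala
import org.apache.spark.sql.Row

val row = Row.fromSeq(Seq(null)) // a row whose first column is null
// Per the explanation above, row.getAs[Int](0).toString effectively becomes
// row.get(0).toString, which throws NullPointerException on the null value.
// A safer pattern is to check for null explicitly:
val asString = if (row.isNullAt(0)) "n/a" else row.getInt(0).toString
```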

Re: What does Blockchain technology mean for Big Data? And how Hadoop/Spark will play role with it?

2017-12-19 Thread Vadim Semenov
I think it means that we can replace HDFS with a blockchain-based FS, and then offload some processing to smart contracts. On Mon, Dec 18, 2017 at 11:59 PM, KhajaAsmath Mohammed < mdkhajaasm...@gmail.com> wrote: > I am looking for same answer too .. will wait for response from other > people > >

Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Spark doesn't remove intermediate shuffle files if they're part of the same job. On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob wrote: > This code generates files under /tmp...blockmgr... which do not get > cleaned up after the job finishes. > > Anything wrong with the code

Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
:08 AM, Mihai Iacob <mia...@ca.ibm.com> wrote: > When does spark remove them? > > > Regards, > > *Mihai Iacob* > DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics > > > > - Original message - > From: Vadim Semenov <

Re: Kryo not registered class

2017-11-20 Thread Vadim Semenov
Try: Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;") On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta < angel.francisco.o...@gmail.com> wrote: > Hello, I'm with spark 2.1.0 with scala and I'm

Re: Process large JSON file without causing OOM

2017-11-15 Thread Vadim Semenov
There's a lot of off-heap memory involved in decompressing Snappy, compressing ZLib. Since you're running using `local[*]`, you process multiple tasks simultaneously, so they all might consume memory. I don't think that increasing heap will help, since it looks like you're hitting system memory

Re: Spark based Data Warehouse

2017-11-12 Thread Vadim Semenov
It's actually quite simple to answer > 1. Is Spark SQL and UDF, able to handle all the workloads? Yes > 2. What user interface did you provide for data scientist, data engineers and analysts Home-grown platform, EMR, Zeppelin > What are the challenges in running concurrent queries, by many

Re: RDD[internalRow] -> DataSet

2017-12-12 Thread Vadim Semenov
Not possible, but you can add your own object in your project to Spark's package, which would give you access to the private methods: package org.apache.spark.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.LogicalRDD import
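A sketch of the trick described above, assuming Spark 2.x internals; `LogicalRDD` and `Dataset.ofRows` are private[sql], so this must be compiled into the `org.apache.spark.sql` package of your project and may break between Spark versions:

```scala
package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType

object InternalRowUtil {
  // Wraps an RDD[InternalRow] into a DataFrame without converting to external rows.
  def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame =
    Dataset.ofRows(spark, LogicalRDD(schema.toAttributes, rdd)(spark))
}
```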

Re: Free Column Reference with $

2018-05-04 Thread Vadim Semenov
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L38-L47 It's called String Interpolation See "Advanced Usage" here https://docs.scala-lang.org/overviews/core/string-interpolation.html On Fri, May 4, 2018 at 10:10 AM, Christopher Piggott

Re: Tuning Resource Allocation during runtime

2018-04-27 Thread Vadim Semenov
You cannot dynamically change the number of cores per executor or cores per task, but you can change the number of executors. In one of my jobs I have something like this, so when I know that I don't need more than 4 executors, I kill all other executors (assuming that they don't hold any cached

Re:

2018-05-16 Thread Vadim Semenov
Upon downsizing to 20 partitions some of your partitions become too big, and I see that you're doing caching, and executors try to write big partitions to disk, but fail because they exceed 2GiB > Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at

Re: Time series data

2018-05-24 Thread Vadim Semenov
Yeah, it depends on what you want to do with that timeseries data. We at Datadog process trillions of points daily using Spark, I cannot really go about what exactly we do with the data, but just saying that Spark can handle the volume, scale well and be fault-tolerant, albeit everything I said

Writing rows directly in Tungsten format into memory

2018-06-12 Thread Vadim Semenov
Is there a way to write rows directly into off-heap memory in the Tungsten format bypassing creating objects? I have a lot of rows, and right now I'm creating objects, and they get encoded, but because of the number of rows, it creates significant pressure on GC. I'd like to avoid creating

Re: Can we get the partition Index in an UDF

2018-06-25 Thread Vadim Semenov
Try using `TaskContext`: import org.apache.spark.TaskContext val partitionId = TaskContext.getPartitionId() On Mon, Jun 25, 2018 at 11:17 AM Lalwani, Jayesh wrote: > > We are trying to add a column to a Dataframe with some data that is seeded by > some random data. We want to be able to
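A small sketch of using it inside a UDF; the DataFrame `df` and the column names are placeholders:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.functions.{col, udf}

// The partition id is resolved on the executor, at task run time.
val withPartitionSeed = udf { (id: Long) =>
  val partitionId = TaskContext.getPartitionId()
  id * 31 + partitionId // mix the partition id into a per-row seed
}

df.withColumn("seed", withPartitionSeed(col("id")))
```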

Re: spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Vadim Semenov
You need to pass config before creating a session val conf = new SparkConf() // All three methods below are equivalent conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword") conf.set("spark.executorEnv.basicauth", "myuser:mypassword") conf.setExecutorEnv("basicauth",
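A sketch of the ordering point above; setting these after the context exists has no effect on already-launched executors:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")

// Only now create the session; executor JVMs are launched with the options above.
val spark = SparkSession.builder().config(conf).getOrCreate()
```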

Re: [Spark 2.x Core] .collect() size limit

2018-04-30 Thread Vadim Semenov
`.collect` returns an Array, and arrays can't have more than Int.MaxValue elements, and in most JVMs it's lower: `Int.MaxValue - 8`. So that puts an upper limit; however, you can just create an Array of Arrays, and so on, basically limitless, albeit with some gymnastics.

Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Vadim Semenov
As with typical `JAVA_OPTS`, you need to pass it as a single parameter: --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB" Also, you've got an extra space in the parameter; there should be no space after the colon symbol. On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu wrote: > > Hi, > > I used

Re: Inferring Data driven Spark parameters

2018-07-03 Thread Vadim Semenov
You can't change the executor/driver cores/memory on the fly once you've already started a Spark Context. On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu wrote: > > We aren't using Oozie or similar, moreover, the end to end job shall be > exactly the same, but the data will be extremely different

Re: Sharing spark executor pool across multiple long running spark applications

2018-02-07 Thread Vadim Semenov
The other way might be to launch a single SparkContext and then run jobs inside of it. You can take a look at these projects: - https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs - http://livy.incubator.apache.org Problems with this

Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Vadim Semenov
Ah, yes, I missed that part: it's `spark.local.dir`. spark.local.dir /tmp Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories

Re: Passing an array of more than 22 elements in a UDF

2017-12-26 Thread Vadim Semenov
Functions are still limited to 22 arguments https://github.com/scala/scala/blob/2.13.x/src/library/scala/Function22.scala On Tue, Dec 26, 2017 at 2:19 PM, Felix Cheung wrote: > Generally the 22 limitation is from Scala 2.10. > > In Scala 2.11, the issue with case

Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
All used cores aren't getting reported correctly in EMR, and YARN itself has no control over it, so whatever you put in `spark.executor.cores` will be used, but in the ResourceManager you will only see 1 vcore used per nodemanager. On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman

Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
s per node 8. Am I missing to relate here. > > What I m thinking now is number of vote = number of threads. > > > > On Mon, 26 Feb 2018 at 18:45, Vadim Semenov <va...@datadoghq.com> wrote: > >> All used cores aren't getting reported correctly in EMR, and YARN it

Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Vadim Semenov
object MyDatabaseSingleton { @transient lazy val dbConn = DB.connect(…) `transient` marks the variable to be excluded from serialization, and `lazy` opens the connection only when it's needed and also makes sure that the val is initialized in a thread-safe way
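A hedged completion of the snippet above, using plain JDBC as a stand-in for the database client; the connection URL and credentials are placeholders:

```scala
import java.sql.{Connection, DriverManager}

// Sketch: one connection per executor JVM, opened lazily on first use.
object MyDatabaseSingleton {
  @transient lazy val dbConn: Connection = {
    val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
    sys.addShutdownHook(conn.close()) // close it when the executor JVM shuts down
    conn
  }
}
```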

Re: Broadcast variable size limit?

2018-08-05 Thread Vadim Semenov
That's the max size of a byte array in Java, limited by the length being defined as an integer, and in most JVMs arrays can't hold more than Int.MaxValue - 8 elements. Another way to overcome this is to create multiple broadcast variables. On Sunday, August 5, 2018, klrmowse wrote: > i don't need

Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Vadim Semenov
`spark.worker.cleanup.enabled=true` doesn't work for YARN. On Fri, Jul 27, 2018 at 8:52 AM dineshdharme wrote: > > I am trying to do few (union + reduceByKey) operations on a hiearchical > dataset in a iterative fashion in rdd. The first few loops run fine but on > the subsequent loops, the

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` sets the number of partitions for the last stage, so you have to use `repartition` instead which is going to introduce an extra shuffle stage On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers wrote: > > one small correction: lots of files leads to pressure on the spark driver > program
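A sketch of the difference; `df`, the column name, and the output path are placeholders:

```scala
// coalesce(1) would collapse the groupBy stage itself down to a single task:
// df.groupBy("key").count().coalesce(1)

// repartition(1) adds a shuffle, so the aggregation keeps the configured
// spark.sql.shuffle.partitions parallelism and only the final stage is a single task:
df.groupBy("key").count()
  .repartition(1)
  .write.parquet("s3://bucket/out") // placeholder output path
```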

Re: java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Vadim Semenov
one of the spills becomes bigger than 2GiB and can't be loaded fully (as arrays in Java can't have more than Int.MaxValue values) > > org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76) You can try increasing the number of partitions, so

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Vadim Semenov
You have too many partitions, so when the driver is trying to gather the status of all map outputs and send back to executors it chokes on the size of the structure that needs to be GZipped, and since it's bigger than 2GiB, it produces OOM. On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman wrote: >

Re: Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Vadim Semenov
That usually happens when you have different types for a column in some parquet files. In this case, I think you have a column of `Long` type that got a file with `Integer` type; I had to deal with a similar problem once. You would have to cast it yourself to Long. On Mon, Jul 9, 2018 at 2:53 PM

Re: Dynamic allocation not releasing executors after unpersisting all cached data

2018-07-09 Thread Vadim Semenov
Try doing `unpersist(blocking=true)` On Mon, Jul 9, 2018 at 2:59 PM Jeffrey Charles wrote: > > I'm persisting a dataframe in Zeppelin which has dynamic allocation enabled > to get a sense of how much memory the dataframe takes up. After I note the > size, I unpersist the dataframe. For some

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Who's your spark provider? EMR, Azure, Databricks, etc.? Maybe contact them, since they've probably applied some patches Also have you checked `stdout` for some Segfaults? I vaguely remember getting `Task failed while writing rows at` and seeing some segfaults that caused that On Wed, Feb 28,

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Yeah, without actually seeing what's happening on that line, it'd be difficult to say for sure. You can check what patches HortonWorks applied, or/and ask them. And yeah, seg fault is totally possible on any size of the data. But you should've seen it in the `stdout` (assuming that the regular

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
There should be another exception trace (basically, the actual cause) after this one, could you post it? On Wed, Feb 28, 2018 at 1:39 PM, unk1102 wrote: > Hi I am getting the following exception when I try to write DataFrame using > the following code. Please guide. I am

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
I'm sorry, didn't see `Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)` Are you sure that you use 2.2.0? I don't see any code on that line

Re: Spark & S3 - Introducing random values into key names

2018-03-08 Thread Vadim Semenov
You need to put randomness into the beginning of the key; if you put it anywhere other than the beginning, it's not guaranteed that you're going to have good performance. The way we achieved this is by writing to HDFS first, and then having a custom DistCp implemented using Spark that copies parquet
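One possible key scheme for illustration, using a short deterministic hash as the prefix; the helper name and key layout are hypothetical, not from the original thread:

```scala
// Prefixing keys spreads writes across S3 partitions, e.g. "3f2a/events/2018/03/08/part-00000".
def randomizedKey(originalKey: String): String = {
  val prefix = Integer.toHexString(originalKey.hashCode & 0xffff) // up to 4 hex chars
  s"$prefix/$originalKey"
}

randomizedKey("events/2018/03/08/part-00000.parquet")
```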

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
But overall, I think the original approach is not correct. If you get a single file in the tens of GBs, the approach probably must be reworked. I don't see why you can't just write multiple CSV files using Spark, and then concatenate them without Spark. On Fri, Mar 9, 2018 at 10:02 AM, Vadim Semenov

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
ocation. > > Thanks > Deepak > > On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote: > >> because `coalesce` gets propagated further up in the DAG in the last >> stage, so your last stage only has one task. >> >> You need to break

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
because `coalesce` gets propagated further up in the DAG in the last stage, so your last stage only has one task. You need to break your DAG so your expensive operations would be in a previous stage before the stage with `.coalesce(1)` On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <

Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Vadim Semenov
Do you have a trace? i.e. what's the source of `io.netty.*` calls? And have you tried bumping `-XX:MaxDirectMemorySize`? On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit wrote: > Hi All > > I have a job which processes a large dataset. All items in the dataset > are

Re: Spark Job Server application compilation issue

2018-03-14 Thread Vadim Semenov
This question should be directed to the `spark-jobserver` group: https://github.com/spark-jobserver/spark-jobserver#contact They also have a gitter chat. Also include the errors you get once you're going to be asking them a question On Wed, Mar 14, 2018 at 1:37 PM, sujeet jog

Re: Can I get my custom spark strategy to run last?

2018-03-02 Thread Vadim Semenov
Something like this? sparkSession.experimental.extraStrategies = Seq(Strategy) val logicalPlan = df.logicalPlan val newPlan: LogicalPlan = Strategy(logicalPlan) Dataset.ofRows(sparkSession, newPlan) On Thu, Mar 1, 2018 at 8:20 PM, Keith Chapman wrote: > Hi, > > I'd

Re: Does Spark have a plan to move away from sun.misc.Unsafe?

2018-10-25 Thread Vadim Semenov
Here you go: the umbrella ticket: https://issues.apache.org/jira/browse/SPARK-24417 and the sun.misc.unsafe one https://issues.apache.org/jira/browse/SPARK-24421 On Wed, Oct 24, 2018 at 8:08 PM kant kodali wrote: > > Hi All, > > Does Spark have a plan to move away from sun.misc.Unsafe to
