Re: Re: EXT: Dual Write to HDFS and MinIO in a faster way
I am looking for a writer/committer optimization that can make the Spark write faster.

On Tue, May 21, 2024 at 9:15 PM eab...@163.com wrote:
> Hi,
> I think you should write to HDFS, then copy the files (Parquet or ORC) from HDFS to MinIO.
>
> eabour
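Since MinIO is S3-compatible, one writer/committer avenue is the S3A committers that ship with Hadoop 3.1+, which avoid the rename-based commit that object stores emulate slowly. A minimal sketch, assuming Spark is packaged with the spark-hadoop-cloud module; the endpoint is a placeholder and credentials are omitted:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("minio-committer-sketch")
  // Point S3A at MinIO (endpoint is a placeholder for the real one).
  .config("spark.hadoop.fs.s3a.endpoint", "http://minio.example.com:9000")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  // Use the S3A "magic" committer instead of the rename-based commit.
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  // Route DataFrame writes through the cloud committer binding classes
  // (these ship in the spark-hadoop-cloud module).
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()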
Re: Re: EXT: Dual Write to HDFS and MinIO in a faster way
Hi,
I think you should write to HDFS, then copy the files (Parquet or ORC) from HDFS to MinIO.

eabour
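For the copy step itself, `hadoop distcp` is the usual tool for bulk HDFS-to-S3A copies; a driver-side sketch with the Hadoop FileSystem API (both paths are placeholders) could look like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

val conf = new Configuration()
val src = new Path("hdfs://namenode:8020/warehouse/out")  // placeholder
val dst = new Path("s3a://bucket/warehouse/out")          // MinIO bucket, placeholder

// Copy the already-committed HDFS output to MinIO;
// deleteSource = false keeps the HDFS copy in place.
FileUtil.copy(src.getFileSystem(conf), src,
              dst.getFileSystem(conf), dst,
              false, conf)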
Re: EXT: Dual Write to HDFS and MinIO in a faster way
On Tue, May 21, 2024 at 6:58 AM Prem Sahoo wrote:

Hello Vibhor,
Thanks for the suggestion. I am looking for other alternatives where the same dataframe can be written to two destinations without re-execution and without cache or persist.

Can someone help me with scenario 2? How can the Spark write to MinIO be made faster?

Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta wrote:

> Hi Prem,
>
> You can try writing to HDFS, then reading from HDFS and writing to MinIO.
> This will prevent the duplicate transformation.
>
> You can also try persisting the dataframe using the DISK_ONLY level.
>
> Regards,
> Vibhor

From: Prem Sahoo
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list
Subject: EXT: Dual Write to HDFS and MinIO in a faster way

Hello Team,

I am planning to write to two datasources at the same time.

Scenario 1:
Writing the same dataframe to HDFS and MinIO without re-executing the transformations and without cache(). How can we make it faster?

Read a parquet file, do a few transformations, and write to both HDFS and MinIO; in both writes, Spark needs to execute the transformations again. Do we know how to avoid this re-execution of the transformations without cache()/persist()?

Scenario 2:
I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes. Is there any way to make this write faster?

I don't want to repartition and then write, as repartition has the overhead of shuffling.

Please provide some inputs.
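For scenario 1, a minimal sketch of Vibhor's DISK_ONLY suggestion, assuming an existing SparkSession `spark` and placeholder paths and transformation: persist materializes the transformed rows once, so the second write re-reads the cached blocks instead of recomputing the lineage.

import org.apache.spark.storage.StorageLevel

val df = spark.read.parquet("hdfs://namenode:8020/in/events")  // placeholder
  .withColumnRenamed("ts", "event_ts")                         // stand-in transformation
  .persist(StorageLevel.DISK_ONLY)

// Both writes reuse the materialized blocks; the lineage runs once.
df.write.mode("overwrite").parquet("hdfs://namenode:8020/out/events")
df.write.mode("overwrite").parquet("s3a://bucket/out/events")  // MinIO via S3A
df.unpersist()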
AWS EMR slow write to HDFS
I'm writing a large dataset in Parquet format to HDFS using Spark, and it runs rather slowly on EMR compared with, say, Databricks. I realize that if I were able to use Hadoop 3.1 it would be much more performant, because it has a high-performance output committer. Is this the case, and if so, when will there be a version of EMR that uses Hadoop 3.1? The version I'm currently using is 5.21.

Sent from my iPhone
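One commit-phase knob that already exists on Hadoop 2.x clusters like EMR 5.x is version 2 of the FileOutputCommitter algorithm; it renames task output at task commit instead of serially at job commit, at the cost of weaker atomicity on failure. A hedged sketch with a placeholder output path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("emr-write-sketch")
  // v2 moves each task's output at task commit time rather than
  // renaming everything serially at job commit; faster, less atomic.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

val df = spark.range(1000000L).toDF("id")  // stand-in for the real dataset
df.write.mode("overwrite").parquet("hdfs:///tmp/output")  // placeholder path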
Re: Write to HDFS
Better to use coalesce instead of repartition: it merges the existing partitions without a full shuffle.

--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
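A minimal sketch of the difference, reusing the word-count example from this thread: coalesce(1) narrows the existing partitions into one without a full shuffle, while repartition(1) shuffles every record first.

val textFile = sc.textFile("Sample.txt")
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// coalesce(1) narrows existing partitions into one without a full
// shuffle; repartition(1) would shuffle every record first.
counts.coalesce(1).saveAsTextFile("hdfs://master:8020/user/abc")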
Re: Write to HDFS
Use counts.repartition(1).saveAsTextFile(...).
Hth

On Oct 20, 2017 3:01 PM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
> It saves the results into more than one partition, like part-00000 and part-00001. I want to collect all of them into one file.
Re: Write to HDFS
Actually, when I run the following code:

val textFile = sc.textFile("Sample.txt")
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

it saves the results into more than one partition, like part-00000 and part-00001. I want to collect all of them into one file.

2017-10-20 16:43 GMT+03:00 Marco Mistroni <mmistr...@gmail.com>:
> Hi
> Could you just create an rdd/df out of what you want to save and store it in hdfs?
> Hth

--
Uğur Sopaoğlu
Re: Write to HDFS
Hi
Could you just create an rdd/df out of what you want to save and store it in hdfs?
Hth

On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
> How can I write an Array[(String, Int)] to HDFS?
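A minimal sketch of that suggestion, reusing `counts` and the path from the original post: parallelizing the collected array back into a single-partition RDD makes saveAsTextFile produce exactly one part file.

val x: Array[(String, Int)] = counts.collect()

// Turn the local array back into an RDD with a single partition,
// so the save produces exactly one part file on HDFS.
sc.parallelize(x.toSeq, numSlices = 1)
  .saveAsTextFile("hdfs://master:8020/user/abc")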
Write to HDFS
Hi all,

In the word count example:

val textFile = sc.textFile("Sample.txt")
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://master:8020/user/abc")

I want to write the collection "counts", which is used in the code above, to HDFS, so:

val x = counts.collect()

Actually I want to write x to HDFS, but Spark wants an RDD to write something to HDFS. How can I write an Array[(String, Int)] to HDFS?

--
Uğur
Slow Parquet write to HDFS using Spark
I am using Spark 1.6.1 and writing to HDFS. In some cases it seems like all the work is being done by one thread. Why is that?

Also, I need parquet.enable.summary-metadata to register the parquet files with Impala.

Df.write().partitionBy("COLUMN").parquet(outputFileLocation);

It also seems like all of this happens on one CPU of a single executor.

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_29_0
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far)
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: (the schema)

Then again:

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting.
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far)
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far)
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time.
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: (the schema)

Then about 200 of the following lines, again and again, 20 times or so:

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk.
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651

About 200 of the following lines:

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp}

Then at last:

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_40_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_40
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_40_0: Committed
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver
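That "Maximum partitions reached, falling back on sorting" line is the tell: each task has more open output partitions than the writer allows, so it sorts its input and then writes the partition files out one at a time on a single thread. A common workaround (a sketch against the `Df` and `outputFileLocation` from the post, not EMR- or Impala-specific advice) is to cluster rows by the partition column first, so each task writes only one or a few partitions:

import org.apache.spark.sql.functions.col

// Cluster rows by the partition column before the partitioned write,
// so each task writes to only a few output partitions instead of
// sorting and writing hundreds of them serially.
Df.repartition(col("COLUMN"))
  .write
  .partitionBy("COLUMN")
  .parquet(outputFileLocation)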
Re: Looking for the method executors use to write to HDFS
Are you looking for this?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L69
Looking for the method executors use to write to HDFS
Hi,

I'd like to write a parquet file from the driver. I could use the HDFS API, but I am worried that it won't work on a secure cluster. I assume that the method the executors use to write to HDFS takes care of managing Hadoop security; however, I can't find the place where the HDFS write happens in the Spark source.

Please help me:
1. How do I write parquet from the driver using the Spark API?
2. If this isn't possible, where can I find the method executors use to write to HDFS?

Thanks,
Zoltan
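For question 1, a minimal sketch against the Spark 1.x API of that era, with a hypothetical two-column dataset and a placeholder path: building the DataFrame on the driver and writing it through Spark's own writer runs a tiny job, so delegation tokens and Kerberos are handled the same way as any executor write.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical small dataset living on the driver.
val records = Seq(("a", 1), ("b", 2))

// The write still runs as a (single-task) Spark job, so Hadoop
// security is managed by Spark rather than by hand-rolled HDFS calls.
records.toDF("key", "value")
  .coalesce(1)
  .write
  .parquet("hdfs:///user/zoltan/out.parquet")  // placeholder path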
Re: Removing empty partitions before we write to HDFS
Currently, I use rdd.isEmpty().

Thanks,
Patanachai

On 08/06/2015 12:02 PM, gpatcham wrote:
> Is there a way to filter out empty partitions before I write to HDFS, other than using repartition and coalesce?
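A sketch of that guard with a placeholder path; note that isEmpty() checks the whole RDD, so it helps when an entire batch is empty rather than when only some partitions are:

// Skip the write entirely when the batch produced no records;
// this avoids creating a directory full of empty part files.
if (!rdd.isEmpty()) {
  rdd.saveAsTextFile("hdfs:///output/batch-0001")  // placeholder path
}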
Re: Removing empty partitions before we write to HDFS
Not that I'm aware of. We ran into a similar issue where we didn't want to keep accumulating all these empty part files in storage on S3 or HDFS. There didn't seem to be any way to avoid them with an RDD without paying a performance cost, so we just run a non-Spark post-batch operation to delete empty files from the write path.

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin <patanac...@ipsy.com> wrote:
> Currently, I use rdd.isEmpty()

--
Richard Marscher
Software Engineer
Localytics
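A sketch of what such a post-batch cleanup might look like, using the Hadoop FileSystem API; the path is a placeholder, and it assumes zero-length part- files are always safe to remove:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val outputDir = new Path("hdfs:///output/batch-0001")  // placeholder path
val fs = outputDir.getFileSystem(new Configuration())

// Delete zero-length part files left behind by empty partitions,
// keeping _SUCCESS and any non-empty output intact.
fs.listStatus(outputDir)
  .filter(s => s.isFile && s.getLen == 0 && s.getPath.getName.startsWith("part-"))
  .foreach(s => fs.delete(s.getPath, false))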
How to transform large local files into Parquet format and write into HDFS?
Hi there,

I have several large files (500 GB per file) to transform into Parquet format and write to HDFS. The problems I encountered can be described as follows:

1) At first, I tried to load all the records in a file, then used sc.parallelize(data) to generate an RDD, and finally used saveAsNewAPIHadoopFile(...) to write to HDFS. However, because each file was too large to be handled in memory (500 GB), this did not work.

2) Then I tried to load a certain number of records at a time, but I had to launch a lot of saveAsNewAPIHadoopFile(...) tasks, and the output directory became two levels deep:

data/0/part0 ... part29
data/1/part0 ... part29
...

When I then tried to access the data directory to process all the parts, I did not know the directory hierarchy. I do not know whether HDFS has the ability to report the hierarchy of a directory; if so, my problem can be solved by using that information. Another way would be to generate all the files in a flat directory, like:

data/part0 part1 ...

so that the newAPIHadoopFile API can read all of them.

Any suggestions? Thanks very much.
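For what it's worth, Hadoop input paths accept glob patterns, so the two-level layout can be read without knowing the hierarchy in advance. A sketch (the /data paths are placeholders, and the key/value/format types depend on how the parts were written):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// globStatus expands wildcards, so the nested layout is discoverable
// without knowing how many subdirectories were created.
val parts = fs.globStatus(new Path("/data/*/part*")).map(_.getPath)
parts.foreach(println)

// The same glob also works directly as a Spark input path, e.g.:
//   sc.newAPIHadoopFile[K, V, F]("/data/*/part*")
// with K, V, F matching whatever format the parts were written in.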