Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
I am looking for a writer/committer optimization that can make the Spark
write faster.
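
A minimal sketch of the S3A committer settings commonly used to speed up writes to
S3-compatible stores such as MinIO, assuming Spark's hadoop-cloud module is on the
classpath and MinIO is reached through the s3a:// connector; the app name and
endpoint below are placeholders, not values from this thread:

  import org.apache.spark.sql.SparkSession

  // Sketch only: enable the S3A "magic" committer so commits avoid the slow
  // rename-based path on object stores.
  val spark = SparkSession.builder()
    .appName("minio-write-sketch")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.sql.sources.commitProtocolClass",
      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    // MinIO endpoint details are placeholders.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.example.com:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()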

On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:

> Hi,
> I think you should write to HDFS, then copy the files (Parquet or ORC) from
> HDFS to MinIO.
>
> --
> eabour
>
>
> From: Prem Sahoo
> Date: 2024-05-22 00:38
> To: Vibhor Gupta; user
> Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way
>
>
> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>
>> Hello Vibhor,
>> Thanks for the suggestion.
>> I am looking for other alternatives where the same dataframe can be
>> written to two destinations without re-execution and without cache or
>> persist.
>>
>> Can someone help me with scenario 2?
>> How can I make Spark write to MinIO faster?
>> Sent from my iPhone
>>
>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>> wrote:
>>
>> Hi Prem,
>>
>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>
>> This will prevent duplicate transformation.
>>
>> You can also try persisting the dataframe using the DISK_ONLY level.
>>
>> Regards,
>> Vibhor
>>
>> From: Prem Sahoo
>> Date: Tuesday, 21 May 2024 at 8:16 AM
>> To: Spark dev list
>> Subject: EXT: Dual Write to HDFS and MinIO in faster way
>>
>> EXTERNAL: Report suspicious emails to Email Abuse.
>>
>> Hello Team,
>>
>> I am planning to write to two datasources at the same time.
>>
>> Scenario 1:-
>>
>> Writing the same dataframe to HDFS and MinIO without re-executing the
>> transformations and without cache(). How can we make this faster?
>>
>> Read the Parquet file, do a few transformations, and write to HDFS and
>> MinIO.
>>
>> Here, for both writes, Spark needs to execute the transformations again.
>> Is there a way to avoid re-executing the transformations without
>> cache()/persist()?
>>
>> Scenario 2:-
>>
>> I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes.
>> Is there any way to make this write faster?
>>
>> I don't want to repartition and then write, as repartition has the
>> overhead of shuffling.
>>
>> Please provide some inputs.
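
A minimal sketch of the two approaches suggested in this thread (persist the
dataframe with DISK_ONLY and fan out, or write to HDFS once and then copy to
MinIO); the paths, bucket name, and transformation function are placeholders:

  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.storage.StorageLevel

  val spark = SparkSession.builder().appName("dual-write-sketch").getOrCreate()

  // Placeholder for whatever transformations the job actually applies.
  val myTransformations: DataFrame => DataFrame = df => df

  // Option A (DISK_ONLY persist): materialize the transformed data once, then
  // write it to both destinations without re-running the transformations.
  val result = spark.read.parquet("hdfs:///data/input")
    .transform(myTransformations)
    .persist(StorageLevel.DISK_ONLY)

  result.write.mode("overwrite").parquet("hdfs:///data/output")
  result.write.mode("overwrite").parquet("s3a://bucket/data/output")  // MinIO via s3a
  result.unpersist()

  // Option B: write to HDFS once, then read the written files back and copy
  // them to MinIO, so the transformations still run only once.
  spark.read.parquet("hdfs:///data/output")
    .write.mode("overwrite").parquet("s3a://bucket/data/output")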


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread eab...@163.com
Hi,
I think you should write to HDFS, then copy the files (Parquet or ORC) from HDFS
to MinIO.



eabour
 
From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
Hello Vibhor,
Thanks for the suggestion.
I am looking for other alternatives where the same dataframe can be written to
two destinations without re-execution and without cache or persist.

Can someone help me with scenario 2?
How can I make Spark write to MinIO faster?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta  wrote:

 
Hi Prem,
 
You can try to write to HDFS then read from HDFS and write to MinIO.
 
This will prevent duplicate transformation.
 
You can also try persisting the dataframe using the DISK_ONLY level.
 
Regards,
Vibhor
From: Prem Sahoo 
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list 
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasources at the same time.

Scenario 1:-

Writing the same dataframe to HDFS and MinIO without re-executing the
transformations and without cache(). How can we make this faster?

Read the Parquet file, do a few transformations, and write to HDFS and MinIO.

Here, for both writes, Spark needs to execute the transformations again. Is there
a way to avoid re-executing the transformations without cache()/persist()?

Scenario 2:-
I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes.
Is there any way to make this write faster?

I don't want to repartition and then write, as repartition has the overhead of
shuffling.

Please provide some inputs.
 
 


Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:

> Hello Vibhor,
> Thanks for the suggestion.
> I am looking for other alternatives where the same dataframe can be
> written to two destinations without re-execution and without cache or
> persist.
>
> Can someone help me with scenario 2?
> How can I make Spark write to MinIO faster?
> Sent from my iPhone
>
> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
> wrote:
>
> Hi Prem,
>
> You can try to write to HDFS then read from HDFS and write to MinIO.
>
> This will prevent duplicate transformation.
>
> You can also try persisting the dataframe using the DISK_ONLY level.
>
> Regards,
> Vibhor
>
> From: Prem Sahoo
> Date: Tuesday, 21 May 2024 at 8:16 AM
> To: Spark dev list
> Subject: EXT: Dual Write to HDFS and MinIO in faster way
>
> EXTERNAL: Report suspicious emails to Email Abuse.
>
> Hello Team,
>
> I am planning to write to two datasources at the same time.
>
> Scenario 1:-
>
> Writing the same dataframe to HDFS and MinIO without re-executing the
> transformations and without cache(). How can we make this faster?
>
> Read the Parquet file, do a few transformations, and write to HDFS and
> MinIO.
>
> Here, for both writes, Spark needs to execute the transformations again.
> Is there a way to avoid re-executing the transformations without
> cache()/persist()?
>
> Scenario 2:-
>
> I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes.
>
> Is there any way to make this write faster?
>
> I don't want to repartition and then write, as repartition has the
> overhead of shuffling.
>
> Please provide some inputs.


AWS EMR slow write to HDFS

2019-06-11 Thread Femi Anthony


I'm writing a large dataset in Parquet format to HDFS using Spark, and it runs
rather slowly on EMR versus, say, Databricks. I realize that if I were able to use
Hadoop 3.1, it would be much more performant because it has a high-performance
output committer. Is this the case, and if so, when will there be a version of
EMR that uses Hadoop 3.1? The current version I'm using is 5.21.
Sent from my iPhone
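
One knob that is sometimes tried while waiting for the newer committers is the v2
FileOutputCommitter algorithm, which skips the extra rename pass at job commit; a
minimal sketch, assuming an existing SparkSession named spark and a DataFrame
named df, with a placeholder output path and no guarantee it helps this workload:

  // Sketch only: v2 moves task output to the final directory at task commit,
  // avoiding the second rename pass the default (v1) algorithm does at job commit.
  spark.sparkContext.hadoopConfiguration
    .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

  df.write.mode("overwrite").parquet("hdfs:///warehouse/large_dataset")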



Re: Write to HDFS

2017-10-20 Thread Deepak Sharma
Better use coalesce instead of repartition.

On Fri, Oct 20, 2017 at 9:47 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Use  counts.repartition(1).save..
> Hth
>
>
> On Oct 20, 2017 3:01 PM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
>
> Actually, when I run the following code,
>
>   val textFile = sc.textFile("Sample.txt")
>   val counts = textFile.flatMap(line => line.split(" "))
>     .map(word => (word, 1))
>     .reduceByKey(_ + _)
>
> it saves the results into more than one partition, like part-0,
> part-1. I want to collect all of them into one file.
>
>
> 2017-10-20 16:43 GMT+03:00 Marco Mistroni <mmistr...@gmail.com>:
>
>> Hi
>>  Could you just create an rdd/df out of what you want to save and store
>> it in hdfs?
>> Hth
>>
>> On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> In word count example,
>>>
>>>   val textFile = sc.textFile("Sample.txt")
>>>   val counts = textFile.flatMap(line => line.split(" "))
>>>     .map(word => (word, 1))
>>>     .reduceByKey(_ + _)
>>>   counts.saveAsTextFile("hdfs://master:8020/user/abc")
>>>
>>> I want to write the collection "counts", which is used in the code above,
>>> to HDFS, so
>>>
>>> val x = counts.collect()
>>>
>>> Actually I want to write x to HDFS, but Spark requires an RDD to write
>>> something to HDFS.
>>>
>>> How can I write an Array[(String, Int)] to HDFS?
>>>
>>>
>>> --
>>> Uğur
>>>
>>
>
>
> --
> Uğur Sopaoğlu
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
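
A sketch combining the suggestions in this thread (reduce to one partition before
saving), reusing the word-count code quoted above; coalesce(1) avoids the full
shuffle of repartition(1), but either way all data funnels through a single task,
so this only makes sense for small results:

  val textFile = sc.textFile("Sample.txt")
  val counts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  // Single part file in the output directory.
  counts.coalesce(1).saveAsTextFile("hdfs://master:8020/user/abc")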


Re: Write to HDFS

2017-10-20 Thread Marco Mistroni
Use  counts.repartition(1).save..
Hth

On Oct 20, 2017 3:01 PM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:

Actually, when I run the following code,

  val textFile = sc.textFile("Sample.txt")
  val counts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

it saves the results into more than one partition, like part-0,
part-1. I want to collect all of them into one file.


2017-10-20 16:43 GMT+03:00 Marco Mistroni <mmistr...@gmail.com>:

> Hi
>  Could you just create an rdd/df out of what you want to save and store it
> in hdfs?
> Hth
>
> On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
>
>> Hi all,
>>
>> In word count example,
>>
>>   val textFile = sc.textFile("Sample.txt")
>>   val counts = textFile.flatMap(line => line.split(" "))
>>     .map(word => (word, 1))
>>     .reduceByKey(_ + _)
>>   counts.saveAsTextFile("hdfs://master:8020/user/abc")
>>
>> I want to write the collection "counts", which is used in the code above,
>> to HDFS, so
>>
>> val x = counts.collect()
>>
>> Actually I want to write x to HDFS, but Spark requires an RDD to write
>> something to HDFS.
>>
>> How can I write an Array[(String, Int)] to HDFS?
>>
>>
>> --
>> Uğur
>>
>


-- 
Uğur Sopaoğlu


Re: Write to HDFS

2017-10-20 Thread Uğur Sopaoğlu
Actually, when I run the following code,

  val textFile = sc.textFile("Sample.txt")
  val counts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

it saves the results into more than one partition, like part-0,
part-1. I want to collect all of them into one file.


2017-10-20 16:43 GMT+03:00 Marco Mistroni <mmistr...@gmail.com>:

> Hi
>  Could you just create an rdd/df out of what you want to save and store it
> in hdfs?
> Hth
>
> On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
>
>> Hi all,
>>
>> In word count example,
>>
>>   val textFile = sc.textFile("Sample.txt")
>>   val counts = textFile.flatMap(line => line.split(" "))
>>     .map(word => (word, 1))
>>     .reduceByKey(_ + _)
>>   counts.saveAsTextFile("hdfs://master:8020/user/abc")
>>
>> I want to write the collection "counts", which is used in the code above,
>> to HDFS, so
>>
>> val x = counts.collect()
>>
>> Actually I want to write x to HDFS, but Spark requires an RDD to write
>> something to HDFS.
>>
>> How can I write an Array[(String, Int)] to HDFS?
>>
>>
>> --
>> Uğur
>>
>


-- 
Uğur Sopaoğlu


Re: Write to HDFS

2017-10-20 Thread Marco Mistroni
Hi
 Could you just create an rdd/df out of what you want to save and store it
in hdfs?
Hth

On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:

> Hi all,
>
> In word count example,
>
>   val textFile = sc.textFile("Sample.txt")
>   val counts = textFile.flatMap(line => line.split(" "))
>     .map(word => (word, 1))
>     .reduceByKey(_ + _)
>   counts.saveAsTextFile("hdfs://master:8020/user/abc")
>
> I want to write the collection "counts", which is used in the code above,
> to HDFS, so
>
> val x = counts.collect()
>
> Actually I want to write x to HDFS, but Spark requires an RDD to write
> something to HDFS.
>
> How can I write an Array[(String, Int)] to HDFS?
>
>
> --
> Uğur
>


Write to HDFS

2017-10-20 Thread Uğur Sopaoğlu
Hi all,

In word count example,

  val textFile = sc.textFile("Sample.txt")
  val counts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  counts.saveAsTextFile("hdfs://master:8020/user/abc")

I want to write the collection "counts", which is used in the code above, to
HDFS, so

val x = counts.collect()

Actually I want to write x to HDFS, but Spark requires an RDD to write
something to HDFS.

How can I write an Array[(String, Int)] to HDFS?


-- 
Uğur
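
A minimal sketch of the "create an rdd out of what you want to save" suggestion
from this thread: turn the collected Array[(String, Int)] back into a
one-partition RDD and save it (the output path is a placeholder):

  val x: Array[(String, Int)] = counts.collect()

  // Format each pair as a line and write a single part file.
  sc.parallelize(x.toSeq, numSlices = 1)
    .map { case (word, count) => s"$word\t$count" }
    .saveAsTextFile("hdfs://master:8020/user/abc_as_text")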


Slow Parquet write to HDFS using Spark

2016-11-03 Thread morfious902002
I am using Spark 1.6.1 and writing to HDFS. In some cases it seems like all
the work is being done by one thread. Why is that?

Also, I need parquet.enable.summary-metadata to register the Parquet files
with Impala.

Df.write().partitionBy("COLUMN").parquet(outputFileLocation);

It also seems like all of this happens on one CPU of an executor.

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_29_0
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far)
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

Then again :-

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting.
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far)
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far)
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time.
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

The Schema 

About 200 of the following lines, repeated 20 times or so.
 
16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk.
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651

About 200 of the following lines

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp}

Then at last:- 

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_40_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_40
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_40_0: Committed
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Slow-Parquet-write-to-HDFS-using-Spark-tp28011.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

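
One approach not mentioned in the message, sketched here under the assumption
that the slowdown comes from a single task sorting and writing many dynamic
partitions: repartition by the partition column first, so each task writes only
the partition values it owns (df and outputFileLocation are the names from the
snippet above; expression-based repartition is available from Spark 1.6 on):

  import org.apache.spark.sql.functions.col

  df.repartition(col("COLUMN"))
    .write
    .partitionBy("COLUMN")
    .parquet(outputFileLocation)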



Re: Looking for the method executors uses to write to HDFS

2015-11-06 Thread Reynold Xin
Are you looking for this?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L69


On Wed, Nov 4, 2015 at 5:11 AM, Tóth Zoltán <t...@looper.hu> wrote:

> Hi,
>
> I'd like to write a Parquet file from the driver. I could use the HDFS API,
> but I am worried that it won't work on a secure cluster. I assume that the
> method the executors use to write to HDFS takes care of managing Hadoop
> security. However, I can't find the place where the HDFS write happens in the
> Spark source.
>
> Please help me:
> 1. How can I write Parquet from the driver using the Spark API?
> 2. If this isn't possible, where can I find the method the executors use to
> write to HDFS?
>
> Thanks,
> Zoltan
>
>


Looking for the method executors uses to write to HDFS

2015-11-04 Thread Tóth Zoltán
Hi,

I'd like to write a Parquet file from the driver. I could use the HDFS API,
but I am worried that it won't work on a secure cluster. I assume that the
method the executors use to write to HDFS takes care of managing Hadoop
security. However, I can't find the place where the HDFS write happens in the
Spark source.

Please help me:
1. How can I write Parquet from the driver using the Spark API?
2. If this isn't possible, where can I find the method the executors use to
write to HDFS?

Thanks,
Zoltan
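
If the goal is simply to get data that lives on the driver into Parquet on HDFS
through the Spark API, so that Hadoop security is handled the same way as for any
other Spark write, a minimal sketch (schema and path are placeholders; strictly
the write still runs in a task, but the data originates on the driver):

  case class Record(id: Long, name: String)

  val driverSideData = Seq(Record(1L, "a"), Record(2L, "b"))

  // One-partition DataFrame built from driver-side data, written as Parquet.
  sqlContext.createDataFrame(driverSideData)
    .coalesce(1)
    .write
    .parquet("hdfs:///tmp/driver_output.parquet")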


Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Patanachai Tangchaisin

Currently, I use rdd.isEmpty()

Thanks,
Patanachai


On 08/06/2015 12:02 PM, gpatcham wrote:

Is there a way to filter out empty partitions before I write to HDFS, other
than using repartition and coalesce?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




--
Patanachai


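
For reference, a minimal sketch of how that isEmpty() check is typically applied
(rdd and outputPath are placeholders):

  // Skip the write entirely when there are no records, so no empty part files
  // are produced at all.
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(outputPath)
  }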



Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Richard Marscher
Not that I'm aware of. We ran into a similar issue where we didn't want
to keep accumulating all these empty part files in storage on S3 or HDFS.
There didn't seem to be any performance-free way to do it with an RDD, so
we just run a non-Spark post-batch operation to delete empty files from the
write path.
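
A sketch of a post-batch cleanup along those lines, assuming a Hadoop FileSystem
is reachable and with a placeholder write path; the zero-length check matches
empty text part files, while "empty" Parquet files still carry a footer and would
need a different test:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val writePath = new Path("hdfs:///data/output")
  val fs = FileSystem.get(writePath.toUri, new Configuration())

  // Delete zero-length part files left behind by empty partitions.
  fs.listStatus(writePath)
    .filter(s => s.isFile && s.getLen == 0 && s.getPath.getName.startsWith("part-"))
    .foreach(s => fs.delete(s.getPath, false))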

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin patanac...@ipsy.com
wrote:

 Currently, I use rdd.isEmpty()

 Thanks,
 Patanachai



 On 08/06/2015 12:02 PM, gpatcham wrote:

 Is there a way to filter out empty partitions before I write to HDFS, other
 than using repartition and coalesce?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



 --
 Patanachai







-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


How to transform large local files into Parquet format and write into HDFS?

2014-08-14 Thread Parthus
Hi there,

I have several large files (500GB per file) to transform into Parquet format
and write to HDFS. The problems I encountered can be described as follows:

1) At first, I tried to load all the records in a file and then used
sc.parallelize(data) to generate an RDD, and finally used
saveAsNewAPIHadoopFile(...) to write to HDFS. However, because each file
was too large to be handled in memory (500GB), it did not work.

2) Then, I tried to load a certain number of records at a time, but I had to
launch a lot of saveAsNewAPIHadoopFile(...) tasks, and the file directory
became two levels deep:

data/0/part0 --- part29
data/1/part0 --- part29
..
And when I tried to access the data directory to process all the parts, I
did not know the directory hierarchy.

I do not know if HDFS has the ability to get the hierarchy of a directory.
If so, my problem can be solved by utilizing that information. Another way
is to generate all the files in a flat directory, like:

data/part0  part1

And then the API newAPIHadoopFile can read all of them.

Any suggestions? Thanks very much.
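
On the directory-hierarchy question, HDFS can walk a directory tree; a minimal
sketch using the Hadoop FileSystem API (the path is a placeholder), which lists
every file under data/0/part*, data/1/part*, and so on:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val fs = FileSystem.get(new Configuration())
  val files = fs.listFiles(new Path("hdfs:///data"), true)  // true = recursive
  while (files.hasNext) {
    println(files.next().getPath)
  }

Alternatively, the Hadoop input APIs accept glob patterns, so a path such as
data/*/part* can be passed to newAPIHadoopFile directly.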
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-transform-large-local-files-into-Parquet-format-and-write-into-HDFS-tp12131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
