[Spark Core] saveAsTextFile is unable to rename a directory using hadoop-azure NativeAzureFileSystem

2021-09-13 Thread Abhishek Jindal
Hello,

I am trying to use the Spark rdd.saveAsTextFile function which calls the
FileSystem.rename() under the hood. This errors out with
“com.microsoft.azure.storage.StorageException: One of the request inputs is
not valid” when using hadoop-azure NativeAzureFileSystem. I have written a
small test program to rename a directory in Azure Blob Storage in Scala
that replicates this issue. Here is my code -

import java.net.URI

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

/**
  * A utility to test renaming a hadoop-azure path.
  */
object AzureRenameTester {

  def main(args: Array[String]): Unit = {

    if (args.isEmpty) {
      throw new IllegalArgumentException("The Azure Blob storage key must be provided!")
    }

    val key = args.head
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.AbstractFileSystem.wasbs.Impl", "org.apache.hadoop.fs.azure.Wasbs")
    hadoopConfig.set("fs.azure.account.key..blob.core.windows.net", key)

    val input = new URI("wasbs://@.blob.core.windows.net/testing")
    val inputPath = new Path(input)
    val output = new URI("wasbs://@.blob.core.windows.net/testingRenamed")
    val outputPath = new Path(output)
    val hadoopFs = FileSystem.get(input, hadoopConfig)

    try {
      println(s"Renaming from $inputPath to $outputPath")
      hadoopFs.rename(inputPath, outputPath)
    } catch {
      case NonFatal(ex) =>
        println(s"${ExceptionUtils.getMessage(ex)}")
        println(s"${ExceptionUtils.getRootCause(ex)}")
        throw ex
    }
  }
}

This code leads to the following error -

[error] Exception in thread "main" org.apache.hadoop.fs.azure.AzureException:
com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2849)
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2721)
[error]   at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:460)
[error]   at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:3277)
[error]   at com.qf.util.hdfs.AzureRenameTester$.main(AzureRenameTester.scala:40)
[error]   at com.qf.util.hdfs.AzureRenameTester.main(AzureRenameTester.scala)
[error] Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
[error]   at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
[error]   at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
[error]   at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
[error]   at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
[error]   at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
[error]   at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2788)
[error] ... 5 more

I am currently using spark-core-3.1.1.jar with hadoop-azure-3.2.2.jar, but the
same issue also occurs with hadoop-azure-3.3.1.jar. Please advise how I should
solve this issue.

Thanks,
Abhishek


Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread ranju goel
Hi Attila,



I will check why INVALID is getting appended to the mailing address.



What is your use case here?

The client driver application does not use collect; instead it internally calls a
Python script which reads the part-file records [comma-separated strings] of each
cluster separately and copies them into a final csv file, merging all part-file
data into a single csv file. This script runs on every node and later the outputs
are all combined into a single file.



*On the other hand is your data really just a collection of strings without
any repetitions*

[Ranju]:

Yes, it is a comma-separated string.

And I just checked the 2nd argument of saveAsTextFile; I believe reads and writes
on disk will be faster with compression. I will try this.



So I think there is no special requirement on the type of disk for saveAsTextFile,
as these are local I/O operations.



Regards

Ranju





Hi!

I would like to reflect only to the first part of your mail:

I have a large RDD dataset of around 60-70 GB which I cannot send to driver
using *collect* so first writing that to disk using  *saveAsTextFile* and
then this data gets saved in the form of multiple part files on each node
of the cluster and after that driver reads the data from that storage.


What is your use case here?

As you mention *collect()* I can assume you have to process the data
outside of Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it
back within the same application then you still cannot call *collect()* on
it as it is still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without
any repetitions? I ask this because of the file format you are using: text
file. Even for text files you can at least pass a compression codec as the
2nd argument of *saveAsTextFile()*
<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
(when you use this link you might need to scroll up a little bit.. at least
my Chrome displays the *saveAsTextFile* method without the 2nd arg codec).
As IO is slow, compressed data can be read back quicker, since there will be
less data on the disk. Check the Snappy
<https://en.wikipedia.org/wiki/Snappy_(compression)> codec for example.
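For reference, a minimal sketch of passing a codec class as that 2nd argument (not
taken from the mails above; sc and the paths are placeholders, and GzipCodec is used
here, though Snappy can be swapped in where its native library is available):

import org.apache.hadoop.io.compress.GzipCodec

// Write each part file compressed instead of as plain text (files get a .gz suffix).
val lines = sc.textFile("/data/input")                              // hypothetical input path
lines.saveAsTextFile("/data/output-compressed", classOf[GzipCodec]) // 2nd arg: codec class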

But if your data has structure and you plan to process this data further
within Spark, then please consider something much better: a columnar storage
format, namely ORC or Parquet.

Best Regards,

Attila





*From:* Ranju Jain 
*Sent:* Sunday, March 21, 2021 8:10 AM
*To:* user@spark.apache.org
*Subject:* Spark saveAsTextFile Disk Recommendation



Hi All,



I have a large RDD dataset of around 60-70 GB which I cannot send to driver
using *collect* so first writing that to disk using  *saveAsTextFile* and
then this data gets saved in the form of multiple part files on each node
of the cluster and after that driver reads the data from that storage.



I have a question: *spark.local.dir* is the directory used as scratch space
where map output files and RDDs that Spark might need to write to disk for
shuffle operations etc. are stored.

And there it is strongly recommended to use a *local and fast disk* to avoid
any failure or performance impact.
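For illustration only, spark.local.dir can be pointed at one or more fast local
volumes; a sketch with made-up paths (note that on YARN the node manager's own
local directories take precedence over this setting):

import org.apache.spark.SparkConf

// Hypothetical example: scratch space spread over two SSD-backed mounts.
val conf = new SparkConf()
  .setAppName("save-part-files")
  .set("spark.local.dir", "/mnt/ssd1/spark-tmp,/mnt/ssd2/spark-tmp")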



*Do we have any such recommendation for storing the multiple part files of a
large dataset [or big RDD] on a fast disk?*

This will help me to configure the right type of disk for the resulting part
files.



Regards

Ranju


RE: Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread Ranju Jain
Hi Attila,

What is your use case here?
The client driver application does not use collect; instead it internally calls a
Python script which reads the part-file records [comma-separated strings] of each
cluster separately and copies them into a final csv file, merging all part-file
data into a single csv file. This script runs on every node and later the outputs
are all combined into a single file.

On the other hand is your data really just a collection of strings without any
repetitions
[Ranju]:
Yes, it is a comma-separated string.
And I just checked the 2nd argument of saveAsTextFile; I believe reads and writes
on disk will be faster with compression. I will try this.

So I think there is no special requirement on the type of disk for saveAsTextFile,
as these are local I/O operations.

Regards
Ranju


Hi!

I would like to reflect only to the first part of your mail:


I have a large RDD dataset of around 60-70 GB which I cannot send to driver 
using collect so first writing that to disk using  saveAsTextFile and then this 
data gets saved in the form of multiple part files on each node of the cluster 
and after that driver reads the data from that storage.

What is your use case here?

As you mention collect() I can assume you have to process the data outside of 
Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it back 
within the same application then you still cannot call collect() on it as it is 
still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without any
repetitions? I ask this because of the file format you are using: text file.
Even for text files you can at least pass a compression codec as the 2nd argument of
saveAsTextFile()<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
 (when you use this link you might need to scroll up a little bit.. at least my
Chrome displays the saveAsTextFile method without the 2nd arg codec). As IO is
slow, compressed data can be read back quicker, since there will be less data on
the disk. Check the
Snappy<https://en.wikipedia.org/wiki/Snappy_(compression)> codec for example.

But if your data has structure and you plan to process this data further within
Spark, then please consider something much better: a columnar storage format,
namely ORC or Parquet.

Best Regards,
Attila


From: Ranju Jain 
Sent: Sunday, March 21, 2021 8:10 AM
To: user@spark.apache.org
Subject: Spark saveAsTextFile Disk Recommendation

Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to driver 
using collect so first writing that to disk using  saveAsTextFile and then this 
data gets saved in the form of multiple part files on each node of the cluster 
and after that driver reads the data from that storage.

I have a question like spark.local.dir is the directory which is used as a 
scratch space where mapoutputs files and RDDs might need to write by spark for 
shuffle operations etc.
And there it is strongly recommended to use local and fast disk to avoid any 
failure or performance impact.

Do we have any such recommendation for storing multiple part files of large 
dataset [ or Big RDD ] in fast disk?
This will help me to configure the write type of disk for resulting part files.

Regards
Ranju


Re: Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread Attila Zsolt Piros
Hi!

I would like to reflect only to the first part of your mail:

I have a large RDD dataset of around 60-70 GB which I cannot send to driver
> using *collect* so first writing that to disk using  *saveAsTextFile* and
> then this data gets saved in the form of multiple part files on each node
> of the cluster and after that driver reads the data from that storage.


What is your use case here?

As you mention *collect()* I can assume you have to process the data
outside of Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it
back within the same application then you still cannot call *collect()* on
it as it is still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without
any repetitions? I ask this because of the file format you are using: text
file. Even for text files you can at least pass a compression codec as the
2nd argument of *saveAsTextFile()*
<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
(when you use this link you might need to scroll up a little bit.. at least
my Chrome displays the *saveAsTextFile* method without the 2nd arg codec).
As IO is slow, compressed data can be read back quicker, since there will be
less data on the disk. Check the Snappy
<https://en.wikipedia.org/wiki/Snappy_(compression)> codec for example.

But if your data has structure and you plan to process this data further
within Spark, then please consider something much better: a columnar storage
format, namely ORC or Parquet.
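To illustrate the columnar-format suggestion, a minimal sketch (the column names
and output path are made up, not taken from this thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-sketch").getOrCreate()
import spark.implicits._

// Stand-in for the real data: structured rows instead of raw comma-separated strings.
val df = Seq(("key1", 1L), ("key2", 2L)).toDF("id", "value")
df.write.mode("overwrite").parquet("/data/output-parquet")   // or .orc(...) for ORC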

Best Regards,
Attila


On Sun, Mar 21, 2021 at 3:40 AM Ranju Jain 
wrote:

> Hi All,
>
>
>
> I have a large RDD dataset of around 60-70 GB which I cannot send to
> driver using *collect* so first writing that to disk using
> *saveAsTextFile* and then this data gets saved in the form of multiple
> part files on each node of the cluster and after that driver reads the data
> from that storage.
>
>
>
> I have a question like *spark.local.dir* is the directory which is used
> as a scratch space where mapoutputs files and RDDs might need to write by
> spark for shuffle operations etc.
>
> And there it is strongly recommended to use *local and fast disk *to
> avoid any failure or performance impact.
>
>
>
> *Do we have any such recommendation for storing multiple part files of
> large dataset [ or Big RDD ] in fast disk?*
>
> This will help me to configure the write type of disk for resulting part
> files.
>
>
>
> Regards
>
> Ranju
>


Spark saveAsTextFile Disk Recommendation

2021-03-20 Thread Ranju Jain
Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to the driver
using collect, so I first write it to disk using saveAsTextFile; the data then gets
saved as multiple part files on each node of the cluster, and after that the driver
reads the data from that storage.

I have a question: spark.local.dir is the directory used as scratch space where map
output files and RDDs that Spark might need to write to disk for shuffle operations
etc. are stored.
And there it is strongly recommended to use a local and fast disk to avoid any
failure or performance impact.

Do we have any such recommendation for storing the multiple part files of a large
dataset [or big RDD] on a fast disk?
This will help me to configure the right type of disk for the resulting part files.

Regards
Ranju


Re: How to improve performance of saveAsTextFile()

2017-03-11 Thread Yan Facai
How about increasing RDD's partitions / rebalancing data?
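A Scala sketch of that idea (records and the partition count are placeholders; the
same repartition call also exists on JavaRDD):

// More partitions means more, smaller parallel write tasks.
records.repartition(200).saveAsTextFile("hdfs:///output/dir")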

On Sat, Mar 11, 2017 at 2:33 PM, Parsian, Mahmoud 
wrote:

> How to improve performance of JavaRDD.saveAsTextFile(“hdfs://…“).
> This is taking over 30 minutes on a cluster of 10 nodes.
> Running Spark on YARN.
>
> JavaRDD has 120 million entries.
>
> Thank you,
> Best regards,
> Mahmoud
>


How to improve performance of saveAsTextFile()

2017-03-10 Thread Parsian, Mahmoud
How to improve performance of JavaRDD.saveAsTextFile(“hdfs://…“).
This is taking over 30 minutes on a cluster of 10 nodes.
Running Spark on YARN.

JavaRDD has 120 million entries.

Thank you,
Best regards,
Mahmoud


Re: saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed

2016-07-28 Thread Ascot Moss
Hi,

Thanks for your reply.

Permissions (access) are not an issue in my case; this issue only happened
when the bigger input file was used to generate the model, i.e. with smaller
input(s) all worked well. It seems to me that ".save" cannot save a big file.

Q1: Any idea about the size limit that ".save" can handle?
Q2: Any idea how to check the size of the model that will be saved via
".save"?

Regards



On Thu, Jul 28, 2016 at 4:19 PM, Spico Florin <spicoflo...@gmail.com> wrote:

> Hi!
>   There are many reasons that your task is failed. One could be that you
> don't have proper permissions (access) to  hdfs with your user. Please
> check your user rights to write in hdfs. Please have a look also :
>
> http://stackoverflow.com/questions/27427042/spark-unable-to-save-in-hadoop-permission-denied-for-user
> I hope it helps.
>  Florin
>
>
> On Thu, Jul 28, 2016 at 3:49 AM, Ascot Moss <ascot.m...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> Please help!
>>
>> When saving the model, I got following error and cannot save the model to
>> hdfs:
>>
>> (my source code, my spark is v1.6.2)
>> my_model.save(sc, "/my_model")
>>
>> -
>> 16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose
>> tasks have all completed, from pool
>>
>> 16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at
>> treeEnsembleModels.scala:447) finished in 0.901 s
>>
>> 16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at
>> treeEnsembleModels.scala:447, took 2.513396 s
>>
>> Killed
>> -
>>
>>
>> Q1: Is there any limitation on saveAsTextFile?
>> Q2: or where to find the error log file location?
>>
>> Regards
>>
>>
>>
>>
>>
>


saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed

2016-07-27 Thread Ascot Moss
Hi,

Please help!

When saving the model, I got following error and cannot save the model to
hdfs:

(my source code, my spark is v1.6.2)
my_model.save(sc, "/my_model")

-
16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose tasks
have all completed, from pool

16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at
treeEnsembleModels.scala:447) finished in 0.901 s

16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at
treeEnsembleModels.scala:447, took 2.513396 s

Killed
-


Q1: Is there any limitation on saveAsTextFile?
Q2: or where to find the error log file location?

Regards


Re: problem about RDD map and then saveAsTextFile

2016-05-27 Thread Christian Hellström
Internally, saveAsTextFile uses saveAsHadoopFile:
https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
.

The final bit in the method first creates the output path and then saves
the data set. However, if there is an issue with the saveAsHadoopDataset
call, the path still remains. Technically, we could add an
exception-handling section that removes the path in case of problems. I
think that would be a nice way of making sure that we don’t litter the FS
with empty files and directories in case of exceptions.

So, to your question: the parameter to saveAsTextFile is a path (not a file)
and it has to be empty. Spark automatically names the files part-N, with N
the partition number. This follows immediately from the partitioning scheme
of the RDD itself.
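A common workaround for the second attempt's 'file already exists' failure is to
remove the stale output directory before writing again; a sketch using the Hadoop
FileSystem API (result, transform and hdfsAddress are the names from the mail, sc
is assumed to be the SparkContext, and the rest is illustrative):

import org.apache.hadoop.fs.{FileSystem, Path}

// Clear any leftover output from a failed attempt, then save.
val fs = FileSystem.get(sc.hadoopConfiguration)
val out = new Path(hdfsAddress)
if (fs.exists(out)) fs.delete(out, true)   // recursive delete of the stale directory
result.map(transform).saveAsTextFile(hdfsAddress)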

The real problem, though, is with the calculation itself. You might want to
fix that first. Just post the relevant bits from the log.
Hi all:
 I’ve tried to execute something as below:

 result.map(transform).saveAsTextFile(hdfsAddress)

 Result is an RDD calculated from an MLlib algorithm.


I submit this to yarn, and after two attempts, the application failed.

But the exception in the log is very misleading. It said hdfsAddress already
exists.

Actually, the first attempt's log showed that the exception came from the
calculation of result. Though that attempt failed, it still created the file,
and then attempt 2 began with the exception 'file already exists'.


 Why was the file created even though the RDD calculation had already failed?
That's not so good, I think.


problem about RDD map and then saveAsTextFile

2016-05-27 Thread Reminia Scarlet
Hi all:
 I’ve tried to execute something as below:

 result.map(transform).saveAsTextFile(hdfsAddress)

 Result is an RDD calculated from an MLlib algorithm.


I submit this to yarn, and after two attempts, the application failed.

But the exception in the log is very misleading. It said hdfsAddress already
exists.

Actually, the first attempt's log showed that the exception came from the
calculation of result. Though that attempt failed, it still created the file,
and then attempt 2 began with the exception 'file already exists'.


 Why was the file created even though the RDD calculation had already failed?
That's not so good, I think.


RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
If the data is not too big, one option is to call the collect method and then 
save the result to a local file using standard Java/Scala API. However, keep in 
mind that this will transfer data from all the worker nodes to the driver 
program. Looks like that is what you want to do anyway, but you need to be 
aware of how big that data is and related implications.
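A minimal sketch of that approach, assuming the collected data actually fits in
driver memory (the RDD name and output path are placeholders):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Pull every record to the driver, then write one local file with plain Java I/O.
val lines: Seq[String] = rdd.collect().toSeq
Files.write(Paths.get("/tmp/output.txt"), lines.asJava, StandardCharsets.UTF_8)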

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Monday, February 1, 2016 6:00 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohamed,

Thanks for your response. Data is available in worker nodes. But looking for 
something to write directly to local fs. Seems like it is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller 
<moham...@glassbeam.com<mailto:moham...@glassbeam.com>> wrote:
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I m submitting spark job to Yarn in 
"yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG mode. 
I see the below exception, but this exception occurred after saveAsTextfile 
function is finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
<moham...@glassbeam.com<mailto:moham...@glassbeam.com>> wrote:
Is it a multi-node cluster or you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 201

RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I m submitting spark job to Yarn in 
"yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG mode. 
I see the below exception, but this exception occurred after saveAsTextfile 
function is finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
<moham...@glassbeam.com<mailto:moham...@glassbeam.com>> wrote:
Is it a multi-node cluster or you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using spark 1.4.1 and we have a requirement of writing data local fs 
instead of hdfs.

When trying to save rdd to local fs with saveAsTextFile, it is just writing 
_SUCCESS file in the folder with no part- files and also no error or warning 
messages on console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.



Re: saveAsTextFile is not writing to local fs

2016-02-01 Thread Siva
Hi Mohamed,

Thanks for your response. Data is available on the worker nodes, but I am looking
for something to write directly to the local fs. Seems like it is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

> You should not be saving an RDD to local FS if Spark is running on a real
> cluster. Essentially, each Spark worker will save the partitions that it
> processes locally.
>
>
>
> Check the directories on the worker nodes and you should find pieces of
> your file on each node.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 5:40 PM
> *To:* Mohammed Guller
> *Cc:* spark users
> *Subject:* Re: saveAsTextFile is not writing to local fs
>
>
>
> Hi Mohammed,
>
>
>
> Thanks for your quick response. I m submitting spark job to Yarn in
> "yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG
> mode. I see the below exception, but this exception occurred after
> saveAsTextfile function is finished.
>
>
>
> 16/01/29 20:26:57 DEBUG HttpParser:
>
> java.net.SocketException: Socket closed
>
> at java.net.SocketInputStream.read(SocketInputStream.java:190)
>
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>
> at
> org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
>
> at
> org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
>
> at
> org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
>
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
>
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/01/29 20:26:57 DEBUG HttpParser:
>
> java.net.SocketException: Socket closed
>
> at java.net.SocketInputStream.read(SocketInputStream.java:190)
>
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>
> at
> org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
>
> at
> org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
>
> at
> org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
>
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
>
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
>
> at
> org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
>
> org.spark-project.jetty.io.EofException
>
>
>
> Do you think this is what is causing this?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>
> On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com>
> wrote:
>
> Is it a multi-node cluster or you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using spark 1.4.1 and we have a requirement of writing data local
> fs instead of hdfs.
>
>
>
> When trying to save rdd to local fs with saveAsTextFile, it is just
> writing _SUCCESS file in the folder with no part- files and also no error
> or warning messages on console.
>
>
>
> Is there any place to look at to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>


saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Everyone,

We are using spark 1.4.1 and we have a requirement of writing data to the local
fs instead of hdfs.

When trying to save an rdd to the local fs with saveAsTextFile, it just writes a
_SUCCESS file in the folder with no part- files and also no error or
warning messages on the console.

Is there any place to look to fix this problem?

Thanks,
Sivakumar Bhavanari.


RE: saveAsTextFile is not writing to local fs

2016-01-29 Thread Mohammed Guller
Is it a multi-node cluster or you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using spark 1.4.1 and we have a requirement of writing data local fs 
instead of hdfs.

When trying to save rdd to local fs with saveAsTextFile, it is just writing 
_SUCCESS file in the folder with no part- files and also no error or warning 
messages on console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.


Re: saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Mohammed,

Thanks for your quick response. I'm submitting the spark job to Yarn in
"yarn-client" mode on a 6 node cluster. I ran the job by turning on DEBUG
mode. I see the below exception, but this exception occurred after the
saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

> Is it a multi-node cluster or you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using spark 1.4.1 and we have a requirement of writing data local
> fs instead of hdfs.
>
>
>
> When trying to save rdd to local fs with saveAsTextFile, it is just
> writing _SUCCESS file in the folder with no part- files and also no error
> or warning messages on console.
>
>
>
> Is there any place to look at to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Igor Berman
Another option would be to try
rdd.toLocalIterator()
not sure if it will help though.

I had the same problem and ended up moving all parts to local disk (with the
Hadoop FileSystem api) and then processing them locally.
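A sketch of the toLocalIterator() idea: partitions are pulled to the driver one at
a time, so the whole data set never has to fit in driver memory at once (rdd and
the file path below are placeholders):

import java.io.PrintWriter

val writer = new PrintWriter("/tmp/merged-output.csv")
try {
  // Streams the RDD through the driver partition by partition.
  rdd.toLocalIterator.foreach(writer.println)
} finally {
  writer.close()
}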


On 5 January 2016 at 22:08, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> hi I am trying to save many partitions of Dataframe into one CSV file and
>> it
>> take forever for large data sets of around 5-6 GB.
>>
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data above code works well but for large data it hangs forever
>> does not move on because of only one partitions has to shuffle data of GBs
>> please help me
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread unk1102
Hi, I am trying to save many partitions of a Dataframe into one CSV file and it
takes forever for large data sets of around 5-6 GB.

sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")

For small data the above code works well, but for large data it hangs forever and
does not move on, because only one partition has to shuffle GBs of data. Please
help me.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Alexander Pivovarov
try coalesce(1, true).

On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:

> hi I am trying to save many partitions of Dataframe into one CSV file and
> it
> take forever for large data sets of around 5-6 GB.
>
>
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>
> For small data above code works well but for large data it hangs forever
> does not move on because of only one partitions has to shuffle data of GBs
> please help me
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Umesh Kacha
Hi, DataFrame's coalesce has no boolean option; it is only available on RDD, I
believe.

sourceFrame.coalesce(1,true) // gives a compilation error
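For what it's worth, a sketch of the two variants (someRdd is a placeholder,
sourceFrame is the DataFrame from this thread): the shuffle flag only exists on
RDD.coalesce, while a DataFrame can force a shuffle with repartition(1).

// RDD: shuffle = true redistributes the work before merging down to one partition.
val singlePartRdd = someRdd.coalesce(1, shuffle = true)

// DataFrame: coalesce(n) takes no shuffle flag; repartition(1) is the shuffling equivalent.
sourceFrame.repartition(1)
  .write.format("com.databricks.spark.csv")
  .save("/path/hadoop")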



On Wed, Jan 6, 2016 at 1:38 AM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> hi I am trying to save many partitions of Dataframe into one CSV file and
>> it
>> take forever for large data sets of around 5-6 GB.
>>
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data above code works well but for large data it hangs forever
>> does not move on because of only one partitions has to shuffle data of GBs
>> please help me
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Andy Davidson
Hi Unk1102

I also had trouble when I used coalesce(); repartition() worked much better.
Keep in mind that if you have a large number of partitions you are probably going
to have high communication costs.

Also, my code works a lot better on 1.6.0. DataFrame memory was not spilled in
1.5.2; in 1.6.0 unpersist() actually frees up memory.

Another strange thing I noticed in 1.5.1 was that I had thousands of partitions,
many of them empty. Having lots of empty partitions really slowed things down.

Andy

From:  unk1102 <umesh.ka...@gmail.com>
Date:  Tuesday, January 5, 2016 at 11:58 AM
To:  "user @spark" <user@spark.apache.org>
Subject:  coalesce(1).saveAsTextfile() takes forever?

> hi I am trying to save many partitions of Dataframe into one CSV file and it
> take forever for large data sets of around 5-6 GB.
> 
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzi
> p").save("/path/hadoop")
> 
> For small data above code works well but for large data it hangs forever
> does not move on because of only one partitions has to shuffle data of GBs
> please help me
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-
> takes-forever-tp25886.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Re: Improve saveAsTextFile performance

2015-12-05 Thread Ram VISWANADHA
>If you are doing a join/groupBy kind of operations then you need to make sure 
>the keys are evenly distributed throughout the partitions.

Yes, I am doing join/groupBy operations. Can you point me to docs on how to do
this?

Spark 1.5.2


First attempt (Aggregated Metrics by Executor)
Executor ID: 32, Address: rc-spark-poc-w-3.c.dailymotion-data.internal:51748
Task Time: 1.2 h, Total Tasks: 18, Failed Tasks: 0, Succeeded Tasks: 18
Shuffle Read Size / Records: 4.4 MB / 167812
Shuffle Write Size / Records: 51.5 GB / 1287131
Shuffle Spill (Memory): 53.1 GB, Shuffle Spill (Disk): 51.1 GB

Second attempt (Aggregated Metrics by Executor)
Executor ID: 5, Address: rc-spark-poc-w-1.c.dailymotion-data.internal:41061
Task Time: 47 min, Total Tasks: 8, Failed Tasks: 0, Succeeded Tasks: 8
Shuffle Read Size / Records: 3.9 MB / 95334


Best Regards,
Ram

From: Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Which version of spark are you using? Can you look at the event timeline and 
the DAG of the job and see where its spending more time? .save simply triggers 
your entire pipeline, If you are doing a join/groupBy kind of operations then 
you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>> wrote:
That didn’t work :(
Any help I have documented some steps here.
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com<mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>, user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per



Re: Improve saveAsTextFile performance

2015-12-05 Thread Ram VISWANADHA
I tried partitionBy with a HashPartitioner; still the same issue.
groupBy Operation: 
https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L51
Join Operation: 
https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L80

Best Regards,
Ram
--
Date: Saturday, December 5, 2015 at 7:18 AM
To: Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

>If you are doing a join/groupBy kind of operations then you need to make sure 
>the keys are evenly distributed throughout the partitions.

Yes I am doing join/groupBy operations.Can you point me to docs on how to do 
this?

Spark 1.5.2


First attempt (Aggregated Metrics by Executor)
Executor ID: 32, Address: rc-spark-poc-w-3.c.dailymotion-data.internal:51748
Task Time: 1.2 h, Total Tasks: 18, Failed Tasks: 0, Succeeded Tasks: 18
Shuffle Read Size / Records: 4.4 MB / 167812
Shuffle Write Size / Records: 51.5 GB / 1287131
Shuffle Spill (Memory): 53.1 GB, Shuffle Spill (Disk): 51.1 GB

Second attempt (Aggregated Metrics by Executor)
Executor ID: 5, Address: rc-spark-poc-w-1.c.dailymotion-data.internal:41061
Task Time: 47 min, Total Tasks: 8, Failed Tasks: 0, Succeeded Tasks: 8
Shuffle Read Size / Records: 3.9 MB / 95334


Best Regards,
Ram

From: Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Which version of spark are you using? Can you look at the event timeline and 
the DAG of the job and see where its spending more time? .save simply triggers 
your entire pipeline, If you are doing a join/groupBy kind of operations then 
you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>> wrote:
That didn’t work :(
Any help I have documented some steps here.
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com<mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>, user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per



Re: Improve saveAsTextFile performance

2015-12-05 Thread Akhil Das
Which version of Spark are you using? Can you look at the event timeline
and the DAG of the job and see where it's spending more time? .save simply
triggers your entire pipeline. If you are doing join/groupBy kinds of
operations, then you need to make sure the keys are evenly distributed
throughout the partitions.
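When the grouping is really an aggregation, one common way to spread a hot key over
many tasks is to salt it and aggregate in two steps; a rough sketch, not from this
thread (pairRdd, the summed value type and the 16 salt buckets are all made up):

import scala.util.Random

// Step 1: add a random salt so one dominant key no longer lands in a single task.
val salted  = pairRdd.map { case (k, v) => ((k, Random.nextInt(16)), v) }
val partial = salted.reduceByKey(_ + _)            // aggregate per (key, salt)
// Step 2: drop the salt and combine the partial results per original key.
val result  = partial.map { case ((k, _), v) => (k, v) }.reduceByKey(_ + _)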

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> That didn’t work :(
> Any help I have documented some steps here.
>
> http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes
>
> Best Regards,
> Ram
>
> From: Sahil Sareen <sareen...@gmail.com>
> Date: Wednesday, December 2, 2015 at 10:18 PM
> To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
> Cc: Ted Yu <yuzhih...@gmail.com>, user <user@spark.apache.org>
> Subject: Re: Improve saveAsTextFile performance
>
>
> http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
>


Re: Improve saveAsTextFile performance

2015-12-04 Thread Ram VISWANADHA
That didn’t work :(
Any help I have documented some steps here.
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com<mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>, user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per


Re: Improve saveAsTextFile performance

2015-12-02 Thread Sahil Sareen
PTAL:
http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per

-Sahil

On Thu, Dec 3, 2015 at 9:18 AM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> Yes. That did not help.
>
> Best Regards,
> Ram
> From: Ted Yu <yuzhih...@gmail.com>
> Date: Wednesday, December 2, 2015 at 3:25 PM
> To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Improve saveAsTextFile performance
>
> Have you tried calling coalesce() before saveAsTextFile ?
>
> Cheers
>
> On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <
> ram.viswana...@dailymotion.com> wrote:
>
>> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10
>> tasks, the first 9 complete in a reasonable time but the last task is
>> taking a long time to complete. The last task contains the maximum number
>> of records like 90% of the total number of records.  Is there any way to
>> parallelize the execution by increasing the number of tasks or evenly
>> distributing the number of records to different tasks?
>>
>> Thanks in advance.
>>
>> Best Regards,
>> Ram
>>
>
>


Re: Improve saveAsTextFile performance

2015-12-02 Thread Ram VISWANADHA
Yes. That did not help.

Best Regards,
Ram
From: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>
Date: Wednesday, December 2, 2015 at 3:25 PM
To: Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Have you tried calling coalesce() before saveAsTextFile ?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA 
<ram.viswana...@dailymotion.com<mailto:ram.viswana...@dailymotion.com>> wrote:
JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks, 
the first 9 complete in a reasonable time but the last task is taking a long 
time to complete. The last task contains the maximum number of records like 90% 
of the total number of records.  Is there any way to parallelize the execution 
by increasing the number of tasks or evenly distributing the number of records 
to different tasks?

Thanks in advance.

Best Regards,
Ram



Improve saveAsTextFile performance

2015-12-02 Thread Ram VISWANADHA
JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks, 
the first 9 complete in a reasonable time but the last task is taking a long 
time to complete. The last task contains the maximum number of records like 90% 
of the total number of records.  Is there any way to parallelize the execution 
by increasing the number of tasks or evenly distributing the number of records 
to different tasks?

Thanks in advance.

Best Regards,
Ram


Re: Improve saveAsTextFile performance

2015-12-02 Thread Ted Yu
Have you tried calling coalesce() before saveAsTextFile ?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10
> tasks, the first 9 complete in a reasonable time but the last task is
> taking a long time to complete. The last task contains the maximum number
> of records like 90% of the total number of records.  Is there any way to
> parallelize the execution by increasing the number of tasks or evenly
> distributing the number of records to different tasks?
>
> Thanks in advance.
>
> Best Regards,
> Ram
>


Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-09 Thread Andy Davidson
Thanks Gerard

I'll give that a try. It seems like this approach is going to create a very
large number of files. I guess I could write a cron job to concatenate the
files by hour or maybe days. I imagine this is a common problem. Do you know
of something that does this already?

I am using the standalone cluster manager. I do not think it directly
supports cron job/table functionality. It should be easy to use the hdfs api
and linux crontab or maybe https://quartz-scheduler.org/

Kind regards

andy

From:  Gerard Maas <gerard.m...@gmail.com>
Date:  Sunday, November 8, 2015 at 2:13 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: streaming: missing data. does saveAsTextFile() append or
replace?

> Andy,
> 
> Using the rdd.saveAsTextFile(...)  will overwrite the data if your target is
> the same file.
> 
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
> suffix)  where a new file will be written at each streaming interval.
> Note that this will result in a saved file for each streaming interval. If you
> want to increase the file size (usually a good idea in HDFS), you can use a
> window function over the dstream and save the 'windowed'  dstream instead.
> 
> kind regards, Gerard.
> 
> On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Hi
>> 
>> I just started a new Spark Streaming project. In this phase of the system all
>> we want to do is save the data we receive to HDFS. After running for a
>> couple of days it looks like I am missing a lot of data. I wonder if
>> saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I captured
>> in previous windows? I noticed that after running for a couple of days my
>> HDFS file system has 25 files. The names are something like "part-6". I
>> used 'hadoop fs -dus' to check the total data captured. While the system was
>> running I would periodically call 'dus'; I was surprised that sometimes the
>> total byte count actually dropped.
>> 
>> 
>> Is there a better way to write my data to disk?
>> 
>> Any suggestions would be appreciated
>> 
>> Andy
>> 
>> 
>>public static void main(String[] args) {
>> 
>>   SparkConf conf = new SparkConf().setAppName(appName);
>> 
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>> 
>> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
>> Duration(5 * 1000));
>> 
>> 
>> 
>> [ deleted code …]
>> 
>> 
>> 
>> data.foreachRDD(new Function<JavaRDD, Void>(){
>> 
>> private static final long serialVersionUID =
>> -7957854392903581284L;
>> 
>> 
>> 
>> @Override
>> 
>> public Void call(JavaRDD jsonStr) throws Exception {
>> 
>> jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
>> /rawSteamingData is a directory
>> 
>> return null;
>> 
>> }   
>> 
>> });
>> 
>> 
>> 
>> ssc.checkpoint(checkPointUri);
>> 
>> 
>> 
>> ssc.start();
>> 
>> ssc.awaitTermination();
>> 
>> }
> 




Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-08 Thread Gerard Maas
Andy,

Using the rdd.saveAsTextFile(...)  will overwrite the data if your target
is the same file.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix)  where a new file will be written at each streaming interval.
Note that this will result in a saved file for each streaming interval. If
you want to increase the file size (usually a good idea in HDFS), you can
use a window function over the dstream and save the 'windowed'  dstream
instead.

kind regards, Gerard.
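
A rough sketch of that windowing pattern in Scala; `ssc` is assumed to be an
existing StreamingContext, and the socket source, window length and path are
illustrative:

import org.apache.spark.streaming.Seconds

val lines = ssc.socketTextStream("localhost", 9999)   // stand-in for the real input DStream
// window() batches several intervals together, so each saveAsTextFiles output
// directory covers a 5-minute window instead of one per batch interval.
val windowed = lines.window(Seconds(300), Seconds(300))
windowed.saveAsTextFiles("hdfs:///rawStreamingData/batch", "txt")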

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new Spark Streaming project. In this phase of the system
> all we want to do is save the data we receive to HDFS. After running for
> a couple of days it looks like I am missing a lot of data. I wonder if
> saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I
> capture in previous windows? I noticed that after running for a couple of
> days my HDFS file system has 25 files. The names are something like
> "part-6". I
> used 'hadoop fs -dus' to check the total data captured. While the system
> was running I would periodically call 'dus'; I was surprised that sometimes
> the total byte count actually dropped.
>
>
> Is there a better way to write my data to disk?
>
> Any suggestions would be appreciated
>
> Andy
>
>
>public static void main(String[] args) {
>
>SparkConf conf = new SparkConf().setAppName(appName);
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
> Duration(5 * 1000));
>
>
> [ deleted code …]
>
>
> data.foreachRDD(new Function<JavaRDD, Void>(){
>
> private static final long serialVersionUID =
> -7957854392903581284L;
>
>
> @Override
>
> public Void call(JavaRDD jsonStr) throws Exception {
>
> jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
> /rawSteamingData
> is a directory
>
> return null;
>
> }
>
> });
>
>
>
> ssc.checkpoint(checkPointUri);
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> }
>


streaming: missing data. does saveAsTextFile() append or replace?

2015-11-07 Thread Andy Davidson
Hi

I just started a new Spark Streaming project. In this phase of the system
all we want to do is save the data we receive to HDFS. After running for
a couple of days it looks like I am missing a lot of data. I wonder if
saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I captured
in previous windows? I noticed that after running for a couple of days my
HDFS file system has 25 files. The names are something like "part-6". I
used 'hadoop fs -dus' to check the total data captured. While the system was
running I would periodically call 'dus'; I was surprised that sometimes the
total byte count actually dropped.


Is there a better way to write my data to disk?

Any suggestions would be appreciated

Andy


   public static void main(String[] args) {

       SparkConf conf = new SparkConf().setAppName(appName);
       JavaSparkContext jsc = new JavaSparkContext(conf);
       JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));

       [ deleted code …]

       data.foreachRDD(new Function<JavaRDD, Void>() {
           private static final long serialVersionUID = -7957854392903581284L;

           @Override
           public Void call(JavaRDD jsonStr) throws Exception {
               jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
               return null;
           }
       });

       ssc.checkpoint(checkPointUri);

       ssc.start();
       ssc.awaitTermination();
   }




error with saveAsTextFile in local directory

2015-11-03 Thread Jack Yang
Hi all,

I am saving some Hive query results into the local directory:

val hdfsFilePath = "hdfs://master:ip/ tempFile ";
val localFilePath = "file:///home/hduser/tempFile";
hiveContext.sql(s"""my hql codes here""")
res.printSchema()  --working
res.show()   --working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)  
--still working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)  
--wrong!

then at last, I get the correct results in hdfsFilePath, but nothing in 
localFilePath.
Btw, the localFilePath was created, but the folder was only with a _SUCCESS 
file, no part file.

See the trace below (any thoughts?):

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at 
myApp.scala:112) with 1 output partitions (allowLocal=false)
// line 112 is where I call the saveAsTextFile function to save the results 
locally.

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 
42(saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 41)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 
(MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no 
missing parents
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with 
curMem=3889533, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values 
in memory (estimated size 156.9 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with 
curMem=4050165, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as 
bytes in memory (estimated size 54.8 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB)
15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast 
at DAGScheduler.scala:874
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from 
ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 
tasks
15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB)
15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose 
tasks have all completed, from pool
15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile 
at MyApp.scala:112) finished in 6.360 s
15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: saveAsTextFile 
at MyApp.scala:112, took 6.588821 s
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/api,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/pool,null}
15/11/04 09:57:47 INFO handler.ContextHandler: s

Re: error with saveAsTextFile in local directory

2015-11-03 Thread Ted Yu
Looks like you were running 1.4.x or earlier release because the allowLocal
flag is deprecated as of Spark 1.5.0+.

Cheers

On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang <j...@uow.edu.au> wrote:

> Hi all,
>
>
>
> I am saving some hive- query results into the local directory:
>
>
>
> val hdfsFilePath = "hdfs://master:ip/ tempFile ";
>
> val localFilePath = "file:///home/hduser/tempFile";
>
> hiveContext.sql(s"""my hql codes here""")
>
> res.printSchema()  --working
>
> res.show()   --working
>
> res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)
> --still working
>
> res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)
> --wrong!
>
>
>
> then at last, I get the correct results in hdfsFilePath, but nothing in
> localFilePath.
>
> Btw, the localFilePath was created, but the folder was only with a
> _SUCCESS file, no part file.
>
>
>
> See the track: (any thougt?)
>
>
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile
> at myApp.scala:*112*) with 1 output partitions (allowLocal=false)
>
> *// the 112 line is the place I am using saveAsTextFile function to save
> the results locally.*
>
>
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage
> 42(saveAsTextFile at MyApp.scala:112)
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 41)
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42
> (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no
> missing parents
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called
> with curMem=3889533, maxMem=280248975
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as
> values in memory (estimated size 156.9 KB, free 263.4 MB)
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called
> with curMem=4050165, maxMem=280248975
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0
> stored as bytes in memory (estimated size 54.8 KB, free 263.4 MB)
>
> 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0
> in memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB)
>
> 15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from
> broadcast at DAGScheduler.scala:874
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at
> MyApp.scala:112)
>
> 15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0
> with 1 tasks
>
> 15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
>
> 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0
> in memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB)
>
> 15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in
> stage 42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
>
> 15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0,
> whose tasks have all completed, from pool
>
> 15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42
> (saveAsTextFile at MyApp.scala:112) finished in 6.360 s
>
> 15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished:
> saveAsTextFile at MyApp.scala:112, took 6.588821 s
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/envir

RE: error with saveAsTextFile in local directory

2015-11-03 Thread Jack Yang
Yes, mine is 1.4.0.

So is this problem related to the Spark version?

I doubt that. Any comments please?

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, 4 November 2015 11:52 AM
To: Jack Yang
Cc: user@spark.apache.org
Subject: Re: error with saveAsTextFile in local directory

Looks like you were running 1.4.x or earlier release because the allowLocal 
flag is deprecated as of Spark 1.5.0+.

Cheers

On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang 
<j...@uow.edu.au<mailto:j...@uow.edu.au>> wrote:
Hi all,

I am saving some Hive query results into the local directory:

val hdfsFilePath = "hdfs://master:ip/ tempFile ";
val localFilePath = 
"file:///home/hduser/tempFile";
hiveContext.sql(s"""my hql codes here""")
res.printSchema()  --working
res.show()   --working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)  
--still working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)  
--wrong!

then at last, I get the correct results in hdfsFilePath, but nothing in 
localFilePath.
Btw, the localFilePath was created, but the folder was only with a _SUCCESS 
file, no part file.

See the trace below (any thoughts?):

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at 
myApp.scala:112) with 1 output partitions (allowLocal=false)
// the 112 line is the place I am using saveAsTextFile function to save the 
results locally.

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 
42(saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 41)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 
(MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no 
missing parents
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with 
curMem=3889533, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values 
in memory (estimated size 156.9 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with 
curMem=4050165, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as 
bytes in memory (estimated size 54.8 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.135:32836<http://192.168.70.135:32836> (size: 54.8 KB, 
free: 266.8 MB)
15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast 
at DAGScheduler.scala:874
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from 
ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 
tasks
15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.129:54062<http://192.168.70.129:54062> (size: 54.8 KB, 
free: 1068.8 MB)
15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose 
tasks have all completed, from pool
15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile 
at MyApp.scala:112) finished in 6.360 s
15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: saveAsTextFile 
at MyApp.scala:112, took 6.588821 s
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/api,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandle

Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Ted Yu
bq.  val dist = sc.parallelize(l)

Following the above, can you call, e.g. count() on dist before saving ?

Cheers

On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es> wrote:

> Dear list,
>
> I'm experiencing a problem when trying to write any RDD to HDFS. I've
> tried
> with minimal examples, scala programs and pyspark programs both in local
> and
> cluster modes and as standalone applications or shells.
>
> My problem is that when invoking the write command, a task is executed but
> it just creates an empty folder in the given HDFS path. I'm lost at this
> point because there is no sign of error or warning in the spark logs.
>
> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
> working properly when using the command tools or running MapReduce jobs.
>
>
> Thank you for your time, I'm not sure if this is just a rookie mistake or
> an
> overall config problem.
>
> Just a working example:
>
> This sequence produces the following log and creates the empty folder
> "test":
>
> scala> val l = Seq.fill(1)(nextInt)
> scala> val dist = sc.parallelize(l)
> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")
>
>
> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 1
> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
> :27
> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
> :27) with 2 output partitions (allowLocal=false)
> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
> :27)
> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3
> (MapPartitionsRDD[7]
> at saveAsTextFile at :27), which has no missing parents
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
> curMem=184615, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 134.1 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
> curMem=321951, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as
> bytes
> in memory (estimated size 46.6 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
> broadcast_3_piece0
> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
> DAGScheduler.scala:839
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 3
> (MapPartitionsRDD[7] at saveAsTextFile at :27)
> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
> 6) in 312 ms on nodo2.i3a.info (1/2)
> 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
> 7) in 313 ms on nodo3.i3a.info (2/2)
> 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
> all completed, from pool
> 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
> :27) finished in 0.334 s
> 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
> :27, took 0.436388 s
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-creates-an-empty-folder-in-HDFS-tp24906.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Jacinto Arias
Yes, printing the result with collect or take works.

Actually, this is a minimal example, but the same happens when working with 
real data: the actions are performed, and the resulting RDDs can be printed out 
without problem. The data is there and the operations are correct; they just 
cannot be written to a file.


> On 03 Oct 2015, at 16:17, Ted Yu <yuzhih...@gmail.com 
> <mailto:yuzhih...@gmail.com>> wrote:
> 
> bq.  val dist = sc.parallelize(l)
> 
> Following the above, can you call, e.g. count() on dist before saving ?
> 
> Cheers
> 
> On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es 
> <mailto:ja...@elrocin.es>> wrote:
> Dear list,
> 
> I'm experiencing a problem when trying to write any RDD to HDFS. I've tried
> with minimal examples, scala programs and pyspark programs both in local and
> cluster modes and as standalone applications or shells.
> 
> My problem is that when invoking the write command, a task is executed but
> it just creates an empty folder in the given HDFS path. I'm lost at this
> point because there is no sign of error or warning in the spark logs.
> 
> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
> working properly when using the command tools or running MapReduce jobs.
> 
> 
> Thank you for your time, I'm not sure if this is just a rookie mistake or an
> overall config problem.
> 
> Just a working example:
> 
> This sequence produces the following log and creates the empty folder
> "test":
> 
> scala> val l = Seq.fill(1)(nextInt)
> scala> val dist = sc.parallelize(l)
> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/ 
> <http://node1.i3a.info/user/jarias/test/>")
> 
> 
> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 1
> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
> :27
> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
> :27) with 2 output partitions (allowLocal=false)
> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
> :27)
> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7]
> at saveAsTextFile at :27), which has no missing parents
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
> curMem=184615, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 134.1 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
> curMem=321951, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes
> in memory (estimated size 46.6 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo1.i3a.info:36330 <http://nodo1.i3a.info:36330/> (size: 46.6 KB, free: 
> 265.3 MB)
> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
> broadcast_3_piece0
> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
> DAGScheduler.scala:839
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
> (MapPartitionsRDD[7] at saveAsTextFile at :27)
> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
> 6, nodo2.i3a.info <http://nodo2.i3a.info/>, PROCESS_LOCAL, 25975 bytes)
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
> 7, nodo3.i3a.info <http://nodo3.i3a.info/>, PROCESS_LOCAL, 25963 bytes)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo2.i3a.info:37759 <http://nodo2.i3a.info:37759/> (size: 46.6 KB, free: 
> 530.2 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo3.i3a.info:54798 <http://nodo3.i3a.info:54798/> (size: 46.6 KB, free: 
> 530.2 MB)
> 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
> 6) in 312 ms on nodo2.i3a.info <http://nodo2.i3a.info/> (1/2)
> 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
> 7) in 313 ms on nodo3.i3a.info <http://nodo3.i3a.info/> (2/2)
> 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
> all completed, from pool
> 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
> :27) finished in 0.334 s
> 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
> :27, took 0.436388 s
> 
> 

Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Ajay Chander
Hi Jacin,

If I were you, the first thing I would do is write a sample Java
application that writes data into HDFS and see if it works. Metadata
is being created in HDFS, which means communication with the namenode is
working fine but not with the datanodes, since you don't see any data inside
the file. Why don't you check the HDFS logs and see what's happening when your
application talks to the namenode? I suspect a networking issue; also check
whether the datanodes are running fine.

Thank you,
Ajay
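
A minimal sketch of such a standalone write test (in Scala rather than Java;
the URI and path are placeholders). It writes a few bytes through the HDFS
client API directly, bypassing Spark, so a failure here points at
HDFS/networking rather than at Spark:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsWriteTest {
  def main(args: Array[String]): Unit = {
    // Open the target filesystem and write a small file on the datanodes.
    val fs = FileSystem.get(new URI("hdfs://node1.i3a.info"), new Configuration())
    val out = fs.create(new Path("/user/jarias/write-test.txt"))
    try out.write("hello hdfs\n".getBytes("UTF-8"))
    finally out.close()
  }
}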

On Saturday, October 3, 2015, Jacinto Arias <ja...@elrocin.es> wrote:

> Yes printing the result with collect or take is working,
>
> actually this is a minimal example, but also when working with real data
> the actions are performed, and the resulting RDDs can be printed out
> without problem. The data is there and the operations are correct, they
> just cannot be written to a file.
>
>
> On 03 Oct 2015, at 16:17, Ted Yu <yuzhih...@gmail.com
> <javascript:_e(%7B%7D,'cvml','yuzhih...@gmail.com');>> wrote:
>
> bq.  val dist = sc.parallelize(l)
>
> Following the above, can you call, e.g. count() on dist before saving ?
>
> Cheers
>
> On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es
> <javascript:_e(%7B%7D,'cvml','ja...@elrocin.es');>> wrote:
>
>> Dear list,
>>
>> I'm experiencing a problem when trying to write any RDD to HDFS. I've
>> tried
>> with minimal examples, scala programs and pyspark programs both in local
>> and
>> cluster modes and as standalone applications or shells.
>>
>> My problem is that when invoking the write command, a task is executed but
>> it just creates an empty folder in the given HDFS path. I'm lost at this
>> point because there is no sign of error or warning in the spark logs.
>>
>> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
>> working properly when using the command tools or running MapReduce jobs.
>>
>>
>> Thank you for your time, I'm not sure if this is just a rookie mistake or
>> an
>> overall config problem.
>>
>> Just a working example:
>>
>> This sequence produces the following log and creates the empty folder
>> "test":
>>
>> scala> val l = Seq.fill(1)(nextInt)
>> scala> val dist = sc.parallelize(l)
>> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")
>>
>>
>> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer
>> Algorithm
>> version is 1
>> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
>> :27
>> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
>> :27) with 2 output partitions (allowLocal=false)
>> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile
>> at
>> :27)
>> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
>> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
>> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3
>> (MapPartitionsRDD[7]
>> at saveAsTextFile at :27), which has no missing parents
>> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
>> curMem=184615, maxMem=278302556
>> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
>> memory (estimated size 134.1 KB, free 265.1 MB)
>> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
>> curMem=321951, maxMem=278302556
>> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as
>> bytes
>> in memory (estimated size 46.6 KB, free 265.1 MB)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in
>> memory
>> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
>> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
>> broadcast_3_piece0
>> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
>> DAGScheduler.scala:839
>> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from
>> Stage 3
>> (MapPartitionsRDD[7] at saveAsTextFile at :27)
>> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
>> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
>> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
>> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
>> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in
>> memory
>> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 i

saveAsTextFile creates an empty folder in HDFS

2015-10-02 Thread jarias
Dear list,

I'm experiencing a problem when trying to write any RDD to HDFS. I've tried
with minimal examples, scala programs and pyspark programs both in local and
cluster modes and as standalone applications or shells.

My problem is that when invoking the write command, a task is executed but
it just creates an empty folder in the given HDFS path. I'm lost at this
point because there is no sign of error or warning in the spark logs.

I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
working properly when using the command tools or running MapReduce jobs.


Thank you for your time, I'm not sure if this is just a rookie mistake or an
overall config problem.

Just a working example:

This sequence produces the following log and creates the empty folder
"test":

scala> val l = Seq.fill(1)(nextInt)
scala> val dist = sc.parallelize(l)
scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")


15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
version is 1
15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
:27
15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
:27) with 2 output partitions (allowLocal=false)
15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
:27)
15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7]
at saveAsTextFile at :27), which has no missing parents
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
curMem=184615, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 134.1 KB, free 265.1 MB)
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
curMem=321951, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes
in memory (estimated size 46.6 KB, free 265.1 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
DAGScheduler.scala:839
15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
(MapPartitionsRDD[7] at saveAsTextFile at :27)
15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
6) in 312 ms on nodo2.i3a.info (1/2)
15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
7) in 313 ms on nodo3.i3a.info (2/2)
15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool 
15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
:27) finished in 0.334 s
15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
:27, took 0.436388 s




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-creates-an-empty-folder-in-HDFS-tp24906.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Exception during SaveAstextFile Stage

2015-09-24 Thread Chirag Dewan
Hi,

I have 2 stages in my job: map and saveAsTextFile. During the saveAsTextFile 
stage I am getting an exception:

15/09/24 15:38:16 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

It might be too early to ask this since I haven't dug into why it is happening 
yet; does anyone have any idea about this?

Thanks in advance,

Chirag


RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0

2015-09-03 Thread Ewan Leith
For those who have similar issues on EMR writing Parquet files, if you update 
mapred-site.xml with the following lines:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
<property><name>parquet.enable.summary-metadata</name><value>false</value></property>
<property><name>spark.sql.parquet.output.committer.class</name><value>org.apache.spark.sql.parquet.DirectParquetOutputCommitter</value></property>
 

Then Parquet files are written directly to S3 without the use of temporary 
files, and the summary-metadata files (which can cause a performance hit when 
writing large Parquet datasets on S3) are disabled.

The easiest way to add these properties across the cluster is via the 
--configurations flag on the “aws emr create-cluster” command

Thanks,
Ewan
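
If editing mapred-site.xml is not convenient, the same Hadoop properties can
usually be passed from the application side through Spark's spark.hadoop.*
prefix; a rough sketch, with the property values as above and everything else
illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Keys prefixed with "spark.hadoop." are copied into the Hadoop Configuration
// that Spark's output committers use.
val conf = new SparkConf()
  .setAppName("direct-s3-output")
  .set("spark.hadoop.mapred.output.direct.EmrFileSystem", "true")
  .set("spark.hadoop.mapred.output.direct.NativeS3FileSystem", "true")
  .set("spark.hadoop.parquet.enable.summary-metadata", "false")
  .set("spark.sql.parquet.output.committer.class",
       "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
val sc = new SparkContext(conf)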


From: Alexander Pivovarov [mailto:apivova...@gmail.com]
Sent: 03 September 2015 00:12
To: Neil Jonkers <neilod...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

Hi Neil

Yes! it helps!!! I do  not see _temporary in console output anymore.  
saveAsTextFile is fast now.
2015-09-02 23:07:00,022 INFO  [task-result-getter-0] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 18.0 in stage 0.0 (TID 18) in 4398 
ms on ip-10-0-24-103.ec2.internal (1/24)
2015-09-02 23:07:01,887 INFO  [task-result-getter-2] scheduler.TaskSetManager 
(Logging.scala:logInfo(59)) - Finished task 5.0 in stage 0.0 (TID 5) in 6282 ms 
on ip-10-0-26-14.ec2.internal (24/24)
2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 0 (saveAsTextFile at :22) 
finished in 6.319 s
2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore 
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar 
tmp/test40_141_24_406/_SUCCESS 0

Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers 
<neilod...@gmail.com<mailto:neilod...@gmail.com>> wrote:
Hi,
Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
You can also config this at cluster launch time with the following 
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
<apivova...@gmail.com<mailto:apivova...@gmail.com>> wrote:
I checked previous emr config (emr-3.8)
mapred-site.xml has the following setting
 
<property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
 


On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
<apivova...@gmail.com<mailto:apivova...@gmail.com>> wrote:
Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class  
com.appsflyer.spark.DirectOutputCommitter



On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov 
<apivova...@gmail.com<mailto:apivova...@gmail.com>> wrote:
I run spark 1.4.1 in amazom aws emr 4.0.0

For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to 
emr 3.8  (was 5 sec, now 95 sec)

Actually saveAsTextFile says that it's done in 4.356 sec but after that I see 
lots of INFO messages with 404 error from com.amazonaws.latency logger for next 
90 sec

spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + 
"A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at :22) 
finished in 4.356 s
2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler 
(Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all 
completed, from pool
2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at :22, 
took 4.547829 s
2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem 
(S3NativeFileSystem.java:listStatus(896)) - listStatus 
s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency 
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], 
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found 
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 
3B2F06FD11682D22), S3 Extended Request ID: 
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], 
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], 
AWSRequestID=[3B2F06FD11682D22], 
ServiceEndpoint=[https://foo-bar.s3.amazonaws.com], Exception=1, 
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, 
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], 
HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], 
RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency 
(AWSRequestMe

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Neil Jonkers
Hi,

Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>

You can also config this at cluster launch time with the following
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> I checked previous emr config (emr-3.8)
> mapred-site.xml has the following setting
> 
> <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
> 
>
>
> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
>> Should I use DirectOutputCommitter?
>> spark.hadoop.mapred.output.committer.class
>>  com.appsflyer.spark.DirectOutputCommitter
>>
>>
>>
>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com
>> > wrote:
>>
>>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>>
>>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>>
>>> Actually saveAsTextFile says that it's done in 4.356 sec but after that
>>> I see lots of INFO messages with 404 error from com.amazonaws.latency
>>> logger for next 90 sec
>>>
>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>>
>>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>>> (saveAsTextFile at :22) finished in 4.356 s
>>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>>> whose tasks have all completed, from pool
>>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>>> :22, took 4.547829 s
>>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>>> RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
>>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
>>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
>>> HttpClientSendRequestTime=[0.089],
>>> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 62C6B413965447FD), S3 Extended Request ID:
>>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
>>> HttpRequestTime=[10.543],

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Alexander Pivovarov
Hi Neil

Yes! it helps!!! I do  not see _temporary in console output anymore.
saveAsTextFile
is fast now.

2015-09-02 23:07:00,022 INFO  [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 18.0
in stage 0.0 (TID 18) in 4398 ms on ip-10-0-24-103.ec2.internal (1/24)

2015-09-02 23:07:01,887 INFO  [task-result-getter-2]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 5.0 in
stage 0.0 (TID 5) in 6282 ms on ip-10-0-26-14.ec2.internal (24/24)

2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 0
(saveAsTextFile at :22) finished in 6.319 s

2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar
tmp/test40_141_24_406/_SUCCESS 0


Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers <neilod...@gmail.com> wrote:

> Hi,
>
> Can you set the following parameters in your mapred-site.xml file please:
>
>
> <property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
>
> <property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
>
> You can also config this at cluster launch time with the following
> Classification via EMR console:
>
>
> classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]
>
>
> Thank you
>
> On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
>> I checked previous emr config (emr-3.8)
>> mapred-site.xml has the following setting
>> 
>> <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
>> 
>>
>>
>> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com
>> > wrote:
>>
>>> Should I use DirectOutputCommitter?
>>> spark.hadoop.mapred.output.committer.class
>>>  com.appsflyer.spark.DirectOutputCommitter
>>>
>>>
>>>
>>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <
>>> apivova...@gmail.com> wrote:
>>>
>>>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>>>
>>>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>>>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>>>
>>>> Actually saveAsTextFile says that it's done in 4.356 sec but after that
>>>> I see lots of INFO messages with 404 error from com.amazonaws.latency
>>>> logger for next 90 sec
>>>>
>>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>>>
>>>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>>>> (saveAsTextFile at :22) finished in 4.356 s
>>>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>>>> whose tasks have all completed, from pool
>>>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>>>> :22, took 4.547829 s
>>>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>>>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>>>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>>>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>>> ServiceName=[Amazon S3], AWS

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class
 com.appsflyer.spark.DirectOutputCommitter



On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> I run spark 1.4.1 in amazom aws emr 4.0.0
>
> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
> comparison to emr 3.8  (was 5 sec, now 95 sec)
>
> Actually saveAsTextFile says that it's done in 4.356 sec but after that I
> see lots of INFO messages with 404 error from com.amazonaws.latency logger
> for next 90 sec
>
> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>
> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
> (saveAsTextFile at :22) finished in 4.356 s
> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler
> (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all
> completed, from pool
> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
> :22, took 4.547829 s
> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
> (S3NativeFileSystem.java:listStatus(896)) - listStatus
> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 3B2F06FD11682D22), S3 Extended Request ID:
> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
> RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
> HttpClientSendRequestTime=[0.089],
> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 62C6B413965447FD), S3 Extended Request ID:
> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
> RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
> 2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
> ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
> RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
> HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
> RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
> HttpClientSendRequestTime=[0.068],
> 2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 4846575A1C373BB9), S3 Extended Request ID:
> aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClie

spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
I run Spark 1.4.1 on Amazon AWS EMR 4.0.0

For some reason Spark saveAsTextFile is very slow on EMR 4.0.0 in
comparison to EMR 3.8 (it was 5 sec, now it is 95 sec)

saveAsTextFile actually reports that it's done in 4.356 sec, but after that I
see lots of INFO messages with 404 errors from the com.amazonaws.latency logger
for the next 90 sec

spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
"A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
(saveAsTextFile at :22) finished in 4.356 s
2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all
completed, from pool
2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
:22, took 4.547829 s
2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:listStatus(896)) - listStatus
s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 3B2F06FD11682D22), S3 Extended Request ID:
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
HttpClientSendRequestTime=[0.089],
2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 62C6B413965447FD), S3 Extended Request ID:
4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
HttpClientSendRequestTime=[0.068],
2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 4846575A1C373BB9), S3 Extended Request ID:
aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.531],
HttpRequestTime=[11.134], HttpClientReceiveResponseTime=[9.434],
RequestSigningTime=[0.206], HttpClientSendRequestTime=[0.13],
2015-09-01 21:16:17,786 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:listStatus(896)) - listStatus
s3n://foo-bar/tmp/test40_20/_temporary/0/task_201509012116_0005_m_00
with recursive false
2015-09-01 21:16:17,798 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
I checked the previous EMR config (emr-3.8);
mapred-site.xml has the following setting:

<property>
  <name>mapred.output.committer.class</name>
  <value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value>
</property>
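
A minimal sketch of setting a committer through SparkConf (assuming a direct
committer implementation such as com.appsflyer.spark.DirectOutputCommitter is
actually on the application classpath; spark.hadoop.* properties are copied
into the Hadoop Configuration that saveAsTextFile uses):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: route the old mapred API to a direct output committer,
// so part files are written straight to S3 instead of a _temporary dir.
val conf = new SparkConf()
  .setAppName("emr-save-test")
  .set("spark.hadoop.mapred.output.committer.class",
       "com.appsflyer.spark.DirectOutputCommitter")
val sc = new SparkContext(conf)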



On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Should I use DirectOutputCommitter?
> spark.hadoop.mapred.output.committer.class
>  com.appsflyer.spark.DirectOutputCommitter
>
>
>
> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>
>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>
>> Actually saveAsTextFile says that it's done in 4.356 sec but after that I
>> see lots of INFO messages with 404 error from com.amazonaws.latency logger
>> for next 90 sec
>>
>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>
>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>> (saveAsTextFile at :22) finished in 4.356 s
>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>> whose tasks have all completed, from pool
>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>> :22, took 4.547829 s
>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], Exception=1,
>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>> RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
>> HttpClientSendRequestTime=[0.089],
>> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>> ID: 62C6B413965447FD), S3 Extended Request ID:
>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], Exception=1,
>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
>> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
>> RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
>> 2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>> ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>> RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
>> HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
>> RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
>> HttpClientSendRequestTime=[0.068],
>> 2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
>> (AW

Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
Following Hadoop conventions, Spark won't overwrite an existing directory.
You need to provide a unique output path every time you run the program, or
delete or rename the target directory before you run the job.
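
A minimal sketch of the delete-before-save approach (assuming an existing
SparkContext named sc, an RDD named rdd, and a hypothetical HDFS path):

import org.apache.hadoop.fs.{FileSystem, Path}

// Remove the output directory, if it exists, before writing.
val outputDir = "hdfs://namenode:54310/user/me/output"    // hypothetical path
val fs = FileSystem.get(new java.net.URI(outputDir), sc.hadoopConfiguration)
val outPath = new Path(outputDir)
if (fs.exists(outPath)) fs.delete(outPath, true)           // recursive delete
rdd.saveAsTextFile(outputDir)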

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit
 at local there is no problem , but i run at cluster, saveAsTextFile doesn't
 work.*It says me User class threw exception: Output directory
 hdfs://172.31.42.10:54310/./weblogReadResult
 http://172.31.42.10:54310/./weblogReadResult already exists*

 Is there anyone can help me about this issue ?

 Best,
 yasemin



 --
 hiç ender hiç



Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Yasemin Kaya
Thanx Dean, i am giving unique output path and in every time i also delete
the directory before i run the job.

2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com:

 Following Hadoop conventions, Spark won't overwrite an existing directory.
 You need to provide a unique output path every time you run the program, or
 delete or rename the target directory before you run the job.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i submit
 at local there is no problem , but i run at cluster, saveAsTextFile doesn't
 work.*It says me User class threw exception: Output directory
 hdfs://172.31.42.10:54310/./weblogReadResult
 http://172.31.42.10:54310/./weblogReadResult already exists*

 Is there anyone can help me about this issue ?

 Best,
 yasemin



 --
 hiç ender hiç





-- 
hiç ender hiç


Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
So, just before running the job, run the HDFS command at a shell
prompt: hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult.
Does it say the path doesn't exist?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya godo...@gmail.com wrote:

 Thanx Dean, i am giving unique output path and in every time i also delete
 the directory before i run the job.

 2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com:

 Following Hadoop conventions, Spark won't overwrite an existing
 directory. You need to provide a unique output path every time you run the
 program, or delete or rename the target directory before you run the job.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I have EC2 cluster, and am using spark 1.3, yarn and HDFS . When i
 submit at local there is no problem , but i run at cluster, saveAsTextFile
 doesn't work.*It says me User class threw exception: Output directory
 hdfs://172.31.42.10:54310/./weblogReadResult
 http://172.31.42.10:54310/./weblogReadResult already exists*

 Is there anyone can help me about this issue ?

 Best,
 yasemin



 --
 hiç ender hiç





 --
 hiç ender hiç



EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Yasemin Kaya
Hi,

I have an EC2 cluster and am using Spark 1.3, YARN and HDFS. When I submit
locally there is no problem, but when I run on the cluster, saveAsTextFile
doesn't work. It tells me: User class threw exception: Output directory
hdfs://172.31.42.10:54310/./weblogReadResult already exists

Is there anyone who can help me with this issue?

Best,
yasemin



-- 
hiç ender hiç


Re: Combining Spark Files with saveAsTextFile

2015-08-06 Thread MEETHU MATHEW
Hi,
Try using coalesce(1) before calling saveAsTextFile().
Thanks & Regards,
Meethu M


 On Wednesday, 5 August 2015 7:53 AM, Brandon White 
bwwintheho...@gmail.com wrote:
   

 What is the best way to make saveAsTextFile save as only a single file?

  

Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
using coalesce might be dangerous, since 1 worker process will need to
handle the whole file, and if the file is huge you'll get an OOM; however it
depends on the implementation, I'm not sure how it will be done.
Nevertheless, it's worth trying the coalesce method (please post your results)

another option would be to use FileUtil.copyMerge, which copies each
partition one after another into a destination stream (file); so as soon as
you've written your hdfs file with Spark with multiple partitions in
parallel (as usual), you can then add another step to merge it into any
destination you want
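
A minimal sketch of the copyMerge approach (assuming Hadoop 2.x, where
FileUtil.copyMerge is still available, an existing SparkContext named sc, an
RDD named rdd, and hypothetical paths on the default filesystem):

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Write the RDD with its normal parallelism, then merge the part files into one.
val hadoopConf = sc.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
rdd.saveAsTextFile("/tmp/out-parts")                   // many part-NNNNN files
FileUtil.copyMerge(fs, new Path("/tmp/out-parts"),
                   fs, new Path("/tmp/out-merged.txt"),
                   false,                              // keep the source directory
                   hadoopConf, null)                   // no extra separator string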

On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One option is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com
 bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?



Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
seems that coalesce does work, see the following thread
https://www.mail-archive.com/user%40spark.apache.org/msg00928.html

On 5 August 2015 at 09:47, Igor Berman igor.ber...@gmail.com wrote:

 using coalesce might be dangerous, since 1 worker process will need to
 handle whole file and if the file is huge you'll get OOM, however it
 depends on implementation, I'm not sure how it will be done
 nevertheless, worse to try the coallesce method(please post your results)

 another option would be to use FileUtil.copyMerge which copies each
 partition one after another into destination stream(file); so as soon as
 you've written your hdfs file with spark with multiple partitions in
 parallel(as usual), you can then make another step to merge it into any
 destination you want

 On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One option is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com
 bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?





RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
Just to further clarify, you can first call coalesce with argument 1 and then 
call saveAsTextFile. For example,

rdd.coalesce(1).saveAsTextFile(...)



Mohammed

From: Mohammed Guller
Sent: Tuesday, August 4, 2015 9:39 PM
To: 'Brandon White'; user
Subject: RE: Combining Spark Files with saveAsTextFile

One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


Combining Spark Files with saveAsTextFile

2015-08-04 Thread Brandon White
What is the best way to make saveAsTextFile save as only a single file?


spark cache issue while doing saveAsTextFile and saveAsParquetFile

2015-07-14 Thread mathewvinoj
Hi There,

I am using mapPartitions to do some processing and caching the result as
below.

I am storing the output in both formats (Parquet and text file), and
recomputation is happening both times. Even though I put the cache, it's not
working as expected.

Below is the code snippet. Any help is really appreciated.

 val record = sql(sqlString)
 val outputRecords = record.repartition(1).mapPartitions{ rows =>
   val finalList1 = ListBuffer[Row]()
   while (rows.hasNext){
.
.
     finalList1 += xyz
   }
   finalList1.iterator
 }.cache()

 val l = applySchema(outputRecords, schemaName).cache()
 l.saveAsTextFile(filename + ".txt")
 l.saveAsParquetFile(filename + ".parquet")

Expected result: when we do saveAsTextFile the computation should happen and
cache the result, and the second time, when we do saveAsParquetFile, it should
get the result from the cache.

thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-cache-issue-while-doing-saveAsTextFile-and-saveAsParquetFile-tp23845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD saveAsTextFile() to local disk

2015-07-08 Thread Vijay Pawnarkar
Getting an exception when writing an RDD to local disk using the following call:

 saveAsTextFile("file:home/someuser/dir2/testupload/20150708/")

The dir (/home/someuser/dir2/testupload/) was created before running the
job. The error message is misleading.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
Parent path is not a directory: file:/home/someuser/dir2
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

-- 
-Vijay


Re: RDD saveAsTextFile() to local disk

2015-07-08 Thread canan chen
It works for me by using the following code. Could you share your code ?


val data = sc.parallelize(List(1,2,3))
data.saveAsTextFile("file:Users/chen/Temp/c")

On Thu, Jul 9, 2015 at 4:05 AM, spok20nn vijaypawnar...@gmail.com wrote:

 Getting exception when wrting RDD to local disk using following function

  saveAsTextFile(file:home/someuser/dir2/testupload/20150708/)

 The dir (/home/someuser/dir2/testupload/) was created before running the
 job. The error message is misleading.


 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in
 stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
 (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
 Parent path is not a directory: file:/home/someuser/dir2
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
 at

 org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
 at
 org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
 at

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
 at

 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-saveAsTextFile-to-local-disk-tp23725.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread allanjie
*Problem Description*:

The program runs in a stand-alone Spark cluster (1 master, 6 workers with
8g RAM and 2 cores each).
Input: a 468MB file with 133433 records stored in HDFS.
Output: just a 2MB file will be stored in HDFS.
The program has two map operations and one reduceByKey operation.
Finally I save the result to HDFS using *saveAsTextFile*.
*Problem*: if I don't add saveAsTextFile, the program runs very fast (a few
seconds); otherwise it is extremely slow, taking about 30 mins.

*My program (is very Simple)*
public static void main(String[] args) throws IOException{
        /**Parameter Setting***/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
                "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
        final int row = 133433;
        final int col = 458;
        final double dc = Double.valueOf(args[0]);

        SparkConf conf = new SparkConf()
                .setAppName("distance")
                .set("spark.executor.memory", "4g")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.eventLog.enabled", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);

        //Broadcast variable, the dimension of this double array: 133433*458
        final Broadcast<double[][]> broadcastPoints =
                sc.broadcast(createBroadcastPoints(localPointPath, row, col));
        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
         */
        JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
                new PairFlatMapFunction<String, Integer, Double>(){
            public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception{
                List<String> al = Arrays.asList(v1.split(","));
                double[] featureVals = new double[al.size()];
                for(int j=0; j<al.size()-1; j++)
                    featureVals[j] = Double.valueOf(al.get(j+1));
                int jIndex = Integer.valueOf(al.get(0));
                double[][] allPoints = broadcastPoints.value();
                double sum = 0;
                List<Tuple2<Integer, Double>> list =
                        new ArrayList<Tuple2<Integer, Double>>();
                for(int i=0; i<row; i++){
                    sum = 0;
                    for(int j=0; j<al.size()-1; j++){
                        sum += (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]);
                    }
                    list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                }
                return list;
            }
        });

        //Create zeroOne density
        JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
                new Function<Double, Integer>(){
            public Integer call(Double v1) throws Exception {
                if(v1 < dc)
                    return 1;
                else return 0;
            }
        });
        //Combine the density
        JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        counts.*saveAsTextFile*(outputPath + args[1]);
        sc.stop();
}

*If I comment out saveAsTextFile, the log will be:*
Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting

Re: Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread Joe Wass
This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have a saveAsTextFile? If there are
transformations but no actions to actually collect the data, there's no
need for Spark to execute the transformations.
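
A minimal sketch of that point (expensiveParse and the paths are hypothetical;
transformations are lazy, and only an action triggers a job):

// The map below does no work on its own; it only records the lineage.
val mapped = sc.textFile("hdfs:///some/input").map(expensiveParse)
// An action forces execution:
mapped.count()                                   // runs the job
mapped.saveAsTextFile("hdfs:///some/output")     // runs it again unless mapped is cached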

As to the question of 'is this taking too long', I can't answer that. But
your code was HTML escaped and therefore difficult to read; perhaps you
should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie allanmcgr...@gmail.com wrote:

 *Problem Description*:

 The program running in  stand-alone spark cluster (1 master, 6 workers with
 8g ram and 2 cores).
 Input: a 468MB file with 133433 records stored in HDFS.
 Output: just 2MB file will stored in HDFS
 The program has two map operations and one reduceByKey operation.
 Finally I save the result to HDFS using *saveAsTextFile*.
 *Problem*: if I don't add saveAsTextFile, the program runs very fast(a
 few
 seconds), otherwise extremely slow until about 30 mins.

 *My program (is very Simple)*
 public static void main(String[] args) throws IOException{
 /**Parameter Setting***/
  String localPointPath =
 /home/hduser/skyrock/skyrockImageFeatures.csv;
  String remoteFilePath =
 hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv;
  String outputPath =
 hdfs://HadoopV26Master:9000/user/sparkoutput/;
  final int row = 133433;
  final int col = 458;
  final double dc = Double.valueOf(args[0]);

 SparkConf conf = new SparkConf().
 setAppName(distance)
 .set(spark.executor.memory,
 4g).set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)
 .set(spark.eventLog.enabled, true);
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDDString textFile = sc.textFile(remoteFilePath);

 //Broadcast variable, the dimension of this double array:
 133433*458
 final Broadcastdouble[][] broadcastPoints =
 sc.broadcast(createBroadcastPoints(localPointPath,row,col));
 /**
  * Compute the distance in terms of each point on each
 instance.
  * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
  */
 JavaPairRDDInteger,Double distance =
 textFile.flatMapToPair(new
 PairFlatMapFunctionString, Integer, Double(){
 public IterableTuple2lt;Integer, Double
 call(String v1) throws
 Exception{
 ListString al =
 Arrays.asList(v1.split(,));
 double[] featureVals = new
 double[al.size()];
 for(int j=0;jal.size()-1;j++)
 featureVals[j] =
 Double.valueOf(al.get(j+1));
 int jIndex = Integer.valueOf(al.get(0));
 double[][] allPoints =
 broadcastPoints.value();
 double sum = 0;
 Listlt;Tuple2lt;Integer, Double list =
 new
 ArrayListTuple2lt;Integer, Double();
 for(int i=0;irow; i++){
 sum = 0;
 for(int j=0;jlt;al.size()-1;j++){
 sum +=
 (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]);
 }
 list.add(new
 Tuple2lt;Integer,Double(jIndex, Math.sqrt(sum) ));
 }
 return list;
 }
 });

 //Create zeroOne density
 JavaPairRDDInteger, Integer densityZeroOne =
 distance.mapValues(new
 FunctionDouble, Integer(){
 public Integer call(Double v1) throws Exception {
 if(v1dc)
 return 1;
 else return 0;
 }

 });
 //  //Combine the density
 JavaPairRDDlt;Integer, Integer counts =
 densityZeroOne.reduceByKey(new
 Function2Integer, Integer,Integer() {
 public Integer call(Integer v1, Integer
 v2) throws Exception {
 return v1+v2;
 }
 });
 counts.*saveAsTextFile*(outputPath+args[1]);
 sc.stop();
 }

 *If I comment saveAsTextFile, log will be:*
 Picked up _JAVA_OPTIONS: -Xmx4g
 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load

Re: FP Growth saveAsTextFile

2015-05-21 Thread Xiangrui Meng
+user

If this was in cluster mode, you should provide a path on a shared file
system, e.g., HDFS, instead of a local path. If this is in local mode, I'm
not sure what went wrong.
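
For example, a sketch with a hypothetical HDFS URI in place of the local Windows path:

model.freqItemsets.saveAsTextFile("hdfs://namenode:8020/tmp/fpGrowth/model")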

On Wed, May 20, 2015 at 2:09 PM, Eric Tanner eric.tan...@justenough.com
wrote:

 Here is the stack trace. Thanks for looking at this.

 scala
 model.freqItemsets.saveAsTextFile(c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1)
 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at
 console:33
 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at
 console:33) with 2 output partitions (allowLocal=false)
 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30(saveAsTextFile
 at console:33)
 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29)
 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List()
 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30
 (MapPartitionsRDD[21] at saveAsTextFile at console:33), which has no
 missing parents
 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with
 curMem=724428, maxMem=278302556
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in
 memory (estimated size 128.2 KB, free 264.6 MB)
 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with
 curMem=855716, maxMem=278302556
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as
 bytes in memory (estimated size 77.1 KB, free 264.5 MB)
 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in
 memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_18_piece0
 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast
 at DAGScheduler.scala:839
 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage
 30 (MapPartitionsRDD[21] at saveAsTextFile at console:33)
 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks
 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17
 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0
 (TID 33, localhost, PROCESS_LOCAL, 1056 bytes)
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737
 dropped from memory (free 277372582)
 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0
 (TID 34, localhost, PROCESS_LOCAL, 1056 bytes)
 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on
 localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34)
 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_17_piece0
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696
 dropped from memory (free 277379278)
 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17
 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737
 dropped from memory (free 277384015)
 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on
 localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
 broadcast_16_piece0
 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16
 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696
 dropped from memory (free 277390711)
 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
 blocks out of 2 blocks
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
 fetches in 1 ms
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
 blocks out of 2 blocks
 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
 fetches in 0 ms
 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID
 34)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
 at
 org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462

saveAsTextFile() part- files are missing

2015-05-21 Thread rroxanaioana
Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file). 
The file is stored locally. I loaded the file using native code and then
created the RDD from it.
   
JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(..);

counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves. I run the program from the master node.
The output directory is created on the master node and on the 2 nodes. On
the master node I have only one file, _SUCCESS (empty), and on the nodes I have
a _temporary file. I printed the counter at the console and the result seems ok.
What am I doing wrong?
Thank you!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile() part- files are missing

2015-05-21 Thread Tomasz Fruboes

Hi,

 it looks like you are writing to a local filesystem. Could you try writing
to a location visible to all nodes (master and workers), e.g. an NFS share?
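
For instance, a sketch with a hypothetical shared location (HDFS, or an NFS
mount visible to every worker):

counter.saveAsTextFile("hdfs://master:54310/user/roxana/output")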


 HTH,
  Tomasz

W dniu 21.05.2015 o 17:16, rroxanaioana pisze:

Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file).
The file is stored locally. I loaded the file using native code and then
created the RDD from it.

 JavaRDDString rddFromFile = context.parallelize(myFile,
2);
JavaRDDString words = rddFromFile.flatMap(...);
JavaPairRDDString, Integer pairs = words.mapToPair(...);
JavaPairRDDString, Integer counter = pairs.reduceByKey(..);

counter.saveAsTextFile(file:///root/output);
context.close();

I have one master and 2 slaves. I run the program from the master node.
The output directory is created on the master node and on the 2 nodes. On
the master node I have only one file _SUCCES (empty) and on the nodes I have
_temporary file. I printed the counter at the console, the result seems ok.
What am I doing wrong?
Thank you!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



FP Growth saveAsTextFile

2015-05-20 Thread Eric Tanner
I am having trouble with saving an FP-Growth model as a text file.  I can
print out the results, but when I try to save the model I get a
NullPointerException.

model.freqItemsets.saveAsTextFile("c://fpGrowth/model")

Thanks,

Eric


Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it
would be easier to save freq itemsets as a Parquet file. -Xiangrui
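
A minimal sketch of the Parquet route (assuming a Spark version where
freqItemsets is an RDD of FreqItemset objects with items and freq fields, an
existing SQLContext named sqlContext, and a hypothetical output path):

import sqlContext.implicits._

// Flatten each itemset to a string column, then save the result as Parquet.
val itemsetsDF = model.freqItemsets
  .map(fi => (fi.items.mkString(","), fi.freq))
  .toDF("items", "freq")
itemsetsDF.saveAsParquetFile("hdfs:///tmp/fpGrowth/itemsets.parquet")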

On Wed, May 20, 2015 at 12:16 PM, Eric Tanner
eric.tan...@justenough.com wrote:
 I am having trouble with saving an FP-Growth model as a text file.  I can
 print out the results, but when I try to save the model I get a
 NullPointerException.

 model.freqItemsets.saveAsTextFile(c://fpGrowth/model)

 Thanks,

 Eric

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-16 Thread Ilya Ganelin
All - this issue showed up when I was tearing down a spark context and
creating a new one. Often, I was unable to then write to HDFS due to this
error. I subsequently switched to a different implementation where instead
of tearing down and re-initializing the Spark context I'd instead submit a
separate request to YARN.
On Fri, May 15, 2015 at 2:35 PM Puneet Kapoor puneet.cse.i...@gmail.com
wrote:

 I am seeing this on hadoop 2.4.0 version.

 Thanks for your suggestions, i will try those and let you know if they
 help !

 On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com
 wrote:

  What version of Hadoop are you seeing this on?


  On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com
 wrote:

  Hey,

  Did you find any solution for this issue, we are seeing similar logs in
 our Data node logs. Appreciate any help.





  2015-05-15 10:51:43,615 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
 java.net.SocketTimeoutException: 6 millis timeout while waiting for
 channel to be ready for read. ch :
 java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
 remote=/192.168.112.190:46253]
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
 at java.io.BufferedInputStream.fill(Unknown Source)
 at java.io.BufferedInputStream.read1(Unknown Source)
 at java.io.BufferedInputStream.read(Unknown Source)
 at java.io.DataInputStream.read(Unknown Source)
 at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
 at java.lang.Thread.run(Unknown Source)


  That's being logged @ error level in DN. It doesn't mean the DN has
 crashed, only that it timed out waiting for data: something has gone wrong
 elsewhere.

  https://issues.apache.org/jira/browse/HDFS-693


 there are a couple of properties you can set to extend timeouts

 <property>
   <name>dfs.socket.timeout</name>
   <value>2</value>
 </property>

 <property>
   <name>dfs.datanode.socket.write.timeout</name>
   <value>2</value>
 </property>


 You can also increase the number of data node transceiver threads to
 handle data IO across the network

 <property>
   <name>dfs.datanode.max.xcievers</name>
   <value>4096</value>
 </property>

 Yes, that property has that explicit spelling, it's easy to get wrong





Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
Hey,

Did you find any solution for this issue, we are seeing similar logs in our
Data node logs. Appreciate any help.


2015-05-15 10:51:43,615 ERROR
org.apache.hadoop.hdfs.server.datanode.DataNode:
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
 src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for
channel to be ready for read. ch :
java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
remote=/192.168.112.190:46253]
at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

 Hi all, as the last stage of execution, I am writing out a dataset to disk. 
 Before I do this, I force the DAG to resolve so this is the only job left in 
 the pipeline. The dataset in question is not especially large (a few 
 gigabytes). During this step however, HDFS will inevitable crash. I will lose 
 connection to data-nodes and get stuck in the loop of death – failure causes 
 job restart, eventually causing the overall job to fail. On the data node 
 logs I see the errors below. Does anyone have any ideas as to what is going 
 on here? Thanks!


 java.io.IOException: Premature EOF from inputStream
   at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
   at 
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
   at 
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
   at java.lang.Thread.run(Thread.java:745)




 innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing 
 WRITE_BLOCK operation  src: /10.37.248.60:44676 dst: /10.37.248.59:1004
 java.net.SocketTimeoutException: 65000 millis timeout while waiting for 
 channel to be ready for read. ch : java.nio.channels.SocketChannel[connected 
 local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
   at 
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
   at java.io.FilterInputStream.read(FilterInputStream.java:83)
   at java.io.FilterInputStream.read(FilterInputStream.java:83)
   at 
 org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
   at 
 

Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
I am seeing this on hadoop 2.4.0 version.

Thanks for your suggestions, i will try those and let you know if they help
!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com
wrote:

  What version of Hadoop are you seeing this on?


  On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com
 wrote:

  Hey,

  Did you find any solution for this issue, we are seeing similar logs in
 our Data node logs. Appreciate any help.





  2015-05-15 10:51:43,615 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
 java.net.SocketTimeoutException: 6 millis timeout while waiting for
 channel to be ready for read. ch :
 java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
 remote=/192.168.112.190:46253]
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
 at java.io.BufferedInputStream.fill(Unknown Source)
 at java.io.BufferedInputStream.read1(Unknown Source)
 at java.io.BufferedInputStream.read(Unknown Source)
 at java.io.DataInputStream.read(Unknown Source)
 at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
 at java.lang.Thread.run(Unknown Source)


  That's being logged @ error level in DN. It doesn't mean the DN has
 crashed, only that it timed out waiting for data: something has gone wrong
 elsewhere.

  https://issues.apache.org/jira/browse/HDFS-693


 there are a couple of properties you can set to extend timeouts

 <property>
   <name>dfs.socket.timeout</name>
   <value>2</value>
 </property>

 <property>
   <name>dfs.datanode.socket.write.timeout</name>
   <value>2</value>
 </property>


 You can also increase the number of data node transceiver threads to handle
 data IO across the network

 <property>
   <name>dfs.datanode.max.xcievers</name>
   <value>4096</value>
 </property>

 Yes, that property has that explicit spelling, it's easy to get wrong




Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan Murty
Another thing - could it be a permission problem ?
It creates all the directory structure (in red)/tmp/wordcount/
_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
so I am guessing not.

On Tue, May 5, 2015 at 7:27 PM, Sudarshan Murty njmu...@gmail.com wrote:

 You are most probably right. I assumed others may have run into this.
 When I try to put the files in there, it creates a directory structure
 with the part-0 and part1 files but these files are of size 0 - no
 content. The client error and the server logs have  the error message shown
 - which seem to indicate that the system is aware that a datanode exists
 but is excluded from the operation. So, it looks like it is not partitioned
 and Ambari indicates that HDFS is in good health with one NN, one SN, one
 DN.
 I am unable to figure out what the issue is.
 thanks for your help.

 On Tue, May 5, 2015 at 6:39 PM, ayan guha guha.a...@gmail.com wrote:

 What happens when you try to put files to your hdfs from local
 filesystem? Looks like its a hdfs issue rather than spark thing.
 On 6 May 2015 05:04, Sudarshan njmu...@gmail.com wrote:

 I have searched all replies to this question & not found an answer.

 I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
 side, on the same machine and trying to write output of wordcount program 
 into HDFS (works fine writing to a local file, /tmp/wordcount).

 Only line I added to the wordcount program is: (where 'counts' is the 
 JavaPairRDD)
 *counts.saveAsTextFile(hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
 http://sandbox.hortonworks.com:8020/tmp/wordcount);*

 When I check in HDFS at that location (/tmp) here's what I find.
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
 and
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1

 and *both part-000[01] are 0 size files*.

 The wordcount client output error is:
 [Stage 1:  (0 + 2) 
 / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
 org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
  *could only be replicated to 0 nodes instead of minReplication (=1).  
 There are 1 datanode(s) running and 1 node(s) are excluded in this 
 operation.*
 at 
 org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
 at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
 at 
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)


 I tried this with Spark 1.2.1 same error.
 I have plenty of space on the DFS.
  The Name Node, Sec Name Node & the one Data Node are all healthy.

 Any hint as to what may be the problem ?
 thanks in advance.
 Sudarshan


 --
 View this message in context: saveAsTextFile() to save output of Spark
 program to HDFS
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan Murty
You are most probably right. I assumed others may have run into this.
When I try to put the files in there, it creates a directory structure with
the part-0 and part1 files but these files are of size 0 - no
content. The client error and the server logs have  the error message shown
- which seem to indicate that the system is aware that a datanode exists
but is excluded from the operation. So, it looks like it is not partitioned
and Ambari indicates that HDFS is in good health with one NN, one SN, one
DN.
I am unable to figure out what the issue is.
thanks for your help.

On Tue, May 5, 2015 at 6:39 PM, ayan guha guha.a...@gmail.com wrote:

 What happens when you try to put files to your hdfs from local filesystem?
 Looks like its a hdfs issue rather than spark thing.
 On 6 May 2015 05:04, Sudarshan njmu...@gmail.com wrote:

  I have searched all replies to this question & not found an answer.

 I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
 side, on the same machine and trying to write output of wordcount program 
 into HDFS (works fine writing to a local file, /tmp/wordcount).

 Only line I added to the wordcount program is: (where 'counts' is the 
 JavaPairRDD)
 *counts.saveAsTextFile(hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
 http://sandbox.hortonworks.com:8020/tmp/wordcount);*

 When I check in HDFS at that location (/tmp) here's what I find.
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
 and
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1

 and *both part-000[01] are 0 size files*.

 The wordcount client output error is:
 [Stage 1:  (0 + 2) 
 / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
 org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
  *could only be replicated to 0 nodes instead of minReplication (=1).  There 
 are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
  at 
 org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
  at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
  at 
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)


 I tried this with Spark 1.2.1, same error.
 I have plenty of space on the DFS.
 The Name Node, Secondary Name Node and the one Data Node are all healthy.

 Any hint as to what may be the problem?
 Thanks in advance.
 Sudarshan


 --
 View this message in context: saveAsTextFile() to save output of Spark
 program to HDFS
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread ayan guha
What happens when you try to put files to your HDFS from the local filesystem?
Looks like it's an HDFS issue rather than a Spark thing.
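
A concrete way to run that sanity check outside Spark (not from the original
thread; the namenode URI and paths below are assumptions) is to copy a small
local file into HDFS through the plain Hadoop client. If this also fails with
the "could only be replicated to 0 nodes" error, the problem is in HDFS / VM
networking rather than in Spark:

    // Paste into spark-shell, or any JVM with the Hadoop client on the classpath.
    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com:8020"), new Configuration())
    // copy a small local file into the same target directory the job writes to
    fs.copyFromLocalFile(new Path("/tmp/localtest.txt"), new Path("/tmp/puttest.txt"))
    println(fs.exists(new Path("/tmp/puttest.txt")))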
On 6 May 2015 05:04, Sudarshan njmu...@gmail.com wrote:


 I have searched all replies to this question and not found an answer.

 I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
 side, on the same machine and trying to write output of wordcount program 
 into HDFS (works fine writing to a local file, /tmp/wordcount).

 Only line I added to the wordcount program is: (where 'counts' is the 
 JavaPairRDD)
 *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount");*

 When I check in HDFS at that location (/tmp) here's what I find.
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
 and
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1

 and *both part-000[01] are 0 size files*.

 The wordcount client output error is:
 [Stage 1:  (0 + 2) / 
 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
 org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
 /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
  *could only be replicated to 0 nodes instead of minReplication (=1).  There 
 are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
   at 
 org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
   at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
   at 
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)


 I tried this with Spark 1.2.1, same error.
 I have plenty of space on the DFS.
 The Name Node, Secondary Name Node and the one Data Node are all healthy.

 Any hint as to what may be the problem?
 Thanks in advance.
 Sudarshan


 --
 View this message in context: saveAsTextFile() to save output of Spark
 program to HDFS
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan
I have searched all replies to this question and not found an answer.

I am running standalone Spark 1.3.1 and Hortonworks' HDP 2.2 VM, side by side, on
the same machine and trying to write output of wordcount program into HDFS
(works fine writing to a local file, /tmp/wordcount).

Only line I added to the wordcount program is: (where 'counts' is the
JavaPairRDD)
*counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount");*

When I check in HDFS at that location (/tmp) here's what I find:
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
and
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
and *both part-000[01] are 0 size files*.

The wordcount client output error is:
[Stage 1:  (0 + 2) / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
*could only be replicated to 0 nodes instead of minReplication (=1).  There
are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)

I tried this with Spark 1.2.1, same error.
I have plenty of space on the DFS.
The Name Node, Secondary Name Node and the one Data Node are all healthy.

Any hint as to what may be the problem?
Thanks in advance.
Sudarshan




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Efficient saveAsTextFile by key, directory for each key?

2015-04-22 Thread Arun Luthra
I ended up post-processing the result in hive with a dynamic partition
insert query to get a table partitioned by the key.

Looking further, it seems that 'dynamic partition' insert is in Spark SQL
and working well in Spark SQL versions 1.2.0 and later:
https://issues.apache.org/jira/browse/SPARK-3007
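
For reference, the dynamic partition insert described above looks roughly like
this (the table and column names are made up for illustration, and the exact
Spark SQL setup differs between versions):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SET hive.exec.dynamic.partition = true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    // every distinct directory_name value becomes its own partition directory
    hiveContext.sql(
      "INSERT OVERWRITE TABLE my_data_partitioned PARTITION (directory_name) " +
      "SELECT val0, val1, directory_name FROM my_data_raw")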

On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Is there an efficient way to save an RDD with saveAsTextFile in such a way
 that the data gets shuffled into separated directories according to a key?
 (My end goal is to wrap the result in a multi-partitioned Hive table)

 Suppose you have:

 case class MyData(val0: Int, val1: String, directory_name: String)

 and an RDD called myrdd with type RDD[MyData]. Suppose that you already
 have an array of the distinct directory_name's, called distinct_directories.

 A very inefficient way to do this is:

 distinct_directories.foreach(
   dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
 .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
 .coalesce(5)
 .saveAsTextFile(s"base_dir_name/$dir_name")
 )

 I tried this solution, and it does not do the multiple myrdd.filter's in
 parallel.

 I'm guessing partitionBy might be in the efficient solution if an easy
 efficient solution exists...

 Thanks,
 Arun



Efficient saveAsTextFile by key, directory for each key?

2015-04-21 Thread Arun Luthra
Is there an efficient way to save an RDD with saveAsTextFile in such a way
that the data gets shuffled into separated directories according to a key?
(My end goal is to wrap the result in a multi-partitioned Hive table)

Suppose you have:

case class MyData(val0: Int, val1: String, directory_name: String)

and an RDD called myrdd with type RDD[MyData]. Suppose that you already
have an array of the distinct directory_name's, called distinct_directories.

A very inefficient way to do this is:

distinct_directories.foreach(
  dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
.map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
.coalesce(5)
.saveAsTextFile(s"base_dir_name/$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in
parallel.

I'm guessing partitionBy might be in the efficient solution if an easy
efficient solution exists...

Thanks,
Arun
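
A commonly suggested RDD-level answer to this question (not taken from this
thread) uses Hadoop's MultipleTextOutputFormat, so that each key's records land
in their own sub-directory in a single pass; a sketch reusing the names above:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.spark.SparkContext._   // pair RDD functions on Spark < 1.3

    // Routes each record into a sub-directory named after its key and drops
    // the key from the written line.
    class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString + "/" + name          // e.g. some_dir/part-00000
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()
    }

    myrdd
      .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
      .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String],
        classOf[KeyBasedOutput])

This writes all keys in one job instead of filtering the RDD once per key.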


Re: Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-17 Thread Akhil Das
Not sure if this will help, but try clearing your jar cache (for sbt
~/.ivy2 and for maven ~/.m2) directories.

Thanks
Best Regards

On Wed, Apr 15, 2015 at 9:33 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Env - Spark 1.3, Hadoop 2.3, Kerberos

  xx.saveAsTextFile(path, codec) gives following trace. Same works with
 Spark 1.2 in same environment

 val codec = classOf[some codec class]

 val a = sc.textFile("/some_hdfs_file")

 a.saveAsTextFile("/some_other_hdfs_file", codec) fails with following
 trace in Spark 1.3, works in Spark 1.2 in same env

 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage
 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
 authenticate the provider BC) [duplicate 7]
 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
 tasks have all completed, from pool
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
 the provider BC
 at javax.crypto.Cipher.getInstance(Cipher.java:642)
 at javax.crypto.Cipher.getInstance(Cipher.java:580)
  some codec calls 
 at
 org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
 at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.util.jar.JarException:
 file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
 entries - org/apache/spark/SparkHadoopWriter$.class
 at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
 at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
 at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
 at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
 at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
 at javax.crypto.Cipher.getInstance(Cipher.java:638)
 ... 16 more

 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 http://org.apache.spark.scheduler.dagscheduler.org/
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
I am using Spark Streaming where during each micro-batch I output data to
S3 using
saveAsTextFile. Right now each batch of data is put into its own directory
containing
2 objects, _SUCCESS and part-0.

How do I output each batch into a common directory?

Thanks,
Vadim
ᐧ


Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Thanks Sean. I want to load each batch into Redshift. What's the best/most 
efficient way to do that?

Vadim


 On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote:
 
 You can't, since that's how it's designed to work. Batches are saved
 in different files, which are really directories containing
 partitions, as is common in Hadoop. You can move them later, or just
 read them where they are.
 
 On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
 vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output data to S3
 using
 saveAsTextFile. Right now each batch of data is put into its own directory
 containing
 2 objects, _SUCCESS and part-0.
 
 How do I output each batch into a common directory?
 
 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
The reason for this is as follows:

 

1.   You are saving data on HDFS

2.   HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.   Hence each thread of execution in Spark has to write to a separate 
file in HDFS

4.   Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, _SUCCESS and part-0.

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Nope Sir, it is possible - check my reply earlier.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, April 16, 2015 6:35 PM
To: Vadim Bichutskiy
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

You can't, since that's how it's designed to work. Batches are saved in 
different files, which are really directories containing partitions, as is 
common in Hadoop. You can move them later, or just read them where they are.

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com 
wrote:
 I am using Spark Streaming where during each micro-batch I output data 
 to S3 using saveAsTextFile. Right now each batch of data is put into 
 its own directory containing
 2 objects, _SUCCESS and part-0.

 How do I output each batch into a common directory?

 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Basically you need to unbundle the elements of the RDD and then store them 
wherever you want - use foreachPartition and then foreach 
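
A minimal sketch of that pattern (the sink and the record handling are left as
assumptions; they depend on where you want each batch to go):

    // dstream: the DStream produced by the streaming job (an assumption here)
    dstream.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { records =>
        // open one connection or file handle per partition, write, then close;
        // the target (common S3 prefix, Redshift loader, Kafka, ...) is up to you
        records.foreach { record =>
          // write record here
        }
      }
    }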

-Original Message-
From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:39 PM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

Thanks Sean. I want to load each batch into Redshift. What's the best/most 
efficient way to do that?

Vadim


 On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote:
 
 You can't, since that's how it's designed to work. Batches are saved 
 in different files, which are really directories containing 
 partitions, as is common in Hadoop. You can move them later, or just 
 read them where they are.
 
 On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output 
 data to S3 using saveAsTextFile. Right now each batch of data is put 
 into its own directory containing
 2 objects, _SUCCESS and part-0.
 
 How do I output each batch into a common directory?
 
 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-04-16 Thread Sean Owen
Just copy the files? it shouldn't matter that much where they are as
you can find them easily. Or consider somehow sending the batches of
data straight into Redshift? no idea how that is done but I imagine
it's doable.

On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy
vadim.bichuts...@gmail.com wrote:
 Thanks Sean. I want to load each batch into Redshift. What's the best/most 
 efficient way to do that?

 Vadim


 On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote:

 You can't, since that's how it's designed to work. Batches are saved
 in different files, which are really directories containing
 partitions, as is common in Hadoop. You can move them later, or just
 read them where they are.

 On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
 vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output data to S3
 using
 saveAsTextFile. Right now each batch of data is put into its own directory
 containing
 2 objects, _SUCCESS and part-0.

 How do I output each batch into a common directory?

 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Also, to work around the threading models of both Spark and HDFS even further, you 
can publish the data from Spark to a message broker (e.g. Kafka) first, from 
where a predetermined number (from 1 to infinity) of parallel consumers will 
retrieve it and store it in HDFS in one or more finely controlled files and 
directories.

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:45 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

 

Thanks Evo for your detailed explanation.


On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The reason for this is as follows:

 

1.  You are saving data on HDFS

2.  HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.  Hence each thread of execution in Spark has to write to a separate file 
in HDFS

4.  Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, _SUCCESS and part-0.

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  



Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Copy should be doable, but I'm not sure how to specify a prefix for the 
directory while keeping the filename (i.e. part-0) fixed in the copy command.



 On Apr 16, 2015, at 1:51 PM, Sean Owen so...@cloudera.com wrote:
 
 Just copy the files? it shouldn't matter that much where they are as
 you can find them easily. Or consider somehow sending the batches of
 data straight into Redshift? no idea how that is done but I imagine
 it's doable.
 
 On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy
 vadim.bichuts...@gmail.com wrote:
 Thanks Sean. I want to load each batch into Redshift. What's the best/most 
 efficient way to do that?
 
 Vadim
 
 
 On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote:
 
 You can't, since that's how it's designed to work. Batches are saved
 in different files, which are really directories containing
 partitions, as is common in Hadoop. You can move them later, or just
 read them where they are.
 
 On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
 vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output data to 
 S3
 using
 saveAsTextFile. Right now each batch of data is put into its own directory
 containing
 2 objects, _SUCCESS and part-0.
 
 How do I output each batch into a common directory?
 
 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-04-16 Thread Sean Owen
You can't, since that's how it's designed to work. Batches are saved
in different files, which are really directories containing
partitions, as is common in Hadoop. You can move them later, or just
read them where they are.
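
For example, the per-batch directories can be read back together with a glob
path (the bucket and prefix below are made up for illustration):

    // one RDD over every batch directory written so far
    val allBatches = sc.textFile("s3n://my-bucket/stream-output/*/part-*")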

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output data to S3
 using
 saveAsTextFile. Right now each batch of data is put into its own directory
 containing
 2 objects, _SUCCESS and part-0.

 How do I output each batch into a common directory?

 Thanks,
 Vadim
 ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Thanks Evo for your detailed explanation.

 On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 
 The reason for this is as follows:
  
 1.   You are saving data on HDFS
 2.   HDFS as a cluster/server side Service has a Single Writer / Multiple 
 Reader multithreading model
 3.   Hence each thread of execution in Spark has to write to a separate 
 file in HDFS
 4.   Moreover the RDDs are partitioned across cluster nodes and operated 
 upon by multiple threads there and on top of that in Spark Streaming you have 
 many micro-batch RDDs streaming in all the time as part of a DStream  
  
 If you want fine / detailed management of the writing to HDFS you can 
 implement your own HDFS adapter and invoke it in forEachRDD and foreach
  
 Regards
 Evo Eftimov  
  
 From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
 Sent: Thursday, April 16, 2015 6:33 PM
 To: user@spark.apache.org
 Subject: saveAsTextFile
  
 I am using Spark Streaming where during each micro-batch I output data to S3 
 using
 saveAsTextFile. Right now each batch of data is put into its own directory 
 containing
 2 objects, _SUCCESS and part-0.
  
 How do I output each batch into a common directory?
  
 Thanks,
 Vadim
 ᐧ


Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-15 Thread Manoj Samel
Env - Spark 1.3, Hadoop 2.3, Kerberos

 xx.saveAsTextFile(path, codec) gives following trace. Same works with
Spark 1.2 in same environment

val codec = classOf[some codec class]

val a = sc.textFile("/some_hdfs_file")

a.saveAsTextFile("/some_other_hdfs_file", codec) fails with following trace
in Spark 1.3, works in Spark 1.2 in same env
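
For reference, the codec argument is the class object of a Hadoop
CompressionCodec; with a standard codec (not the one failing in this report)
the call looks like this:

    import org.apache.hadoop.io.compress.GzipCodec

    val a = sc.textFile("/some_hdfs_file")
    a.saveAsTextFile("/some_other_hdfs_file", classOf[GzipCodec])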

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0
(TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
 some codec calls 
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException:
file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
http://org.apache.spark.scheduler.dagscheduler.org/
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: Spark permission denied error when invoking saveAsTextFile

2015-04-01 Thread Kannan Rajah
Ignore the question. There was a Hadoop setting that needed to be set to
get it working.


--
Kannan

On Wed, Apr 1, 2015 at 1:37 PM, Kannan Rajah kra...@maprtech.com wrote:

 Running a simple word count job in standalone mode as a non root user from
 spark-shell. The spark master, worker services are running as root user.

 The problem is the _temporary under /user/krajah/output2/_temporary/0 dir
 is being created with root permission even when running the job as non root
 user - krajah in this case. The higher level directories are getting
 created with right permission though. There was a similar question posted
 long time back, but there is no answer:
 http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E


 *Wrong permission for child directory*
 drwxr-xr-x   - root   root0 2015-04-01 11:20
 /user/krajah/output2/_temporary/0/_temporary


 *Right permission for parent directories*
 hadoop fs -ls -R /user/krajah/my_output
 drwxr-xr-x   - krajah krajah  1 2015-04-01 11:46
 /user/krajah/my_output/_temporary
 drwxr-xr-x   - krajah krajah  3 2015-04-01 11:46
 /user/krajah/my_output/_temporary/0

 *Job and Stacktrace*

 scala> val file = sc.textFile("/user/krajah/junk.txt")
 scala> val counts = file.flatMap(line => line.split(" "))
 scala>   .map(word => (word, 1))
 scala>   .reduceByKey(_ + _)

 scala> counts.saveAsTextFile("/user/krajah/count2")
 java.io.IOException: Error: Permission denied
 at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853)
 at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199)
 at $iwC$$iwC$$iwC$$iwC.init(console:17)
 at $iwC$$iwC$$iwC.init(console:22)
 at $iwC$$iwC.init(console:24)
 at $iwC.init(console:26)
 at init(console:28)
 at .init(console:32)
 at .clinit(console)
 at .init(console:7)
 at .clinit(console)
 at $print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


 --
 Kannan



Spark permission denied error when invoking saveAsTextFile

2015-04-01 Thread Kannan Rajah
Running a simple word count job in standalone mode as a non root user from
spark-shell. The spark master, worker services are running as root user.

The problem is the _temporary under /user/krajah/output2/_temporary/0 dir
is being created with root permission even when running the job as non root
user - krajah in this case. The higher level directories are getting
created with right permission though. There was a similar question posted
long time back, but there is no answer:
http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E


*Wrong permission for child directory*
drwxr-xr-x   - root   root0 2015-04-01 11:20
/user/krajah/output2/_temporary/0/_temporary


*Right permission for parent directories*
hadoop fs -ls -R /user/krajah/my_output
drwxr-xr-x   - krajah krajah  1 2015-04-01 11:46
/user/krajah/my_output/_temporary
drwxr-xr-x   - krajah krajah  3 2015-04-01 11:46
/user/krajah/my_output/_temporary/0

*Job and Stacktrace*

scala> val file = sc.textFile("/user/krajah/junk.txt")
scala> val counts = file.flatMap(line => line.split(" "))
scala>   .map(word => (word, 1))
scala>   .reduceByKey(_ + _)

scala> counts.saveAsTextFile("/user/krajah/count2")
java.io.IOException: Error: Permission denied
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at
org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at
org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199)
at $iwC$$iwC$$iwC$$iwC.init(console:17)
at $iwC$$iwC$$iwC.init(console:22)
at $iwC$$iwC.init(console:24)
at $iwC.init(console:26)
at init(console:28)
at .init(console:32)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


--
Kannan


Pyspark saveAsTextFile exceptions

2015-03-13 Thread Madabhattula Rajesh Kumar
Hi Team,

I'm getting the below exception when saving the results into Hadoop.


*Code :*
rdd.saveAsTextFile("hdfs://localhost:9000/home/rajesh/data/result.rdd")

Could you please help me how to resolve this issue.

15/03/13 17:19:31 INFO spark.SparkContext: Starting job: saveAsTextFile at
NativeMethodAccessorImpl.java:-2
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Got job 6 (saveAsTextFile at
NativeMethodAccessorImpl.java:-2) with 4 output partitions
(allowLocal=false)
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Final stage: Stage
10(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting Stage 10
(MappedRDD[31] at saveAsTextFile at NativeMethodAccessorImpl.java:-2),
which has no missing parents
15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(98240) called
with curMem=203866, maxMem=280248975
15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9 stored as
values in memory (estimated size 95.9 KB, free 267.0 MB)
15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(59150) called
with curMem=302106, maxMem=280248975
15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9_piece0 stored
as bytes in memory (estimated size 57.8 KB, free 266.9 MB)
15/03/13 17:19:31 INFO storage.BlockManagerInfo: Added broadcast_9_piece0
in memory on localhost:57655 (size: 57.8 KB, free: 267.2 MB)
15/03/13 17:19:31 INFO storage.BlockManagerMaster: Updated info of block
broadcast_9_piece0
15/03/13 17:19:31 INFO spark.SparkContext: Created broadcast 9 from
broadcast at DAGScheduler.scala:838
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 10 (MappedRDD[31] at saveAsTextFile at
NativeMethodAccessorImpl.java:-2)
15/03/13 17:19:31 INFO scheduler.TaskSchedulerImpl: Adding task set 10.0
with 4 tasks
15/03/13 17:19:31 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
10.0 (TID 8, localhost, PROCESS_LOCAL, 1375 bytes)
15/03/13 17:19:31 INFO executor.Executor: Running task 0.0 in stage 10.0
(TID 8)
15/03/13 17:19:31 INFO executor.Executor: Fetching
http://10.0.2.15:54815/files/sftordd_pickle with timestamp 1426247370763
15/03/13 17:19:31 INFO util.Utils: Fetching
http://10.0.2.15:54815/files/sftordd_pickle to
/tmp/fetchFileTemp7846328782039551224.tmp
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.dir is
deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.key.class
is deprecated. Instead, use mapreduce.job.output.key.class
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.value.class
is deprecated. Instead, use mapreduce.job.output.value.class
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.working.dir is
deprecated. Instead, use mapreduce.job.working.dir
terminate called after throwing an instance of 'std::invalid_argument'
  what():  stoi
15/03/13 17:19:31 ERROR python.PythonRDD: Python worker exited unexpectedly
(crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File /home/rajesh/spark-1.2.0/python/pyspark/worker.py, line 90, in main
command = pickleSer._read_with_length(infile)
  File /home/rajesh/spark-1.2.0/python/pyspark/serializers.py, line 145,
in _read_with_length
length = read_int(stream)
  File /home/rajesh/spark-1.2.0/python/pyspark/serializers.py, line 511,
in read_int
raise EOFError
EOFError

at
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Subprocess exited with status 134
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:161)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327

Re: saveAsTextFile extremely slow near finish

2015-03-11 Thread Imran Rashid
Is your data skewed? Could it be that there are a few keys with a huge
number of records?  You might consider outputting
(recordA, count)
(recordB, count)

instead of

recordA
recordA
recordA
...


you could do this with:

input = sc.textFile
pairCounts = input.map{x => (x, 1)}.reduceByKey{_ + _}
sorted = pairCounts.sortByKey
sorted.saveAsTextFile


On Mon, Mar 9, 2015 at 12:31 PM, mingweili0x m...@spokeo.com wrote:

 I'm basically running a sorting using spark. The spark program will read
 from
 HDFS, sort on composite keys, and then save the partitioned result back to
 HDFS.
 pseudo code is like this:

 input = sc.textFile
 pairs = input.mapToPair
 sorted = pairs.sortByKey
 values = sorted.values
 values.saveAsTextFile

  Input size is ~ 160G, and I made 1000 partitions specified in
 JavaSparkContext.textFile and JavaPairRDD.sortByKey. From WebUI, the job is
 splitted into two stages: saveAsTextFile and mapToPair. MapToPair finished
 in 8 mins. While saveAsTextFile took ~15mins to reach (2366/2373) progress
 and the last few jobs just took forever and never finishes.

 Cluster setup:
 8 nodes
 on each node: 15gb memory, 8 cores

 running parameters:
 --executor-memory 12G
 --conf spark.cores.max=60

 Thank you for any help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: saveAsTextFile extremely slow near finish

2015-03-10 Thread Akhil Das
Don't you think 1000 partitions is too few for 160 GB of data? Also, you could try
using KryoSerializer and enabling RDD compression.
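
A sketch of those two settings (only the configuration keys being suggested,
nothing tuned for this particular job):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")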

Thanks
Best Regards

On Mon, Mar 9, 2015 at 11:01 PM, mingweili0x m...@spokeo.com wrote:

 I'm basically running a sorting using spark. The spark program will read
 from
 HDFS, sort on composite keys, and then save the partitioned result back to
 HDFS.
 pseudo code is like this:

 input = sc.textFile
 pairs = input.mapToPair
 sorted = pairs.sortByKey
 values = sorted.values
 values.saveAsTextFile

  Input size is ~ 160G, and I made 1000 partitions specified in
 JavaSparkContext.textFile and JavaPairRDD.sortByKey. From WebUI, the job is
 splitted into two stages: saveAsTextFile and mapToPair. MapToPair finished
 in 8 mins. While saveAsTextFile took ~15mins to reach (2366/2373) progress
 and the last few jobs just took forever and never finishes.

 Cluster setup:
 8 nodes
 on each node: 15gb memory, 8 cores

 running parameters:
 --executor-memory 12G
 --conf spark.cores.max=60

 Thank you for any help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: saveAsTextFile extremely slow near finish

2015-03-10 Thread Sean Owen
This is more of an aside, but why repartition this data instead of letting
it define partitions naturally? You will end up with a similar number.
On Mar 9, 2015 5:32 PM, mingweili0x m...@spokeo.com wrote:

 I'm basically running a sorting using spark. The spark program will read
 from
 HDFS, sort on composite keys, and then save the partitioned result back to
 HDFS.
 pseudo code is like this:

 input = sc.textFile
 pairs = input.mapToPair
 sorted = pairs.sortByKey
 values = sorted.values
 values.saveAsTextFile

  Input size is ~ 160G, and I made 1000 partitions specified in
 JavaSparkContext.textFile and JavaPairRDD.sortByKey. From WebUI, the job is
 splitted into two stages: saveAsTextFile and mapToPair. MapToPair finished
 in 8 mins. While saveAsTextFile took ~15mins to reach (2366/2373) progress
 and the last few jobs just took forever and never finishes.

 Cluster setup:
 8 nodes
 on each node: 15gb memory, 8 cores

 running parameters:
 --executor-memory 12G
 --conf spark.cores.max=60

 Thank you for any help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




saveAsTextFile extremely slow near finish

2015-03-09 Thread mingweili0x
I'm basically running a sorting using spark. The spark program will read from
HDFS, sort on composite keys, and then save the partitioned result back to
HDFS.
pseudo code is like this:

input = sc.textFile
pairs = input.mapToPair
sorted = pairs.sortByKey
values = sorted.values
values.saveAsTextFile

 Input size is ~ 160G, and I made 1000 partitions specified in
JavaSparkContext.textFile and JavaPairRDD.sortByKey. From WebUI, the job is
split into two stages: saveAsTextFile and mapToPair. MapToPair finished
in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress
and the last few tasks just took forever and never finished. 

Cluster setup:
8 nodes
on each node: 15gb memory, 8 cores

running parameters:
--executor-memory 12G
--conf spark.cores.max=60

Thank you for any help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile of RDD[Array[Any]]

2015-02-09 Thread Jong Wook Kim
If you have `RDD[Array[Any]]` you can do

rdd.map(_.mkString("\t"))

or with some other delimiter to make it `RDD[String]`, and then call
`saveAsTextFile`.
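
For instance (the values and the output path are hypothetical):

    val rows = sc.parallelize(Seq(Array[Any](1, "a", 2.5), Array[Any](2, "b", 3.5)))
    rows.map(_.mkString("\t")).saveAsTextFile("/tmp/rows_out")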




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-of-RDD-Array-Any-tp21548p21554.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Aaron Davidson
Upon completion of the 2 hour part of the run, the files did not exist in
the output directory? One thing that is done serially is deleting any
remaining files from _temporary, so perhaps there was a lot of data
remaining in _temporary but the committed data had already been moved.

I am, unfortunately, not aware of other issues that would cause this to be
so slow.

On Tue, Jan 27, 2015 at 6:54 PM, Josh Walton j...@openbookben.com wrote:

 I'm not sure how to confirm how the moving is happening, however, one of
 the jobs just completed that I was talking about with 9k files of 4mb each.
 Spark UI showed the job being complete after ~2 hours. The last four hours
 of the job was just moving the files from _temporary to their final
 destination. The tasks for the write were definitely shown as complete, no
 logging is happening on the master or workers. The last line of my java
 code logs, but the job sits there as the moving of files happens.

 On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 This renaming from _temporary to the final location is actually done by
 executors, in parallel, for saveAsTextFile. It should be performed by each
 task individually before it returns.

 I have seen an issue similar to what you mention dealing with Hive code
 which did the renaming serially on the driver, which is very slow for S3
 (and possibly Google Storage as well), as it actually copies the data
 rather than doing a metadata-only operation during rename. However, this
 should not be an issue in this case.

 Could you confirm how the moving is happening -- i.e., on the executors
 or the driver?

 On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote:

 We are running spark in Google Compute Engine using their One-Click
 Deploy.
 By doing so, we get their Google Cloud Storage connector for hadoop for
 free
 meaning we can specify gs:// paths for input and output.

 We have jobs that take a couple of hours, end up with ~9k partitions
 which
 means 9k output files. After the job is complete it then moves the
 output
 files from our $output_path/_temporary to $output_path. That process can
 take longer than the job itself depending on the circumstances. The job I
 mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
 files in 1.5 hours from _temporary to the final destination.

 Is there a solution to this besides reducing the number of partitions?
 Anyone else run into similar issues elsewhere? I don't remember this
 being
 an issue with Map Reduce jobs and hadoop, however, I probably wasn't
 tracking the transfer of the output files like I am with Spark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Thomas Demoor
TL;DR: extend FileOutputCommitter to eliminate the temporary storage. There
are some implementations to be found online, typically called
DirectOutputCommitter, e.g. this Spark pull request
https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c.
Tell Spark to use your shiny new committer when writing to an object store
and all will be well.

Aaron is on the right track but this renaming is bottlenecked by the object
storage system itself, irrespective of being executed in the driver or the
executor. Object stores (s3, google, azure, amplidata :P...) do not have a
native rename (it is implemented as a server side copy operation, thus its
duration is proportional to the object size). By default, Hadoop (and thus
also Spark) uses rename from temporary to final output path to enable both
retrying and speculative execution  because HDFS is a single writer system,
so multiple job attempts cannot write to the final output concurrently.
Object stores do allow multiple concurrent-writes to the same output, which
is exactly what makes a native rename nigh impossible. The solution is to
enable this concurrent writing instead of renaming to final output by using
a custom OutputCommitter which does not use a temporary location.
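
A bare-bones sketch of such a committer for the old mapred API (based on the
idea described above, not the exact code in the linked pull request):

    import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

    class DirectOutputCommitter extends OutputCommitter {
      // task output is written straight to the final path, so commit/abort are no-ops
      override def setupJob(jobContext: JobContext): Unit = {}
      override def setupTask(taskContext: TaskAttemptContext): Unit = {}
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = {}
      override def abortTask(taskContext: TaskAttemptContext): Unit = {}
    }

    // then point old-API Hadoop output at it, e.g.
    // sc.hadoopConfiguration.set("mapred.output.committer.class",
    //   classOf[DirectOutputCommitter].getName)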

Thomas Demoor
skype: demoor.thomas
mobile: +32 497883833

On Wed, Jan 28, 2015 at 3:54 AM, Josh Walton j...@openbookben.com wrote:

 I'm not sure how to confirm how the moving is happening, however, one of
 the jobs just completed that I was talking about with 9k files of 4mb each.
 Spark UI showed the job being complete after ~2 hours. The last four hours
 of the job was just moving the files from _temporary to their final
 destination. The tasks for the write were definitely shown as complete, no
 logging is happening on the master or workers. The last line of my java
 code logs, but the job sits there as the moving of files happens.

 On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 This renaming from _temporary to the final location is actually done by
 executors, in parallel, for saveAsTextFile. It should be performed by each
 task individually before it returns.

 I have seen an issue similar to what you mention dealing with Hive code
 which did the renaming serially on the driver, which is very slow for S3
 (and possibly Google Storage as well), as it actually copies the data
 rather than doing a metadata-only operation during rename. However, this
 should not be an issue in this case.

 Could you confirm how the moving is happening -- i.e., on the executors
 or the driver?

 On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote:

 We are running spark in Google Compute Engine using their One-Click
 Deploy.
 By doing so, we get their Google Cloud Storage connector for hadoop for
 free
 meaning we can specify gs:// paths for input and output.

 We have jobs that take a couple of hours, end up with ~9k partitions
 which
 means 9k output files. After the job is complete it then moves the
 output
 files from our $output_path/_temporary to $output_path. That process can
 take longer than the job itself depending on the circumstances. The job I
 mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
 files in 1.5 hours from _temporary to the final destination.

 Is there a solution to this besides reducing the number of partitions?
 Anyone else run into similar issues elsewhere? I don't remember this
 being
 an issue with Map Reduce jobs and hadoop, however, I probably wasn't
 tracking the transfer of the output files like I am with Spark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






performance of saveAsTextFile moving files from _temporary

2015-01-27 Thread jwalton
We are running spark in Google Compute Engine using their One-Click Deploy.
By doing so, we get their Google Cloud Storage connector for hadoop for free
meaning we can specify gs:// paths for input and output.

We have jobs that take a couple of hours, end up with ~9k partitions which
means 9k output files. After the job is complete it then moves the output
files from our $output_path/_temporary to $output_path. That process can
take longer than the job itself depending on the circumstances. The job I
mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
files in 1.5 hours from _temporary to the final destination.

Is there a solution to this besides reducing the number of partitions?
Anyone else run into similar issues elsewhere? I don't remember this being
an issue with Map Reduce jobs and hadoop, however, I probably wasn't
tracking the transfer of the output files like I am with Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: performance of saveAsTextFile moving files from _temporary

2015-01-27 Thread Aaron Davidson
This renaming from _temporary to the final location is actually done by
executors, in parallel, for saveAsTextFile. It should be performed by each
task individually before it returns.

I have seen an issue similar to what you mention dealing with Hive code
which did the renaming serially on the driver, which is very slow for S3
(and possibly Google Storage as well), as it actually copies the data
rather than doing a metadata-only operation during rename. However, this
should not be an issue in this case.

Could you confirm how the moving is happening -- i.e., on the executors or
the driver?

On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote:

 We are running spark in Google Compute Engine using their One-Click Deploy.
 By doing so, we get their Google Cloud Storage connector for hadoop for
 free
 meaning we can specify gs:// paths for input and output.

 We have jobs that take a couple of hours, end up with ~9k partitions which
 means 9k output files. After the job is complete it then moves the output
 files from our $output_path/_temporary to $output_path. That process can
 take longer than the job itself depending on the circumstances. The job I
 mentioned previously outputs ~4mb files, and so far has copied 1/3 of the
 files in 1.5 hours from _temporary to the final destination.

 Is there a solution to this besides reducing the number of partitions?
 Anyone else run into similar issues elsewhere? I don't remember this being
 an issue with Map Reduce jobs and hadoop, however, I probably wasn't
 tracking the transfer of the output files like I am with Spark.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




  1   2   3   >