[Spark Core] saveAsTextFile is unable to rename a directory using hadoop-azure NativeAzureFileSystem
Hello,

I am trying to use the Spark rdd.saveAsTextFile function, which calls FileSystem.rename() under the hood. This errors out with "com.microsoft.azure.storage.StorageException: One of the request inputs is not valid" when using the hadoop-azure NativeAzureFileSystem. I have written a small Scala test program that renames a directory in Azure Blob Storage and replicates this issue. Here is my code:

import java.net.URI
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.control.NonFatal

/**
 * A utility to test renaming a hadoop-azure path.
 */
object AzureRenameTester {

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      throw new IllegalArgumentException("The Azure Blob storage key must be provided!")
    }

    val key = args.head
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs")
    hadoopConfig.set("fs.azure.account.key..blob.core.windows.net", key)

    val input = new URI("wasbs://@.blob.core.windows.net/testing")
    val inputPath = new Path(input)
    val output = new URI("wasbs://@.blob.core.windows.net/testingRenamed")
    val outputPath = new Path(output)
    val hadoopFs = FileSystem.get(input, hadoopConfig)

    try {
      println(s"Renaming from $inputPath to $outputPath")
      hadoopFs.rename(inputPath, outputPath)
    } catch {
      case NonFatal(ex) =>
        println(s"${ExceptionUtils.getMessage(ex)}")
        println(s"${ExceptionUtils.getRootCause(ex)}")
        throw ex
    }
  }
}

This code leads to the following error:

[error] Exception in thread "main" org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2849)
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2721)
[error]   at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:460)
[error]   at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:3277)
[error]   at com.qf.util.hdfs.AzureRenameTester$.main(AzureRenameTester.scala:40)
[error]   at com.qf.util.hdfs.AzureRenameTester.main(AzureRenameTester.scala)
[error] Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
[error]   at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
[error]   at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
[error]   at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
[error]   at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
[error]   at com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
[error]   at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
[error]   at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2788)
[error]   ... 5 more

I am currently using spark-core-3.1.1.jar with hadoop-azure-3.2.2.jar, but the same issue also occurs with hadoop-azure-3.3.1.jar. Please advise how I should solve this issue.

Thanks,
Abhishek
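As a possible interim workaround (a sketch only, reusing hadoopFs, inputPath, outputPath, and hadoopConfig from the test program above), the failing server-side rename can be replaced with a client-side copy followed by a delete of the source. This streams the bytes through the client, so it is slower than a real rename, but it avoids the startCopyFromBlob call that raises the StorageException:

import org.apache.hadoop.fs.FileUtil

// copy the directory contents recursively, then delete the source (deleteSource = true)
val copied = FileUtil.copy(hadoopFs, inputPath, hadoopFs, outputPath, true, hadoopConfig)
println(s"Copy-and-delete succeeded: $copied")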
RE: Spark saveAsTextFile Disk Recommendation
Hi Attila,

I will check why INVALID is getting appended in the mailing address.

*What is your use case here?*
[Ranju]: The client driver application is not using collect but internally calling a Python script, which reads the part-file records [comma-separated strings] of each node separately and copies the records into another, final CSV file, thus merging all part-file data into a single CSV file. This script runs on every node, and later the outputs are all combined into a single file.

*On the other hand is your data really just a collection of strings without any repetitions?*
[Ranju]: Yes, it is a comma-separated string. And I just checked the 2nd argument of saveAsTextFile, and I believe read and write will be faster on disk after use of compression. I will try this.

So I think there is no special requirement on the type of disk for the execution of saveAsTextFile, as these are local I/O operations.

Regards
Ranju

Hi!

I would like to reflect only on the first part of your mail:

> I have a large RDD dataset of around 60-70 GB which I cannot send to the driver using *collect*, so I first write it to disk using *saveAsTextFile*; this data gets saved in the form of multiple part files on each node of the cluster, and after that the driver reads the data from that storage.

What is your use case here? As you mention *collect()*, I can assume you have to process the data outside of Spark, maybe with a 3rd-party tool, isn't it? If you have 60-70 GB of data and you write it to a text file, then read it back within the same application, you still cannot call *collect()* on it, as it is still 60-70 GB of data, right?

On the other hand, is your data really just a collection of strings without any repetitions? I ask this because of the file format you are using: text file. Even for a text file, you can at least pass a compression codec as the 2nd argument of *saveAsTextFile()* <https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit> (when you use this link you might need to scroll up a little bit; at least my Chrome displays the *saveAsTextFile* method without the 2nd codec arg). As IO is slow, compressed data can be read back quicker, since there is less data on the disk. Check the Snappy <https://en.wikipedia.org/wiki/Snappy_(compression)> codec for example.

But if there is structure in your data and you plan to process this data further within Spark, then please consider something way better: a columnar storage format, namely ORC or Parquet.

Best Regards,
Attila

*From:* Ranju Jain
*Sent:* Sunday, March 21, 2021 8:10 AM
*To:* user@spark.apache.org
*Subject:* Spark saveAsTextFile Disk Recommendation

Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to the driver using *collect*, so I first write it to disk using *saveAsTextFile*; this data gets saved in the form of multiple part files on each node of the cluster, and after that the driver reads the data from that storage.

I have a question: *spark.local.dir* is the directory used as scratch space, where map output files and RDDs that Spark needs to write for shuffle operations etc. are stored. And there it is strongly recommended to use a *local and fast disk* to avoid any failure or performance impact.

*Do we have any such recommendation for storing multiple part files of a large dataset [or big RDD] on fast disk?*

This will help me configure the right type of disk for the resulting part files.

Regards
Ranju
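For reference, a minimal sketch of the compressed write discussed above (the rdd and the output path are placeholders; GzipCodec ships with Hadoop, while SnappyCodec additionally requires the native Snappy libraries on every node):

import org.apache.hadoop.io.compress.GzipCodec

// same saveAsTextFile call, but the part files come out gzip-compressed
rdd.saveAsTextFile("hdfs:///path/outputCompressed", classOf[GzipCodec])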
Re: Spark saveAsTextFile Disk Recommendation
Hi!

I would like to reflect only on the first part of your mail:

> I have a large RDD dataset of around 60-70 GB which I cannot send to the driver using *collect*, so I first write it to disk using *saveAsTextFile*; this data gets saved in the form of multiple part files on each node of the cluster, and after that the driver reads the data from that storage.

What is your use case here? As you mention *collect()*, I can assume you have to process the data outside of Spark, maybe with a 3rd-party tool, isn't it? If you have 60-70 GB of data and you write it to a text file, then read it back within the same application, you still cannot call *collect()* on it, as it is still 60-70 GB of data, right?

On the other hand, is your data really just a collection of strings without any repetitions? I ask this because of the file format you are using: text file. Even for a text file, you can at least pass a compression codec as the 2nd argument of *saveAsTextFile()* <https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit> (when you use this link you might need to scroll up a little bit; at least my Chrome displays the *saveAsTextFile* method without the 2nd codec arg). As IO is slow, compressed data can be read back quicker, since there is less data on the disk. Check the Snappy <https://en.wikipedia.org/wiki/Snappy_(compression)> codec for example.

But if there is structure in your data and you plan to process this data further within Spark, then please consider something way better: a columnar storage format, namely ORC or Parquet.

Best Regards,
Attila

On Sun, Mar 21, 2021 at 3:40 AM Ranju Jain wrote:

> Hi All,
>
> I have a large RDD dataset of around 60-70 GB which I cannot send to the driver using *collect*, so I first write it to disk using *saveAsTextFile*; this data gets saved in the form of multiple part files on each node of the cluster, and after that the driver reads the data from that storage.
>
> I have a question: *spark.local.dir* is the directory used as scratch space, where map output files and RDDs that Spark needs to write for shuffle operations etc. are stored. And there it is strongly recommended to use a *local and fast disk* to avoid any failure or performance impact.
>
> *Do we have any such recommendation for storing multiple part files of a large dataset [or big RDD] on fast disk?*
>
> This will help me configure the right type of disk for the resulting part files.
>
> Regards
>
> Ranju
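To make the columnar suggestion concrete, a hedged sketch (the case class, the SparkSession named spark, the comma layout, and the output path are illustrative assumptions, not details from this thread):

case class Record(id: Long, payload: String)

import spark.implicits._
// parse each comma-separated line into a typed row, then write columnar, compressed Parquet
val df = rdd.map { line =>
  val Array(id, payload) = line.split(",", 2)
  Record(id.toLong, payload)
}.toDF()
df.write.parquet("hdfs:///path/records_parquet")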
Spark saveAsTextFile Disk Recommendation
Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to the driver using collect, so I first write it to disk using saveAsTextFile; this data gets saved in the form of multiple part files on each node of the cluster, and after that the driver reads the data from that storage.

I have a question: spark.local.dir is the directory used as scratch space, where map output files and RDDs that Spark needs to write for shuffle operations etc. are stored. And there it is strongly recommended to use a local and fast disk to avoid any failure or performance impact.

Do we have any such recommendation for storing multiple part files of a large dataset [or big RDD] on fast disk?

This will help me configure the right type of disk for the resulting part files.

Regards
Ranju
Re: How to improve performance of saveAsTextFile()
How about increasing the RDD's partitions / rebalancing the data?

On Sat, Mar 11, 2017 at 2:33 PM, Parsian, Mahmoud wrote:

> How to improve performance of JavaRDD.saveAsTextFile("hdfs://…")? This is taking over 30 minutes on a cluster of 10 nodes, running Spark on YARN. The JavaRDD has 120 million entries.
>
> Thank you,
> Best regards,
> Mahmoud
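A minimal Scala sketch of that suggestion (the partition count is an assumption to tune for the cluster; the point is to give all 10 nodes enough write tasks of reasonable size):

// assuming sc is the SparkContext and rdd holds the 120M entries
val numPartitions = sc.defaultParallelism * 2 // assumption: roughly 2-4x the total executor cores
rdd.repartition(numPartitions).saveAsTextFile("hdfs:///path/output")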
How to improve performance of saveAsTextFile()
How to improve performance of JavaRDD.saveAsTextFile("hdfs://…")? This is taking over 30 minutes on a cluster of 10 nodes, running Spark on YARN. The JavaRDD has 120 million entries.

Thank you,
Best regards,
Mahmoud
Re: saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed
Hi,

Thanks for your reply. Permissions (access) are not an issue in my case, because this issue only happens when the bigger input file is used to generate the model; with smaller input(s) all works well. It seems to me that ".save" cannot save a big file.

Q1: Any idea about the size limit that ".save" can handle?
Q2: Any idea how to check the size of the model that will be saved via ".save"?

Regards

On Thu, Jul 28, 2016 at 4:19 PM, Spico Florin <spicoflo...@gmail.com> wrote:

> Hi!
> There are many reasons why your task could have failed. One could be that you don't have the proper permissions (access) to HDFS with your user. Please check your user's rights to write to HDFS. Please also have a look at:
> http://stackoverflow.com/questions/27427042/spark-unable-to-save-in-hadoop-permission-denied-for-user
> I hope it helps.
> Florin
>
> On Thu, Jul 28, 2016 at 3:49 AM, Ascot Moss <ascot.m...@gmail.com> wrote:
>
>> Hi,
>>
>> Please help! When saving the model, I got the following error and cannot save the model to HDFS:
>>
>> (my source code; my Spark is v1.6.2)
>> my_model.save(sc, "/my_model")
>>
>> 16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose tasks have all completed, from pool
>> 16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at treeEnsembleModels.scala:447) finished in 0.901 s
>> 16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s
>> Killed
>>
>> Q1: Is there any limitation on saveAsTextFile?
>> Q2: Where can I find the error log file location?
>>
>> Regards
saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed
Hi,

Please help! When saving the model, I got the following error and cannot save the model to HDFS:

(my source code; my Spark is v1.6.2)
my_model.save(sc, "/my_model")

16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose tasks have all completed, from pool
16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at treeEnsembleModels.scala:447) finished in 0.901 s
16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s
Killed

Q1: Is there any limitation on saveAsTextFile?
Q2: Where can I find the error log file location?

Regards
Re: problem about RDD map and then saveAsTextFile
Internally, saveAsTextFile uses saveAsHadoopFile: https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala. The final bit of the method first creates the output path and then saves the data set. However, if there is an issue with the saveAsHadoopDataset call, the path still remains. Technically, we could add an exception-handling section that removes the path in case of problems. I think that would be a nice way of making sure that we don't litter the FS with empty files and directories in case of exceptions.

So, to your question: the parameter to saveAsTextFile is a path (not a file), and it must not already exist. Spark automatically names the files part-N, with N the partition number; this follows immediately from the partitioning scheme of the RDD itself. The real problem is that there is a problem with the calculation. You might want to fix that first. Just post the relevant bits from the log.

> Hi all:
>
> I've tried to execute something as below:
>
> result.map(transform).saveAsTextFile(hdfsAddress)
>
> Result is an RDD calculated from an MLlib algorithm. I submit this to YARN, and after two attempts the application failed. But the exception in the log is very misleading. It said hdfsAddress already exists. Actually, the first attempt's log showed that the exception came from the calculation of result. Though the attempt failed, it created the file. And then attempt 2 began with the exception 'file already exists'. Why was the file created even though the RDD calculation had already failed? That's not so good, I think.
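A sketch of that cleanup idea done at the application level (hdfsAddress, result, and transform come from the question; the try/catch wrapper is an illustrative assumption, not built-in Spark behavior):

import org.apache.hadoop.fs.{FileSystem, Path}

val out = new Path(hdfsAddress)
val fs = FileSystem.get(sc.hadoopConfiguration)
try {
  result.map(transform).saveAsTextFile(hdfsAddress)
} catch {
  case e: Exception =>
    // remove the partially written output so a retry does not hit "file already exists"
    if (fs.exists(out)) fs.delete(out, true) // true = recursive
    throw e
}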
problem about RDD map and then saveAsTextFile
Hi all:

I've tried to execute something as below:

result.map(transform).saveAsTextFile(hdfsAddress)

Result is an RDD calculated from an MLlib algorithm. I submit this to YARN, and after two attempts the application failed. But the exception in the log is very misleading. It said hdfsAddress already exists. Actually, the first attempt's log showed that the exception came from the calculation of result. Though the attempt failed, it created the file. And then attempt 2 began with the exception 'file already exists'. Why was the file created even though the RDD calculation had already failed? That's not so good, I think.
RE: saveAsTextFile is not writing to local fs
If the data is not too big, one option is to call the collect method and then save the result to a local file using the standard Java/Scala API. However, keep in mind that this will transfer data from all the worker nodes to the driver program. It looks like that is what you want to do anyway, but you need to be aware of how big that data is and the related implications.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Monday, February 1, 2016 6:00 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohamed,

Thanks for your response. Data is available on the worker nodes, but I am looking for something to write directly to the local fs. Seems like it is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller <moham...@glassbeam.com> wrote:

You should not be saving an RDD to the local FS if Spark is running on a real cluster. Essentially, each Spark worker will save the partitions that it processes locally. Check the directories on the worker nodes and you should find pieces of your file on each node.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in "yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned on. I see the below exception, but it occurred after the saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
        at java.net.SocketInputStream.read(SocketInputStream.java:190)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
        at org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
        at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
        at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
        at org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
        at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: (the same SocketException and stack trace are logged a second time)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is causing it?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com> wrote:

Is it a multi-node cluster or are you running Spark on a single machine? You can change Spark's logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 201
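For what it's worth, a minimal sketch of the collect-then-write approach suggested above (the rdd and output path are placeholders; this is only safe when the collected data comfortably fits in driver memory):

import java.io.PrintWriter

val lines = rdd.collect() // caution: pulls every record to the driver
val writer = new PrintWriter("/tmp/output.txt")
try lines.foreach(writer.println) finally writer.close()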
RE: saveAsTextFile is not writing to local fs
You should not be saving an RDD to the local FS if Spark is running on a real cluster. Essentially, each Spark worker will save the partitions that it processes locally. Check the directories on the worker nodes and you should find pieces of your file on each node.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in "yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned on. I see the below exception, but it occurred after the saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
        at java.net.SocketInputStream.read(SocketInputStream.java:190)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
        at org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
        at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
        at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
        at org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
        at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: (the same SocketException and stack trace are logged a second time)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is causing it?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com> wrote:

Is it a multi-node cluster or are you running Spark on a single machine? You can change Spark's logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using Spark 1.4.1 and we have a requirement to write data to the local fs instead of HDFS. When trying to save an RDD to the local fs with saveAsTextFile, it just writes a _SUCCESS file in the folder, with no part files, and also no error or warning messages on the console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.
Re: saveAsTextFile is not writing to local fs
Hi Mohamed,

Thanks for your response. Data is available on the worker nodes, but I am looking for something to write directly to the local fs. Seems like it is not an option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller <moham...@glassbeam.com> wrote:

> You should not be saving an RDD to the local FS if Spark is running on a real cluster. Essentially, each Spark worker will save the partitions that it processes locally.
>
> Check the directories on the worker nodes and you should find pieces of your file on each node.
>
> Mohammed
> Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
> From: Siva [mailto:sbhavan...@gmail.com]
> Sent: Friday, January 29, 2016 5:40 PM
> To: Mohammed Guller
> Cc: spark users
> Subject: Re: saveAsTextFile is not writing to local fs
>
> Hi Mohammed,
>
> Thanks for your quick response. I'm submitting the Spark job to YARN in "yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned on. I see the below exception, but it occurred after the saveAsTextFile function finished.
>
> 16/01/29 20:26:57 DEBUG HttpParser:
> java.net.SocketException: Socket closed
>         at java.net.SocketInputStream.read(SocketInputStream.java:190)
>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>         at org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
>         at org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
>         at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
>         at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
>         at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
>         at org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>         at org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
>         at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
>         at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>         at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>         at java.lang.Thread.run(Thread.java:745)
> 16/01/29 20:26:57 DEBUG HttpParser: (the same SocketException and stack trace are logged a second time)
> 16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
> org.spark-project.jetty.io.EofException
>
> Do you think this is causing it?
>
> Thanks,
> Sivakumar Bhavanari.
>
> On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com> wrote:
>
> Is it a multi-node cluster or are you running Spark on a single machine?
>
> You can change Spark's logging level to INFO or DEBUG to see what is going on.
>
> Mohammed
> Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
> From: Siva [mailto:sbhavan...@gmail.com]
> Sent: Friday, January 29, 2016 3:38 PM
> To: spark users
> Subject: saveAsTextFile is not writing to local fs
>
> Hi Everyone,
>
> We are using Spark 1.4.1 and we have a requirement to write data to the local fs instead of HDFS.
>
> When trying to save an RDD to the local fs with saveAsTextFile, it just writes a _SUCCESS file in the folder, with no part files, and also no error or warning messages on the console.
>
> Is there any place to look at to fix this problem?
>
> Thanks,
> Sivakumar Bhavanari.
saveAsTextFile is not writing to local fs
Hi Everyone,

We are using Spark 1.4.1 and we have a requirement to write data to the local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes a _SUCCESS file in the folder, with no part files, and also no error or warning messages on the console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.
RE: saveAsTextFile is not writing to local fs
Is it a multi-node cluster or are you running Spark on a single machine? You can change Spark's logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using Spark 1.4.1 and we have a requirement to write data to the local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes a _SUCCESS file in the folder, with no part files, and also no error or warning messages on the console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.
Re: saveAsTextFile is not writing to local fs
Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in "yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned on. I see the below exception, but it occurred after the saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
        at java.net.SocketInputStream.read(SocketInputStream.java:190)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
        at org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
        at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
        at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
        at org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
        at org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
        at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: (the same SocketException and stack trace are logged a second time)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is causing it?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller <moham...@glassbeam.com> wrote:

> Is it a multi-node cluster or are you running Spark on a single machine?
>
> You can change Spark's logging level to INFO or DEBUG to see what is going on.
>
> Mohammed
> Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
> From: Siva [mailto:sbhavan...@gmail.com]
> Sent: Friday, January 29, 2016 3:38 PM
> To: spark users
> Subject: saveAsTextFile is not writing to local fs
>
> Hi Everyone,
>
> We are using Spark 1.4.1 and we have a requirement to write data to the local fs instead of HDFS.
>
> When trying to save an RDD to the local fs with saveAsTextFile, it just writes a _SUCCESS file in the folder, with no part files, and also no error or warning messages on the console.
>
> Is there any place to look at to fix this problem?
>
> Thanks,
> Sivakumar Bhavanari.
Re: coalesce(1).saveAsTextfile() takes forever?
Another option would be to try rdd.toLocalIterator(); not sure if it will help, though. I had the same problem and ended up moving all the parts to local disk (with the Hadoop FileSystem API) and then processing them locally.

On 5 January 2016 at 22:08, Alexander Pivovarov <apivova...@gmail.com> wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi, I am trying to save many partitions of a DataFrame into one CSV file, and it takes forever for large data sets of around 5-6 GB.
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data the above code works well, but for large data it hangs forever and does not move on, because only one partition has to shuffle data of GBs. Please help me.
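A rough sketch of the toLocalIterator option (the output path is a placeholder; unlike collect, this streams one partition at a time, so the driver only needs memory for a single partition):

import java.io.PrintWriter

val writer = new PrintWriter("/tmp/merged.csv")
try rdd.toLocalIterator.foreach(writer.println) finally writer.close()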
coalesce(1).saveAsTextfile() takes forever?
Hi, I am trying to save many partitions of a DataFrame into one CSV file, and it takes forever for large data sets of around 5-6 GB.

sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")

For small data the above code works well, but for large data it hangs forever and does not move on, because only one partition has to shuffle data of GBs. Please help me.
Re: coalesce(1).saveAsTextfile() takes forever?
try coalesce(1, true).

On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:

> Hi, I am trying to save many partitions of a DataFrame into one CSV file, and it takes forever for large data sets of around 5-6 GB.
>
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>
> For small data the above code works well, but for large data it hangs forever and does not move on, because only one partition has to shuffle data of GBs. Please help me.
Re: coalesce(1).saveAsTextfile() takes forever?
Hi, DataFrame has no boolean option for coalesce; that overload exists only on RDD, I believe.

sourceFrame.coalesce(1, true) // gives a compilation error

On Wed, Jan 6, 2016 at 1:38 AM, Alexander Pivovarov <apivova...@gmail.com> wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi, I am trying to save many partitions of a DataFrame into one CSV file, and it takes forever for large data sets of around 5-6 GB.
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data the above code works well, but for large data it hangs forever and does not move on, because only one partition has to shuffle data of GBs. Please help me.
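A hedged alternative on the DataFrame API (repartition(1) inserts a shuffle, so the upstream stages still run in parallel and only the final write is single-task; the "codec" option name is my assumption for the spark-csv package):

// repartition(1) shuffles into a single partition, unlike coalesce(1),
// which can collapse the whole upstream pipeline into one task
sourceFrame.repartition(1)
  .write
  .format("com.databricks.spark.csv")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") // assumed option name
  .save("/path/hadoop")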
Re: coalesce(1).saveAsTextfile() takes forever?
Hi Unk1102,

I also had trouble when I used coalesce(); repartition() worked much better. Keep in mind that if you have a large number of partitions you are probably going to have high communication costs.

Also, my code works a lot better on 1.6.0. DataFrame memory could not be spilled in 1.5.2; in 1.6.0 unpersist() actually frees up memory. Another strange thing I noticed in 1.5.1 was that I had thousands of partitions, many of them empty. Having lots of empty partitions really slowed things down.

Andy

From: unk1102 <umesh.ka...@gmail.com>
Date: Tuesday, January 5, 2016 at 11:58 AM
To: "user @spark" <user@spark.apache.org>
Subject: coalesce(1).saveAsTextfile() takes forever?

> Hi, I am trying to save many partitions of a DataFrame into one CSV file, and it takes forever for large data sets of around 5-6 GB.
>
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>
> For small data the above code works well, but for large data it hangs forever and does not move on, because only one partition has to shuffle data of GBs. Please help me.
Re: Improve saveAsTextFile performance
>If you are doing a join/groupBy kind of operations then you need to make sure the keys are evenly distributed throughout the partitions.

Yes, I am doing join/groupBy operations. Can you point me to docs on how to do this? Spark 1.5.2.

First attempt (Aggregated Metrics by Executor):

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
32 | rc-spark-poc-w-3.c.dailymotion-data.internal:51748 | 1.2 h | 18 | 0 | 18 | 4.4 MB / 167812 | 51.5 GB / 1287131 | 53.1 GB | 51.1 GB

Second attempt (Aggregated Metrics by Executor):

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records
5 | rc-spark-poc-w-1.c.dailymotion-data.internal:41061 | 47 min | 8 | 0 | 8 | 3.9 MB / 95334

Best Regards,
Ram

From: Akhil Das <ak...@sigmoidanalytics.com>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

Which version of Spark are you using? Can you look at the event timeline and the DAG of the job and see where it's spending more time? .save simply triggers your entire pipeline. If you are doing a join/groupBy kind of operation then you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

That didn't work :( Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: Ted Yu <yuzhih...@gmail.com>, user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
Re: Improve saveAsTextFile performance
I tried partitionBy with a HashPartitioner; still the same issue.

groupBy operation: https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L51
Join operation: https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L80

Best Regards,
Ram

Date: Saturday, December 5, 2015 at 7:18 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

>If you are doing a join/groupBy kind of operations then you need to make sure the keys are evenly distributed throughout the partitions.

Yes, I am doing join/groupBy operations. Can you point me to docs on how to do this? Spark 1.5.2.

First attempt (Aggregated Metrics by Executor):

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
32 | rc-spark-poc-w-3.c.dailymotion-data.internal:51748 | 1.2 h | 18 | 0 | 18 | 4.4 MB / 167812 | 51.5 GB / 1287131 | 53.1 GB | 51.1 GB

Second attempt (Aggregated Metrics by Executor):

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records
5 | rc-spark-poc-w-1.c.dailymotion-data.internal:41061 | 47 min | 8 | 0 | 8 | 3.9 MB / 95334

Best Regards,
Ram

From: Akhil Das <ak...@sigmoidanalytics.com>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

Which version of Spark are you using? Can you look at the event timeline and the DAG of the job and see where it's spending more time? .save simply triggers your entire pipeline. If you are doing a join/groupBy kind of operation then you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

That didn't work :( Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: Ted Yu <yuzhih...@gmail.com>, user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
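Since partitionBy alone cannot fix skew caused by a few hot keys, key salting is one standard workaround for associative aggregations; a sketch (SALT, pairRdd, and the sum combiner are illustrative assumptions, not taken from the gists):

val SALT = 16 // assumption: tune to the observed skew
// round 1: spread each hot key over up to SALT reducers
val partial = pairRdd
  .map { case (k, v) => ((k, scala.util.Random.nextInt(SALT)), v) }
  .reduceByKey(_ + _)
// round 2: strip the salt and combine the partial results
val result = partial
  .map { case ((k, _), v) => (k, v) }
  .reduceByKey(_ + _)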
Re: Improve saveAsTextFile performance
Which version of Spark are you using? Can you look at the event timeline and the DAG of the job and see where it's spending more time? .save simply triggers your entire pipeline. If you are doing a join/groupBy kind of operation then you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

> That didn't work :( Any help? I have documented some steps here:
> http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes
>
> Best Regards,
> Ram
>
> From: Sahil Sareen <sareen...@gmail.com>
> Date: Wednesday, December 2, 2015 at 10:18 PM
> To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
> Cc: Ted Yu <yuzhih...@gmail.com>, user <user@spark.apache.org>
> Subject: Re: Improve saveAsTextFile performance
>
> http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
Re: Improve saveAsTextFile performance
That didn't work :( Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen <sareen...@gmail.com>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: Ted Yu <yuzhih...@gmail.com>, user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
Re: Improve saveAsTextFile performance
PTAL: http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per

-Sahil

On Thu, Dec 3, 2015 at 9:18 AM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

> Yes. That did not help.
>
> Best Regards,
> Ram
>
> From: Ted Yu <yuzhih...@gmail.com>
> Date: Wednesday, December 2, 2015 at 3:25 PM
> To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Improve saveAsTextFile performance
>
> Have you tried calling coalesce() before saveAsTextFile?
>
> Cheers
>
> On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:
>
>> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks; the first 9 complete in a reasonable time, but the last task is taking a long time to complete. The last task contains the maximum number of records, around 90% of the total. Is there any way to parallelize the execution by increasing the number of tasks or evenly distributing the records across tasks?
>>
>> Thanks in advance.
>>
>> Best Regards,
>> Ram
Re: Improve saveAsTextFile performance
Yes. That did not help.

Best Regards,
Ram

From: Ted Yu <yuzhih...@gmail.com>
Date: Wednesday, December 2, 2015 at 3:25 PM
To: Ram VISWANADHA <ram.viswana...@dailymotion.com>
Cc: user <user@spark.apache.org>
Subject: Re: Improve saveAsTextFile performance

Have you tried calling coalesce() before saveAsTextFile?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks; the first 9 complete in a reasonable time, but the last task is taking a long time to complete. The last task contains the maximum number of records, around 90% of the total. Is there any way to parallelize the execution by increasing the number of tasks or evenly distributing the records across tasks?

Thanks in advance.

Best Regards,
Ram
Improve saveAsTextFile performance
JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks; the first 9 complete in a reasonable time, but the last task is taking a long time to complete. The last task contains the maximum number of records, around 90% of the total. Is there any way to parallelize the execution by increasing the number of tasks or evenly distributing the records across tasks?

Thanks in advance.

Best Regards,
Ram
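Before redistributing anything, it can help to confirm the skew by counting records per partition; a small diagnostic sketch (rdd stands in for the Scala view of the JavaRDD):

// emit (partitionIndex, recordCount) pairs and print the heaviest partitions
val sizes = rdd.mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }.collect()
sizes.sortBy(-_._2).take(10).foreach(println)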
Re: Improve saveAsTextFile performance
Have you tried calling coalesce() before saveAsTextFile?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <ram.viswana...@dailymotion.com> wrote:

> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks; the first 9 complete in a reasonable time, but the last task is taking a long time to complete. The last task contains the maximum number of records, around 90% of the total. Is there any way to parallelize the execution by increasing the number of tasks or evenly distributing the records across tasks?
>
> Thanks in advance.
>
> Best Regards,
> Ram
Re: streaming: missing data. does saveAsTextFile() append or replace?
Thanks Gerard, I'll give that a try.

It seems like this approach is going to create a very large number of files. I guess I could write a cron job to concatenate the files by hour or maybe by day. I imagine this is a common problem. Do you know of something that does this already?

I am using the standalone cluster manager. I do not think it directly supports cron job functionality. It should be easy to use the HDFS API and a Linux crontab, or maybe https://quartz-scheduler.org/.

Kind regards,

Andy

From: Gerard Maas <gerard.m...@gmail.com>
Date: Sunday, November 8, 2015 at 2:13 AM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: streaming: missing data. does saveAsTextFile() append or replace?

> Andy,
>
> Using rdd.saveAsTextFile(...) will overwrite the data if your target is the same file.
>
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix, suffix), where a new file will be written at each streaming interval. Note that this will result in a saved file for each streaming interval. If you want to increase the file size (usually a good idea in HDFS), you can use a window function over the dstream and save the 'windowed' dstream instead.
>
> kind regards, Gerard.
>
> On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>
>> Hi
>>
>> I just started a new Spark Streaming project. In this phase of the system all we want to do is save the data we receive to HDFS. After running for a couple of days it looks like I am missing a lot of data. I wonder if saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured in previous windows? I noticed that after running for a couple of days my HDFS file system has 25 files. The names are something like "part-6". I used 'hadoop fs -dus' to check the total data captured. While the system was running I would periodically call 'dus'; I was surprised that sometimes the number of total bytes actually dropped.
>>
>> Is there a better way to save my data to disk?
>>
>> Any suggestions would be appreciated.
>>
>> Andy
>>
>> public static void main(String[] args) {
>>     SparkConf conf = new SparkConf().setAppName(appName);
>>     JavaSparkContext jsc = new JavaSparkContext(conf);
>>     JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));
>>
>>     [ deleted code ]
>>
>>     data.foreachRDD(new Function<JavaRDD, Void>(){
>>         private static final long serialVersionUID = -7957854392903581284L;
>>
>>         @Override
>>         public Void call(JavaRDD jsonStr) throws Exception {
>>             jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
>>             return null;
>>         }
>>     });
>>
>>     ssc.checkpoint(checkPointUri);
>>
>>     ssc.start();
>>     ssc.awaitTermination();
>> }
Re: streaming: missing data. does saveAsTextFile() append or replace?
Andy, Using the rdd.saveAsTextFile(...) will overwrite the data if your target is the same file. If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix, suffix) where a new file will be written at each streaming interval. Note that this will result in a saved file for each streaming interval. If you want to increase the file size (usually a good idea in HDFS), you can use a window function over the dstream and save the 'windowed' dstream instead. kind regards, Gerard. On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com> wrote: > Hi > > I just started a new Spark Streaming project. In this phase of the system > all we want to do is save the data we receive to HDFS. After running for > a couple of days it looks like I am missing a lot of data. I wonder if > saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I > captured in previous windows? I noticed that after running for a couple of > days my HDFS file system has 25 files. The names are something like > "part-6". I > used 'hadoop fs -dus' to check the total data captured. While the system > was running I would periodically call 'dus'; I was surprised that sometimes > the total number of bytes actually dropped. > > > Is there a better way to write my data to disk? > > Any suggestions would be appreciated > > Andy > > >public static void main(String[] args) { > >SparkConf conf = new SparkConf().setAppName(appName); > > JavaSparkContext jsc = new JavaSparkContext(conf); > > JavaStreamingContext ssc = new JavaStreamingContext(jsc, new > Duration(5 * 1000)); > > > [ deleted code …] > > > data.foreachRDD(new Function<JavaRDD<String>, Void>(){ > > private static final long serialVersionUID = > -7957854392903581284L; > > > @Override > > public Void call(JavaRDD<String> jsonStr) throws Exception { > > jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // > /rawSteamingData > is a directory > > return null; > > } > > }); > > > > ssc.checkpoint(checkPointUri); > > > > ssc.start(); > > ssc.awaitTermination(); > > } >
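A minimal Scala sketch of the windowing suggestion above, assuming the input DStream comes from a socket (the real source is not shown in the thread); window(Seconds(60), Seconds(60)) groups twelve 5-second batches so each save produces fewer, larger directories, and saveAsTextFiles writes a new timestamped directory per interval instead of overwriting one path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("rawDataArchiver")
val ssc = new StreamingContext(conf, Seconds(5))

// Assumption: a socket stream stands in for whatever the real application receives.
val lines = ssc.socketTextStream("localhost", 9999)

// Save one minute of data at a time: each write covers 12 batches.
lines.window(Seconds(60), Seconds(60))
  .saveAsTextFiles("hdfs:///rawSteamingData/batch", "txt")

ssc.start()
ssc.awaitTermination()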
streaming: missing data. does saveAsTextFile() append or replace?
Hi I just started a new Spark Streaming project. In this phase of the system all we want to do is save the data we receive to HDFS. After running for a couple of days it looks like I am missing a lot of data. I wonder if saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I captured in previous windows? I noticed that after running for a couple of days my HDFS file system has 25 files. The names are something like "part-6". I used 'hadoop fs -dus' to check the total data captured. While the system was running I would periodically call 'dus'; I was surprised that sometimes the total number of bytes actually dropped. Is there a better way to write my data to disk? Any suggestions would be appreciated Andy public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName(appName); JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000)); [ deleted code ] data.foreachRDD(new Function<JavaRDD<String>, Void>(){ private static final long serialVersionUID = -7957854392903581284L; @Override public Void call(JavaRDD<String> jsonStr) throws Exception { jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory return null; } }); ssc.checkpoint(checkPointUri); ssc.start(); ssc.awaitTermination(); }
error with saveAsTextFile in local directory
Hi all, I am saving some Hive query results into the local directory: val hdfsFilePath = "hdfs://master:ip/ tempFile "; val localFilePath = "file:///home/hduser/tempFile"; val res = hiveContext.sql(s"""my hql codes here""") res.printSchema() --working res.show() --working res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath) --still working res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath) --wrong! In the end, I get the correct results in hdfsFilePath, but nothing in localFilePath. Btw, the localFilePath folder was created, but it contained only a _SUCCESS file, no part files. See the trace below (any thoughts?): 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at myApp.scala:112) with 1 output partitions (allowLocal=false) // line 112 is where I call the saveAsTextFile function to save the results locally. 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 42 (saveAsTextFile at MyApp.scala:112) 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 41) 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List() 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no missing parents 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with curMem=3889533, maxMem=280248975 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values in memory (estimated size 156.9 KB, free 263.4 MB) 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with curMem=4050165, maxMem=280248975 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 54.8 KB, free 263.4 MB) 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB) 15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast at DAGScheduler.scala:874 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112) 15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 tasks 15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes) 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB) 15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1) 15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose tasks have all completed, from pool 15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile at MyApp.scala:112) finished in 6.360 s 15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: saveAsTextFile at MyApp.scala:112, took 6.588821 s 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 15/11/04 09:57:47 INFO handler.ContextHandler: s
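No resolution appears in this thread; one plausible explanation, offered here as an assumption, is that with a file:/// target each executor writes its part files to its own local disk while the driver only writes the _SUCCESS marker, so the driver-local folder looks empty. A hedged sketch that avoids the issue by saving to HDFS first and then copying the result to the driver's local filesystem; the NameNode address is a placeholder:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// After res.map(tranRow2Str).coalesce(1).saveAsTextFile(hdfsFilePath):
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://master:9000/"), conf) // substitute the real host:port

// Pull the HDFS output (which was written correctly) down to the local machine.
fs.copyToLocalFile(false, new Path("/tempFile"), new Path("/home/hduser/tempFile"))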
Re: error with saveAsTextFile in local directory
Looks like you are running a 1.4.x or earlier release, because the allowLocal flag was deprecated as of Spark 1.5.0. Cheers On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang <j...@uow.edu.au> wrote: > Hi all, > > > > I am saving some Hive query results into the local directory: > > > > val hdfsFilePath = "hdfs://master:ip/ tempFile "; > > val localFilePath = "file:///home/hduser/tempFile"; > > val res = hiveContext.sql(s"""my hql codes here""") > > res.printSchema() --working > > res.show() --working > > res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath) > --still working > > res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath) > --wrong! > > > > In the end, I get the correct results in hdfsFilePath, but nothing in > localFilePath. > > Btw, the localFilePath folder was created, but it contained only a > _SUCCESS file, no part files. > > > > See the trace below (any thoughts?): > > > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile > at myApp.scala:112) with 1 output partitions (allowLocal=false) > > // line 112 is where I call the saveAsTextFile function to save > the results locally. > > > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage > 42 (saveAsTextFile at MyApp.scala:112) > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: > List(ShuffleMapStage 41) > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List() > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 > (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no > missing parents > > 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called > with curMem=3889533, maxMem=280248975 > > 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as > values in memory (estimated size 156.9 KB, free 263.4 MB) > > 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called > with curMem=4050165, maxMem=280248975 > > 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 > stored as bytes in memory (estimated size 54.8 KB, free 263.4 MB) > > 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 > in memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB) > > 15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from > broadcast at DAGScheduler.scala:874 > > 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks > from ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at > MyApp.scala:112) > > 15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 > with 1 tasks > > 15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in > stage 42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes) > > 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 > in memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB) > > 15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in > stage 42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1) > > 15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, > whose tasks have all completed, from pool > > 15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 > (saveAsTextFile at MyApp.scala:112) finished in 6.360 s > > 15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: > saveAsTextFile at MyApp.scala:112, took 6.588821 s > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/metrics/json,null} > 
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/stages/stage/kill,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/api,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/static,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/executors/threadDump,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/executors/json,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/executors,null} > > 15/11/04 09:57:47 INFO handler.ContextHandler: stopped > o.s.j.s.ServletContextHandler{/envir
RE: error with saveAsTextFile in local directory
Yes, mine is 1.4.0. So is this problem related to the version? I doubt it. Any comments please? From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, 4 November 2015 11:52 AM To: Jack Yang Cc: user@spark.apache.org Subject: Re: error with saveAsTextFile in local directory Looks like you are running a 1.4.x or earlier release, because the allowLocal flag was deprecated as of Spark 1.5.0. Cheers On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang <j...@uow.edu.au> wrote: Hi all, I am saving some Hive query results into the local directory: val hdfsFilePath = "hdfs://master:ip/ tempFile "; val localFilePath = "file:///home/hduser/tempFile"; val res = hiveContext.sql(s"""my hql codes here""") res.printSchema() --working res.show() --working res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath) --still working res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath) --wrong! In the end, I get the correct results in hdfsFilePath, but nothing in localFilePath. Btw, the localFilePath folder was created, but it contained only a _SUCCESS file, no part files. See the trace below (any thoughts?): 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at myApp.scala:112) with 1 output partitions (allowLocal=false) // line 112 is where I call the saveAsTextFile function to save the results locally. 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 42 (saveAsTextFile at MyApp.scala:112) 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 41) 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List() 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no missing parents 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with curMem=3889533, maxMem=280248975 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values in memory (estimated size 156.9 KB, free 263.4 MB) 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with curMem=4050165, maxMem=280248975 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 54.8 KB, free 263.4 MB) 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB) 15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast at DAGScheduler.scala:874 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112) 15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 tasks 15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes) 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB) 15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1) 15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose tasks have all completed, from pool 15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile at MyApp.scala:112) finished in 6.360 s 15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: 
saveAsTextFile at MyApp.scala:112, took 6.588821 s 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 15/11/04 09:57:47 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandle
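For anyone following along: the running version can be confirmed directly from the shell before chasing version-specific behaviour, e.g.:

// In spark-shell; prints the version of the active SparkContext.
println(sc.version) // e.g. "1.4.0"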
Re: saveAsTextFile creates an empty folder in HDFS
bq. val dist = sc.parallelize(l) Following the above, can you call, e.g. count() on dist before saving? Cheers On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es> wrote: > Dear list, > > I'm experiencing a problem when trying to write any RDD to HDFS. I've > tried > with minimal examples, Scala programs and PySpark programs both in local > and > cluster modes and as standalone applications or shells. > > My problem is that when invoking the write command, a task is executed but > it just creates an empty folder in the given HDFS path. I'm lost at this > point because there is no sign of error or warning in the spark logs. > > I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is > working properly when using the command tools or running MapReduce jobs. > > > Thank you for your time, I'm not sure if this is just a rookie mistake or > an > overall config problem. > > Just a minimal example: > > This sequence produces the following log and creates the empty folder > "test": > > scala> val l = Seq.fill(1)(nextInt) > scala> val dist = sc.parallelize(l) > scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/") > > > 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm > version is 1 > 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at > <console>:27 > 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at > <console>:27) with 2 output partitions (allowLocal=false) > 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at > <console>:27) > 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List() > 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List() > 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 > (MapPartitionsRDD[7] > at saveAsTextFile at <console>:27), which has no missing parents > 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with > curMem=184615, maxMem=278302556 > 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in > memory (estimated size 134.1 KB, free 265.1 MB) > 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with > curMem=321951, maxMem=278302556 > 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as > bytes > in memory (estimated size 46.6 KB, free 265.1 MB) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB) > 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block > broadcast_3_piece0 > 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at > DAGScheduler.scala:839 > 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage > 3 > (MapPartitionsRDD[7] at saveAsTextFile at <console>:27) > 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks > 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID > 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes) > 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID > 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB) > 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID > 6) in 312 ms on nodo2.i3a.info (1/2) > 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID > 7) in 313 ms on nodo3.i3a.info (2/2) > 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have > all completed, from pool > 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at > <console>:27) finished in 0.334 s > 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at > <console>:27, took 0.436388 s > >
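A minimal sketch of the check Ted suggests, extended with a driver-side listing of what actually landed in HDFS; the element count is raised from 1 to 1000 here purely so the output is non-trivial, and FileSystem.listStatus is standard Hadoop API:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Random.nextInt

val l = Seq.fill(1000)(nextInt) // 1000 instead of 1, so the part files have content
val dist = sc.parallelize(l)
println(s"count before save: ${dist.count()}") // forces the lineage to execute

dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")

// List what was actually written, from the driver.
val fs = FileSystem.get(new URI("hdfs://node1.i3a.info/"), sc.hadoopConfiguration)
fs.listStatus(new Path("/user/jarias/test/"))
  .foreach(s => println(s"${s.getPath} ${s.getLen} bytes"))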
Re: saveAsTextFile creates an empty folder in HDFS
Yes, printing the result with collect or take works. Actually, this is a minimal example, but when working with real data the actions are also performed, and the resulting RDDs can be printed out without problem. The data is there and the operations are correct; they just cannot be written to a file. > On 03 Oct 2015, at 16:17, Ted Yu <yuzhih...@gmail.com> wrote: > > bq. val dist = sc.parallelize(l) > > Following the above, can you call, e.g. count() on dist before saving ? > > Cheers > > On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es> wrote: > Dear list, > > I'm experiencing a problem when trying to write any RDD to HDFS. I've tried > with minimal examples, Scala programs and PySpark programs both in local and > cluster modes and as standalone applications or shells. > > My problem is that when invoking the write command, a task is executed but > it just creates an empty folder in the given HDFS path. I'm lost at this > point because there is no sign of error or warning in the spark logs. > > I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is > working properly when using the command tools or running MapReduce jobs. > > > Thank you for your time, I'm not sure if this is just a rookie mistake or an > overall config problem. > > Just a minimal example: > > This sequence produces the following log and creates the empty folder > "test": > > scala> val l = Seq.fill(1)(nextInt) > scala> val dist = sc.parallelize(l) > scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/") > > > 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm > version is 1 > 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at > <console>:27 > 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at > <console>:27) with 2 output partitions (allowLocal=false) > 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at > <console>:27) > 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List() > 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List() > 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7] > at saveAsTextFile at <console>:27), which has no missing parents > 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with > curMem=184615, maxMem=278302556 > 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in > memory (estimated size 134.1 KB, free 265.1 MB) > 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with > curMem=321951, maxMem=278302556 > 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes > in memory (estimated size 46.6 KB, free 265.1 MB) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB) > 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block > broadcast_3_piece0 > 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at > DAGScheduler.scala:839 > 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 > (MapPartitionsRDD[7] at saveAsTextFile at <console>:27) > 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks > 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID > 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes) > 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID > 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB) > 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB) > 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID > 6) in 312 ms on nodo2.i3a.info (1/2) > 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID > 7) in 313 ms on nodo3.i3a.info (2/2) > 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have > all completed, from pool > 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at > <console>:27) finished in 0.334 s > 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at > <console>:27, took 0.436388 s > >
Re: saveAsTextFile creates an empty folder in HDFS
Hi Jacinto, If I were you, the first thing I would do is write a sample Java application that writes data into HDFS and see if it works fine; a minimal sketch follows after this message. Metadata is being created in HDFS; that means communication with the namenode is working fine, but not with the datanodes, since you don't see any data inside the files. Why don't you look at the HDFS logs and see what's happening when your application talks to the namenode? I suspect some networking issue; also check whether the datanodes are running fine. Thank you, Ajay On Saturday, October 3, 2015, Jacinto Arias <ja...@elrocin.es> wrote: > Yes, printing the result with collect or take works. > > Actually this is a minimal example, but when working with real data > the actions are also performed, and the resulting RDDs can be printed out > without problem. The data is there and the operations are correct; they > just cannot be written to a file. > > > On 03 Oct 2015, at 16:17, Ted Yu <yuzhih...@gmail.com> wrote: > > bq. val dist = sc.parallelize(l) > > Following the above, can you call, e.g. count() on dist before saving ? > > Cheers > > On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es> wrote: > >> Dear list, >> >> I'm experiencing a problem when trying to write any RDD to HDFS. I've >> tried >> with minimal examples, Scala programs and PySpark programs both in local >> and >> cluster modes and as standalone applications or shells. >> >> My problem is that when invoking the write command, a task is executed but >> it just creates an empty folder in the given HDFS path. I'm lost at this >> point because there is no sign of error or warning in the spark logs. >> >> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is >> working properly when using the command tools or running MapReduce jobs. >> >> >> Thank you for your time, I'm not sure if this is just a rookie mistake or >> an >> overall config problem. >> >> Just a minimal example: >> >> This sequence produces the following log and creates the empty folder >> "test": >> >> scala> val l = Seq.fill(1)(nextInt) >> scala> val dist = sc.parallelize(l) >> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/") >> >> >> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer >> Algorithm >> version is 1 >> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at >> <console>:27 >> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at >> <console>:27) with 2 output partitions (allowLocal=false) >> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile >> at >> <console>:27) >> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List() >> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List() >> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 >> (MapPartitionsRDD[7] >> at saveAsTextFile at <console>:27), which has no missing parents >> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with >> curMem=184615, maxMem=278302556 >> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in >> memory (estimated size 134.1 KB, free 265.1 MB) >> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with >> curMem=321951, maxMem=278302556 >> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as >> bytes >> in memory (estimated size 46.6 KB, free 265.1 MB) >> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in >> memory >> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB) >> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block >> broadcast_3_piece0 >> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at >> DAGScheduler.scala:839 >> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from >> Stage 3 >> (MapPartitionsRDD[7] at saveAsTextFile at <console>:27) >> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks >> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID >> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes) >> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID >> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes) >> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in >> memory >> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB) >> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 i
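A sketch of the smoke test Ajay describes, written here in Scala against the plain Hadoop client so Spark is out of the picture; the URI and target path reuse the ones from the thread:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsWriteSmokeTest {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new URI("hdfs://node1.i3a.info/"), new Configuration())
    val file = new Path("/user/jarias/smoke-test.txt")
    val out = fs.create(file)
    try out.writeBytes("hello hdfs\n") // if this hangs or errors, suspect the datanodes
    finally out.close()
    // A zero length here would reproduce the symptom: metadata without data.
    println(s"written length: ${fs.getFileStatus(file).getLen}")
  }
}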
saveAsTextFile creates an empty folder in HDFS
Dear list, I'm experiencing a problem when trying to write any RDD to HDFS. I've tried with minimal examples, Scala programs and PySpark programs both in local and cluster modes and as standalone applications or shells. My problem is that when invoking the write command, a task is executed but it just creates an empty folder in the given HDFS path. I'm lost at this point because there is no sign of error or warning in the spark logs. I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is working properly when using the command tools or running MapReduce jobs. Thank you for your time, I'm not sure if this is just a rookie mistake or an overall config problem. Just a minimal example: This sequence produces the following log and creates the empty folder "test": scala> val l = Seq.fill(1)(nextInt) scala> val dist = sc.parallelize(l) scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/") 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at <console>:27 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at <console>:27) with 2 output partitions (allowLocal=false) 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at <console>:27) 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List() 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List() 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7] at saveAsTextFile at <console>:27), which has no missing parents 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with curMem=184615, maxMem=278302556 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 134.1 KB, free 265.1 MB) 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with curMem=321951, maxMem=278302556 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 46.6 KB, free 265.1 MB) 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB) 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 (MapPartitionsRDD[7] at saveAsTextFile at <console>:27) 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes) 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes) 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB) 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB) 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 6) in 312 ms on nodo2.i3a.info (1/2) 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7) in 313 ms on nodo3.i3a.info (2/2) 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at <console>:27) finished in 0.334 s 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at <console>:27, took 0.436388 s
Exception during SaveAstextFile Stage
Hi, I have 2 stages in my job: a map and a saveAsTextFile. During the saveAsTextFile stage I am getting an exception: 15/09/24 15:38:16 WARN AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) It might be too early to ask this since I haven't dug into why it occurs at all; does anyone have any idea about this? Thanks in advance, Chirag
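No answer was recorded for this message. A common first step on the Akka-based Spark 1.x releases, offered as an assumption rather than a confirmed fix, was to raise the ask timeout whose 30-second default matches the "Futures timed out after [30 seconds]" in the trace:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("saveJob")
  // Spark 1.x property; its 30s default is the timeout seen in the stack trace.
  .set("spark.akka.askTimeout", "120")
val sc = new SparkContext(conf)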
RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0
For those who have similar issues on EMR writing Parquet files: if you update mapred-site.xml with the following properties (shown here as name = value pairs; the surrounding XML markup was flattened in the archive):

mapred.output.direct.EmrFileSystem = true
mapred.output.direct.NativeS3FileSystem = true
parquet.enable.summary-metadata = false
spark.sql.parquet.output.committer.class = org.apache.spark.sql.parquet.DirectParquetOutputCommitter

then you get Parquet files written directly to S3 without the use of temporary files too, and the summary-metadata files, which can cause a performance hit when writing large Parquet datasets on S3, are disabled. The easiest way to add them across the cluster is via the --configurations flag on the "aws emr create-cluster" command. Thanks, Ewan From: Alexander Pivovarov [mailto:apivova...@gmail.com] Sent: 03 September 2015 00:12 To: Neil Jonkers <neilod...@gmail.com> Cc: user@spark.apache.org Subject: Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0 Hi Neil Yes! it helps!!! I do not see _temporary in console output anymore. saveAsTextFile is fast now. 2015-09-02 23:07:00,022 INFO [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 18.0 in stage 0.0 (TID 18) in 4398 ms on ip-10-0-24-103.ec2.internal (1/24) 2015-09-02 23:07:01,887 INFO [task-result-getter-2] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 5.0 in stage 0.0 (TID 5) in 6282 ms on ip-10-0-26-14.ec2.internal (24/24) 2015-09-02 23:07:01,888 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 0 (saveAsTextFile at <console>:22) finished in 6.319 s 2015-09-02 23:07:02,123 INFO [main] s3n.Jets3tNativeFileSystemStore (Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar tmp/test40_141_24_406/_SUCCESS 0 Thank you! On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers <neilod...@gmail.com> wrote: Hi, Can you set the following parameters in your mapred-site.xml file please: mapred.output.direct.EmrFileSystem = true mapred.output.direct.NativeS3FileSystem = true You can also config this at cluster launch time with the following Classification via EMR console: classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true] Thank you On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov <apivova...@gmail.com> wrote: I checked previous emr config (emr-3.8); mapred-site.xml has the following setting: mapred.output.committer.class = org.apache.hadoop.mapred.DirectFileOutputCommitter On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com> wrote: Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class com.appsflyer.spark.DirectOutputCommitter On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com<mailto:apivova...@gmail.com>> wrote: I run spark 1.4.1 in amazom aws emr 4.0.0 For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to emr 3.8 (was 5 sec, now 95 sec) Actually saveAsTextFile says that it's done in 4.356 sec but after that I see lots of INFO messages with 404 error from com.amazonaws.latency logger for next 90 sec spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20") 2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at :22) finished in 4.356 s 2015-09-01 21:16:17,637 INFO [task-result-getter-2] cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all completed, from pool 2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at :22, took 4.547829 s 2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:listStatus(896)) - listStatus s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false 2015-09-01 21:16:17,651 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 3B2F06FD11682D22), S3 Extended Request ID: C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[https://foo-bar.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129], 2015-09-01 21:16:17,723 INFO [main] amazonaws.latency (AWSRequestMe
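If editing mapred-site.xml across the cluster is awkward, the same property names can in principle be set per application on the Hadoop configuration Spark uses; whether a given EMR release honours them when set this way is an assumption:

// Inside the Spark application, before any save is triggered:
sc.hadoopConfiguration.set("mapred.output.direct.EmrFileSystem", "true")
sc.hadoopConfiguration.set("mapred.output.direct.NativeS3FileSystem", "true")
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")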
Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0
Hi, Can you set the following parameters in your mapred-site.xml file please (shown as name = value; the XML markup was flattened in the archive): mapred.output.direct.EmrFileSystem = true mapred.output.direct.NativeS3FileSystem = true You can also config this at cluster launch time with the following Classification via EMR console: classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true] Thank you On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov <apivova...@gmail.com> wrote: > I checked previous emr config (emr-3.8) > mapred-site.xml has the following setting > > mapred.output.committer.class = org.apache.hadoop.mapred.DirectFileOutputCommitter > > > > On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com> > wrote: > >> Should I use DirectOutputCommitter? >> spark.hadoop.mapred.output.committer.class >> com.appsflyer.spark.DirectOutputCommitter >> >> >> >> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com >> > wrote: >> >>> I run Spark 1.4.1 on Amazon AWS EMR 4.0.0 >>> >>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in >>> comparison to emr 3.8 (was 5 sec, now 95 sec) >>> >>> Actually saveAsTextFile says that it's done in 4.356 sec but after that >>> I see lots of INFO messages with 404 error from com.amazonaws.latency >>> logger for next 90 sec >>> >>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + >>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20") >>> >>> 2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] >>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 >>> (saveAsTextFile at <console>:22) finished in 4.356 s >>> 2015-09-01 21:16:17,637 INFO [task-result-getter-2] >>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, >>> whose tasks have all completed, from pool >>> 2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler >>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at >>> <console>:22, took 4.547829 s >>> 2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem >>> (S3NativeFileSystem.java:listStatus(896)) - listStatus >>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false >>> 2015-09-01 21:16:17,651 INFO [main] amazonaws.latency >>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], >>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found >>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request >>> ID: 3B2F06FD11682D22), S3 Extended Request ID: >>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], >>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], >>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[ >>> https://foo-bar.s3.amazonaws.com], Exception=1, >>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, >>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], >>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], >>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129], >>> 2015-09-01 21:16:17,723 INFO [main] amazonaws.latency >>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], >>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[ >>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0, >>> RequestCount=1, HttpClientPoolPendingCount=0, >>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927], >>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81], >>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97], >>> 
HttpClientSendRequestTime=[0.089], >>> 2015-09-01 21:16:17,756 INFO [main] amazonaws.latency >>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], >>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found >>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request >>> ID: 62C6B413965447FD), S3 Extended Request ID: >>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf], >>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], >>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[ >>> https://foo-bar.s3.amazonaws.com], Exception=1, >>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, >>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044], >>> HttpRequestTime=[10.543],
Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0
Hi Neil Yes! it helps!!! I do not see _temporary in console output anymore. saveAsTextFile is fast now. 2015-09-02 23:07:00,022 INFO [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 18.0 in stage 0.0 (TID 18) in 4398 ms on ip-10-0-24-103.ec2.internal (1/24) 2015-09-02 23:07:01,887 INFO [task-result-getter-2] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 5.0 in stage 0.0 (TID 5) in 6282 ms on ip-10-0-26-14.ec2.internal (24/24) 2015-09-02 23:07:01,888 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 0 (saveAsTextFile at :22) finished in 6.319 s 2015-09-02 23:07:02,123 INFO [main] s3n.Jets3tNativeFileSystemStore (Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar tmp/test40_141_24_406/_SUCCESS 0 Thank you! On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers <neilod...@gmail.com> wrote: > Hi, > > Can you set the following parameters in your mapred-site.xml file please: > > > mapred.output.direct.EmrFileSystemtrue > > mapred.output.direct.NativeS3FileSystemtrue > > You can also config this at cluster launch time with the following > Classification via EMR console: > > > classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true] > > > Thank you > > On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov <apivova...@gmail.com> > wrote: > >> I checked previous emr config (emr-3.8) >> mapred-site.xml has the following setting >> >> mapred.output.committer.classorg.apache.hadoop.mapred.DirectFileOutputCommitter >> >> >> >> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com >> > wrote: >> >>> Should I use DirectOutputCommitter? >>> spark.hadoop.mapred.output.committer.class >>> com.appsflyer.spark.DirectOutputCommitter >>> >>> >>> >>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov < >>> apivova...@gmail.com> wrote: >>> >>>> I run spark 1.4.1 in amazom aws emr 4.0.0 >>>> >>>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in >>>> comparison to emr 3.8 (was 5 sec, now 95 sec) >>>> >>>> Actually saveAsTextFile says that it's done in 4.356 sec but after that >>>> I see lots of INFO messages with 404 error from com.amazonaws.latency >>>> logger for next 90 sec >>>> >>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + >>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20") >>>> >>>> 2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] >>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 >>>> (saveAsTextFile at :22) finished in 4.356 s >>>> 2015-09-01 21:16:17,637 INFO [task-result-getter-2] >>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, >>>> whose tasks have all completed, from pool >>>> 2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler >>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at >>>> :22, took 4.547829 s >>>> 2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem >>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus >>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false >>>> 2015-09-01 21:16:17,651 INFO [main] amazonaws.latency >>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], >>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found >>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request >>>> ID: 3B2F06FD11682D22), S3 Extended Request ID: >>>> 
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], >>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], >>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[ >>>> https://foo-bar.s3.amazonaws.com], Exception=1, >>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, >>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], >>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], >>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129], >>>> 2015-09-01 21:16:17,723 INFO [main] amazonaws.latency >>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], >>>> ServiceName=[Amazon S3], AWS
Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0
Should I use DirectOutputCommitter? spark.hadoop.mapred.output.committer.class com.appsflyer.spark.DirectOutputCommitter On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com> wrote: > I run spark 1.4.1 in amazom aws emr 4.0.0 > > For some reason spark saveAsTextFile is very slow on emr 4.0.0 in > comparison to emr 3.8 (was 5 sec, now 95 sec) > > Actually saveAsTextFile says that it's done in 4.356 sec but after that I > see lots of INFO messages with 404 error from com.amazonaws.latency logger > for next 90 sec > > spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + > "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20") > > 2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] > scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 > (saveAsTextFile at :22) finished in 4.356 s > 2015-09-01 21:16:17,637 INFO [task-result-getter-2] cluster.YarnScheduler > (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all > completed, from pool > 2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler > (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at > :22, took 4.547829 s > 2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem > (S3NativeFileSystem.java:listStatus(896)) - listStatus > s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false > 2015-09-01 21:16:17,651 INFO [main] amazonaws.latency > (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], > Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found > (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request > ID: 3B2F06FD11682D22), S3 Extended Request ID: > C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], > ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], > AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[ > https://foo-bar.s3.amazonaws.com], Exception=1, > HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, > HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], > HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], > RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129], > 2015-09-01 21:16:17,723 INFO [main] amazonaws.latency > (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], > ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[ > https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0, > RequestCount=1, HttpClientPoolPendingCount=0, > HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927], > HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81], > RequestSigningTime=[0.209], ResponseProcessingTime=[17.97], > HttpClientSendRequestTime=[0.089], > 2015-09-01 21:16:17,756 INFO [main] amazonaws.latency > (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], > Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found > (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request > ID: 62C6B413965447FD), S3 Extended Request ID: > 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf], > ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], > AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[ > https://foo-bar.s3.amazonaws.com], Exception=1, > HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, > HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044], > HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743], > RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138], > 2015-09-01 
21:16:17,774 INFO [main] amazonaws.latency > (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], > ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[ > https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0, > RequestCount=1, HttpClientPoolPendingCount=0, > HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724], > HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728], > RequestSigningTime=[0.148], ResponseProcessingTime=[0.155], > HttpClientSendRequestTime=[0.068], > 2015-09-01 21:16:17,786 INFO [main] amazonaws.latency > (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], > Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found > (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request > ID: 4846575A1C373BB9), S3 Extended Request ID: > aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E], > ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], > AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[ > https://foo-bar.s3.amazonaws.com], Exception=1, > HttpClientPoolLeasedCount=0, RequestCount=1, HttpClie
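For completeness, a sketch of how a custom committer such as the one named above is wired in through SparkConf; the com.appsflyer class is a third-party jar that must be on the classpath, so this only shows the plumbing:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3Writer")
  // Properties prefixed with spark.hadoop. are copied into the Hadoop Configuration.
  .set("spark.hadoop.mapred.output.committer.class",
       "com.appsflyer.spark.DirectOutputCommitter")
val sc = new SparkContext(conf)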
spark 1.4.1 saveAsTextFile is slow on emr-4.0.0
I run Spark 1.4.1 on Amazon AWS EMR 4.0.0 For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to emr 3.8 (was 5 sec, now 95 sec) Actually saveAsTextFile says that it's done in 4.356 sec but after that I see lots of INFO messages with 404 error from com.amazonaws.latency logger for next 90 sec spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" + "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20") 2015-09-01 21:16:17,637 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5 (saveAsTextFile at <console>:22) finished in 4.356 s 2015-09-01 21:16:17,637 INFO [task-result-getter-2] cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all completed, from pool 2015-09-01 21:16:17,637 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at <console>:22, took 4.547829 s 2015-09-01 21:16:17,638 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:listStatus(896)) - listStatus s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false 2015-09-01 21:16:17,651 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 3B2F06FD11682D22), S3 Extended Request ID: C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[ https://foo-bar.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923], HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544], RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129], 2015-09-01 21:16:17,723 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[ https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927], HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81], RequestSigningTime=[0.209], ResponseProcessingTime=[17.97], HttpClientSendRequestTime=[0.089], 2015-09-01 21:16:17,756 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 62C6B413965447FD), S3 Extended Request ID: 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[ https://foo-bar.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044], HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743], RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138], 2015-09-01 21:16:17,774 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[ https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, 
ClientExecuteTime=[16.724], HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728], RequestSigningTime=[0.148], ResponseProcessingTime=[0.155], HttpClientSendRequestTime=[0.068], 2015-09-01 21:16:17,786 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 4846575A1C373BB9), S3 Extended Request ID: aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[ https://foo-bar.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.531], HttpRequestTime=[11.134], HttpClientReceiveResponseTime=[9.434], RequestSigningTime=[0.206], HttpClientSendRequestTime=[0.13], 2015-09-01 21:16:17,786 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:listStatus(896)) - listStatus s3n://foo-bar/tmp/test40_20/_temporary/0/task_201509012116_0005_m_00 with recursive false 2015-09-01 21:16:17,798 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service
Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0
I checked the previous EMR config (emr-3.8); its mapred-site.xml has the following setting:

mapred.output.committer.class = org.apache.hadoop.mapred.DirectFileOutputCommitter

On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov <apivova...@gmail.com> wrote:
> Should I use DirectOutputCommitter?
> spark.hadoop.mapred.output.committer.class
> com.appsflyer.spark.DirectOutputCommitter
>
> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <apivova...@gmail.com> wrote:
>> I run spark 1.4.1 in amazom aws emr 4.0.0. For some reason spark saveAsTextFile is very slow on emr 4.0.0 in comparison to emr 3.8 (was 5 sec, now 95 sec) [...]
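For anyone landing on this thread, here is a minimal sketch of what wiring a direct committer into saveAsTextFile would look like. The committer class below is the third-party one named in the quoted message, so treat its presence on your classpath as an assumption; note also that direct committers skip the _temporary rename step (the source of the listStatus/404 traffic above) at the cost of leaving partial output behind if a task fails.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: point the old mapred API at a direct output committer so that
// saveAsTextFile writes part files straight to S3 instead of writing to
// _temporary and renaming. com.appsflyer.spark.DirectOutputCommitter is
// the third-party class mentioned in this thread and must be provided on
// the classpath.
val conf = new SparkConf()
  .setAppName("direct-committer-test")
  .set("spark.hadoop.mapred.output.committer.class",
       "com.appsflyer.spark.DirectOutputCommitter")
val sc = new SparkContext(conf)

sc.parallelize(List.range(0, 160), 160)
  .map(x => x + "\t" + "A" * 100)
  .saveAsTextFile("s3n://foo-bar/tmp/test40_20")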
Re: EC2 cluster doesn't work saveAsTextFile
Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya godo...@gmail.com wrote:
> Hi, I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I submit locally there is no problem, but when I run on the cluster, saveAsTextFile doesn't work. It says: User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult already exists. [...]
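If deleting the target first is acceptable, here is a minimal sketch of doing that through the Hadoop FileSystem API; the path and the rdd name are illustrative, not taken from Yasemin's job:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: remove the output directory before saving, so the
// "Output directory ... already exists" check cannot trigger.
val outputPath = new Path("hdfs://172.31.42.10:54310/weblogReadResult")
val fs = FileSystem.get(outputPath.toUri, new Configuration())
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // true = recursive
}
rdd.saveAsTextFile(outputPath.toString)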
Re: EC2 cluster doesn't work saveAsTextFile
Thanks Dean, I am giving a unique output path, and each time I also delete the directory before I run the job.

2015-08-10 15:30 GMT+03:00 Dean Wampler deanwamp...@gmail.com:
> Following Hadoop conventions, Spark won't overwrite an existing directory. You need to provide a unique output path every time you run the program, or delete or rename the target directory before you run the job. [...]

--
hiç ender hiç
Re: EC2 cluster doesn't work saveAsTextFile
So, just before running the job, run this HDFS command at a shell prompt:

hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult

Does it say the path doesn't exist?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya godo...@gmail.com wrote:
> Thanks Dean, I am giving a unique output path, and each time I also delete the directory before I run the job. [...]
EC2 cluster doesn't work saveAsTextFile
Hi, I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I submit locally there is no problem, but when I run on the cluster, saveAsTextFile doesn't work. It says: User class threw exception: Output directory hdfs://172.31.42.10:54310/./weblogReadResult already exists. Is there anyone who can help me with this issue?

Best,
yasemin

--
hiç ender hiç
Re: Combining Spark Files with saveAsTextFile
Hi, try using coalesce(1) before calling saveAsTextFile().

Thanks & Regards,
Meethu M

On Wednesday, 5 August 2015 7:53 AM, Brandon White bwwintheho...@gmail.com wrote:
> What is the best way to make saveAsTextFile save as only a single file?
Re: Combining Spark Files with saveAsTextFile
Using coalesce might be dangerous, since one worker process will need to handle the whole file, and if the file is huge you'll get an OOM. However, it depends on the implementation; I'm not sure how it will be done. Nevertheless, it's worth trying the coalesce method (please post your results).

Another option would be to use FileUtil.copyMerge, which copies each partition one after another into a destination stream (file). So as soon as you've written your HDFS file with Spark with multiple partitions in parallel (as usual), you can then add another step to merge it into any destination you want.

On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:
> Just to further clarify, you can first call coalesce with argument 1 and then call saveAsTextFile. For example, rdd.coalesce(1).saveAsTextFile(...) [...]
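A sketch of the FileUtil.copyMerge approach mentioned above, for Hadoop 2.x (the method was removed in Hadoop 3); the paths and the rdd name are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Sketch: write the RDD with its normal parallelism, then merge the
// part files into one destination file in a second step.
val conf = new Configuration()
val fs = FileSystem.get(conf)

rdd.saveAsTextFile("hdfs:///tmp/out-parts") // parallel write, many part files

FileUtil.copyMerge(
  fs, new Path("hdfs:///tmp/out-parts"),      // source dir with part-* files
  fs, new Path("hdfs:///tmp/out-merged.txt"), // single destination file
  false,                                      // don't delete the source
  conf, null)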
Re: Combining Spark Files with saveAsTextFile
It seems that coalesce does work; see the following thread: https://www.mail-archive.com/user%40spark.apache.org/msg00928.html

On 5 August 2015 at 09:47, Igor Berman igor.ber...@gmail.com wrote:
> Using coalesce might be dangerous, since one worker process will need to handle the whole file, and if the file is huge you'll get an OOM. [...]
RE: Combining Spark Files with saveAsTextFile
One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile

What is the best way to make saveAsTextFile save as only a single file?
RE: Combining Spark Files with saveAsTextFile
Just to further clarify, you can first call coalesce with argument 1 and then call saveAsTextFile. For example:

rdd.coalesce(1).saveAsTextFile(...)

Mohammed

From: Mohammed Guller
Sent: Tuesday, August 4, 2015 9:39 PM
To: 'Brandon White'; user
Subject: RE: Combining Spark Files with saveAsTextFile

> One option is to use the coalesce method in the RDD class. [...]
Combining Spark Files with saveAsTextFile
What is the best way to make saveAsTextFile save as only a single file?
spark cache issue while doing saveAsTextFile and saveAsParquetFile
Hi there, I am using cache with mapPartitions to do some processing and cache the result, as below. I am storing the output in both formats (Parquet and text file), and the recomputation happens both times. Even though I call cache, it is not working as expected. Below is the code snippet; any help is really appreciated.

val record = sql(sqlString)
val outputRecords = record.repartition(1).mapPartitions { rows =>
  val finalList1 = ListBuffer[Row]()
  while (rows.hasNext) {
    ...
    finalList1.add(xyz)
  }
  finalList1.iterator
}.cache()
val l = applySchema(outputRecords, schemaName).cache()
l.saveAsTextFile(filename + ".txt")
l.saveAsParquetFile(filename + ".parquet")

Expected result: when we do saveAsTextFile, the computation should happen and the result should be cached, and the second time, when we do saveAsParquetFile, it should get the result from the cache.

thanks
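One thing worth trying, as a sketch under the assumption that the cache is simply never materialized where expected: cache() is lazy, so nothing is cached until an action runs. Forcing it once with a cheap action before the two saves should let both writes read the cached data.

// Sketch, based on the snippet above: materialize the cache explicitly,
// then both saves should hit the cache instead of re-running the
// mapPartitions work.
val l = applySchema(outputRecords, schemaName).cache()
l.count() // forces evaluation and populates the cache
l.saveAsTextFile(filename + ".txt")
l.saveAsParquetFile(filename + ".parquet")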
RDD saveAsTextFile() to local disk
Getting an exception when writing an RDD to local disk using the following call:

saveAsTextFile("file:///home/someuser/dir2/testupload/20150708/")

The directory (/home/someuser/dir2/testupload/) was created before running the job. The error message is misleading.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException: Parent path is not a directory: file:/home/someuser/dir2
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

--
-Vijay
Re: RDD saveAsTextFile() to local disk
It works for me using the following code. Could you share your code?

val data = sc.parallelize(List(1, 2, 3))
data.saveAsTextFile("file:///Users/chen/Temp/c")

On Thu, Jul 9, 2015 at 4:05 AM, spok20nn vijaypawnar...@gmail.com wrote:
> Getting an exception when writing an RDD to local disk using saveAsTextFile. The directory was created before running the job. The error message is misleading: org.apache.hadoop.fs.ParentNotDirectoryException: Parent path is not a directory: file:/home/someuser/dir2 [...]
Spark dramatically slow when I add saveAsTextFile
*Problem Description*:
The program runs in a standalone Spark cluster (1 master, 6 workers with 8g RAM and 2 cores each).
Input: a 468MB file with 133433 records stored in HDFS.
Output: just a 2MB file will be stored in HDFS.
The program has two map operations and one reduceByKey operation. Finally I save the result to HDFS using *saveAsTextFile*.
*Problem*: if I don't add saveAsTextFile, the program runs very fast (a few seconds); otherwise it is extremely slow, taking about 30 mins.

*My program (is very simple)*:

public static void main(String[] args) throws IOException {
    /* Parameter setting */
    String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
    String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
    String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
    final int row = 133433;
    final int col = 458;
    final double dc = Double.valueOf(args[0]);

    SparkConf conf = new SparkConf()
        .setAppName("distance")
        .set("spark.executor.memory", "4g")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.eventLog.enabled", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> textFile = sc.textFile(remoteFilePath);
    // Broadcast variable, the dimension of this double array: 133433*458
    final Broadcast<double[][]> broadcastPoints =
        sc.broadcast(createBroadcastPoints(localPointPath, row, col));

    /*
     * Compute the distance in terms of each point on each instance.
     * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
     */
    JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
        new PairFlatMapFunction<String, Integer, Double>() {
            public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                List<String> al = Arrays.asList(v1.split(","));
                double[] featureVals = new double[al.size()];
                for (int j = 0; j < al.size() - 1; j++)
                    featureVals[j] = Double.valueOf(al.get(j + 1));
                int jIndex = Integer.valueOf(al.get(0));
                double[][] allPoints = broadcastPoints.value();
                double sum = 0;
                List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
                for (int i = 0; i < row; i++) {
                    sum = 0;
                    for (int j = 0; j < al.size() - 1; j++) {
                        sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                    }
                    list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                }
                return list;
            }
        });

    // Create zeroOne density
    JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
        new Function<Double, Integer>() {
            public Integer call(Double v1) throws Exception {
                if (v1 < dc) return 1;
                else return 0;
            }
        });

    // Combine the density
    JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

    counts.saveAsTextFile(outputPath + args[1]);
    sc.stop();
}

*If I comment out saveAsTextFile, the log is:*

Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting
Re: Spark dramatically slow when I add saveAsTextFile
This may sound like an obvious question, but are you sure that the program is doing any work when you don't have a saveAsTextFile? If there are transformations but no actions to actually collect the data, there's no need for Spark to execute the transformations.

As to the question of "is this taking too long", I can't answer that. But your code was HTML-escaped and therefore difficult to read; perhaps you should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie allanmcgr...@gmail.com wrote:
> *Problem Description*: The program runs in a standalone Spark cluster (1 master, 6 workers with 8g RAM and 2 cores each). Input: a 468MB file with 133433 records stored in HDFS. Output: just a 2MB file will be stored in HDFS. [...]
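A quick way to check Joe's point is to time the pipeline with a cheap action instead of the write; transformations alone execute nothing, so only an action-to-action comparison is meaningful. A sketch, with names assumed from the posted program:

// Hypothetical timing probe: count() forces the whole DAG (both maps and
// the reduceByKey) to run but writes nothing. If this is also slow, the
// computation, not saveAsTextFile itself, is the bottleneck.
val t0 = System.nanoTime()
val n = counts.count()
println(s"computed $n keys in ${(System.nanoTime() - t0) / 1e9} s")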
Re: FP Growth saveAsTextFile
+user

If this was in cluster mode, you should provide a path on a shared file system, e.g., HDFS, instead of a local path. If this is in local mode, I'm not sure what went wrong.

On Wed, May 20, 2015 at 2:09 PM, Eric Tanner eric.tan...@justenough.com wrote:
> Here is the stack trace. Thanks for looking at this.
>
> scala> model.freqItemsets.saveAsTextFile("c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1")
>
> 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at <console>:33
> 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at <console>:33) with 2 output partitions (allowLocal=false)
> 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30 (saveAsTextFile at <console>:33)
> 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29)
> 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List()
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30 (MapPartitionsRDD[21] at saveAsTextFile at <console>:33), which has no missing parents
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with curMem=724428, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 128.2 KB, free 264.6 MB)
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with curMem=855716, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 77.1 KB, free 264.5 MB)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
> 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:839
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage 30 (MapPartitionsRDD[21] at saveAsTextFile at <console>:33)
> 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0 (TID 33, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737 dropped from memory (free 277372582)
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0 (TID 34, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34)
> 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_17_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696 dropped from memory (free 277379278)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737 dropped from memory (free 277384015)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block broadcast_16_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696 dropped from memory (free 277390711)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID 34)
> java.lang.NullPointerException
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
>     at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
>     at org.apache.hadoop.util.Shell.run(Shell.java:455)
>     at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>     at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
>     at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
>     at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
>     at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
>     at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462
saveAsTextFile() part- files are missing
Hello! I just started with Spark. I have an application which counts words in a file (a 1 MB file). The file is stored locally. I loaded the file using native code and then created the RDD from it.

JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(..);
counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves. I run the program from the master node. The output directory is created on the master node and on the 2 worker nodes. On the master node I have only one file, _SUCCESS (empty), and on the worker nodes I have a _temporary directory. I printed the counter at the console and the result seems OK. What am I doing wrong? Thank you!
Re: saveAsTextFile() part- files are missing
Hi, it looks like you are writing to a local filesystem. Could you try writing to a location visible by all nodes (master and workers), e.g. an NFS share?

HTH,
Tomasz

On 21.05.2015 at 17:16, rroxanaioana wrote:
> Hello! I just started with Spark. I have an application which counts words in a file (a 1 MB file). [...]
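For a result this small, one workaround (my suggestion, not from the thread) is to collect to the driver and write a single local file there, which sidesteps the per-executor local writes entirely; a Scala sketch of the Java snippet above:

import java.io.PrintWriter

// Sketch: word counts for a 1 MB input easily fit in driver memory, so
// bring them back with collect() and write one file on the driver's disk.
val results = counter.collect() // runs the job; result lands on the driver
val out = new PrintWriter("/root/output.txt")
results.foreach { case (word, n) => out.println(s"$word\t$n") }
out.close()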
FP Growth saveAsTextFile
I am having trouble with saving an FP-Growth model as a text file. I can print out the results, but when I try to save the model I get a NullPointerException.

model.freqItemsets.saveAsTextFile("c://fpGrowth/model")

Thanks,
Eric
Re: FP Growth saveAsTextFile
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it would be easier to save the frequent itemsets as a Parquet file.

-Xiangrui

On Wed, May 20, 2015 at 12:16 PM, Eric Tanner eric.tan...@justenough.com wrote:
> I am having trouble with saving an FP-Growth model as a text file. I can print out the results, but when I try to save the model I get a NullPointerException. [...]
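A sketch of the Parquet route for the Spark 1.3/1.4-era API; the column names and output path are my own assumptions, and saveAsParquetFile was the DataFrame method of that vintage:

import sqlContext.implicits._

// Sketch: flatten each frequent itemset to (items, frequency) and write
// Parquet instead of the text-file path that triggered the NPE.
val df = model.freqItemsets
  .map(fi => (fi.items.mkString(","), fi.freq))
  .toDF("items", "freq")
df.saveAsParquetFile("hdfs:///tmp/fpGrowth/freqItemsets.parquet")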
Re: SaveAsTextFile brings down data nodes with IO Exceptions
All - this issue showed up when I was tearing down a Spark context and creating a new one. Often I was then unable to write to HDFS due to this error. I subsequently switched to a different implementation: instead of tearing down and re-initializing the Spark context, I submit a separate request to YARN.

On Fri, May 15, 2015 at 2:35 PM Puneet Kapoor puneet.cse.i...@gmail.com wrote:
> I am seeing this on hadoop 2.4.0 version. Thanks for your suggestions, I will try those and let you know if they help! [...]
Re: SaveAsTextFile brings down data nodes with IO Exceptions
Hey,

Did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
> Hi all, as the last stage of execution, I am writing out a dataset to disk. Before I do this, I force the DAG to resolve, so this is the only job left in the pipeline. The dataset in question is not especially large (a few gigabytes). During this step, however, HDFS will inevitably crash. I will lose connection to data nodes and get stuck in the loop of death – failure causes job restart, eventually causing the overall job to fail. On the data node logs I see the errors below. Does anyone have any ideas as to what is going on here? Thanks!
>
> java.io.IOException: Premature EOF from inputStream
>     at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
>     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
>     at java.lang.Thread.run(Thread.java:745)
>
> innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
>     at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>     at
Re: SaveAsTextFile brings down data nodes with IO Exceptions
I am seeing this on Hadoop version 2.4.0. Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com wrote:
> What version of Hadoop are you seeing this on?
>
> On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com wrote:
>> Hey, did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help. [...]
>
> That's being logged @ error level in the DN. It doesn't mean the DN has crashed, only that it timed out waiting for data: something has gone wrong elsewhere. https://issues.apache.org/jira/browse/HDFS-693
>
> There's a couple of properties you can set to extend timeouts:
>
> <property>
>   <name>dfs.socket.timeout</name>
>   <value>2</value>
> </property>
> <property>
>   <name>dfs.datanode.socket.write.timeout</name>
>   <value>2</value>
> </property>
>
> You can also increase the number of data node transceiver threads to handle data IO across the network:
>
> <property>
>   <name>dfs.datanode.max.xcievers</name>
>   <value>4096</value>
> </property>
>
> Yes, that property has that explicit spelling ("xcievers"); it's easy to get wrong.
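If you would rather not edit hdfs-site.xml on every node, a sketch of the client-side alternative (not from this thread): Spark forwards any "spark.hadoop.*" property into the Hadoop Configuration used by its HDFS client. The values below are assumptions, and whether the client honors these legacy key names depends on your Hadoop version.

import org.apache.spark.SparkConf

// Sketch: raise the HDFS client's socket timeouts through Spark config.
val conf = new SparkConf()
  .set("spark.hadoop.dfs.socket.timeout", "120000")                // read timeout, ms (assumed value)
  .set("spark.hadoop.dfs.datanode.socket.write.timeout", "120000") // write timeout, ms (assumed value)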
Re: saveAsTextFile() to save output of Spark program to HDFS
Another thing - could it be a permission problem? It creates all the directory structure:

/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1

so I am guessing not.

On Tue, May 5, 2015 at 7:27 PM, Sudarshan Murty njmu...@gmail.com wrote:
> You are most probably right. I assumed others may have run into this. [...]
Re: saveAsTextFile() to save output of Spark program to HDFS
You are most probably right. I assumed others may have run into this. When I try to put the files in there, it creates a directory structure with the part-0 and part-1 files, but these files are of size 0 - no content. The client error and the server logs have the error message shown, which seems to indicate that the system is aware that a datanode exists but it is excluded from the operation. So it looks like it is not partitioned, and Ambari indicates that HDFS is in good health with one NN, one SN, one DN. I am unable to figure out what the issue is. Thanks for your help.

On Tue, May 5, 2015 at 6:39 PM, ayan guha guha.a...@gmail.com wrote:
> What happens when you try to put files into your HDFS from the local filesystem? Looks like it's an HDFS issue rather than a Spark thing. [...]
Re: saveAsTextFile() to save output of Spark program to HDFS
What happens when you try to put files into your HDFS from the local filesystem? Looks like it's an HDFS issue rather than a Spark thing.

On 6 May 2015 05:04, Sudarshan njmu...@gmail.com wrote:
> I have searched all replies to this question and not found an answer. I am running standalone Spark 1.3.1 and Hortonworks' HDP 2.2 VM, side by side, on the same machine, and trying to write the output of the wordcount program into HDFS. [...]
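A direct way to run ayan's test from code, as a sketch (the URI is the one from the post; the probe path is mine):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: write a small file to HDFS without Spark. If this fails with the
// same "could only be replicated to 0 nodes" error, the problem is on the
// HDFS side (datanode registration, disk space, hostname resolution),
// not in saveAsTextFile.
val fs = FileSystem.get(
  new URI("hdfs://sandbox.hortonworks.com:8020"), new Configuration())
val out = fs.create(new Path("/tmp/probe.txt"))
out.writeBytes("hello\n")
out.close()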
saveAsTextFile() to save output of Spark program to HDFS
I have searched all replies to this question and not found an answer. I am running standalone Spark 1.3.1 and Hortonworks' HDP 2.2 VM, side by side, on the same machine, and trying to write the output of the wordcount program into HDFS (it works fine writing to a local file, /tmp/wordcount).

The only line I added to the wordcount program is (where 'counts' is the JavaPairRDD):

counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount");

When I check in HDFS at that location (/tmp) here's what I find:

/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1

and *both part-000[01] are 0-size files*. The wordcount client output error is:

[Stage 1: (0 + 2) / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1 *could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)

I tried this with Spark 1.2.1 - same error. I have plenty of space on the DFS. The Name Node, Secondary Name Node and the one Data Node are all healthy.

Any hint as to what may be the problem? Thanks in advance.
Sudarshan
Re: Efficient saveAsTextFile by key, directory for each key?
I ended up post-processing the result in Hive with a dynamic-partition insert query to get a table partitioned by the key. Looking further, it seems that dynamic-partition insert is in Spark SQL and working well as of Spark SQL 1.2.0: https://issues.apache.org/jira/browse/SPARK-3007 On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra arun.lut...@gmail.com wrote: Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.) Suppose you have: case class MyData(val0: Int, val1: String, directory_name: String) and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_names, called distinct_directories. A very inefficient way to do this is: distinct_directories.foreach( dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name ) .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(",")) .coalesce(5) .saveAsTextFile("base_dir_name/" + f"$dir_name") ) I tried this solution, and it does not do the multiple myrdd.filters in parallel. I'm guessing partitionBy might be in the efficient solution, if an easy efficient solution exists... Thanks, Arun
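A sketch of the dynamic-partition insert approach described above, assuming a hypothetical source table my_source_table (with the same columns as MyData) and a target table my_partitioned_table partitioned by directory_name:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Dynamic partition inserts need nonstrict mode when no static partition is given.
hiveContext.sql("SET hive.exec.dynamic.partition=true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// Each row lands in the partition named by its directory_name column.
hiveContext.sql(
  """INSERT OVERWRITE TABLE my_partitioned_table PARTITION (directory_name)
    |SELECT val0, val1, directory_name FROM my_source_table""".stripMargin)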
Efficient saveAsTextFile by key, directory for each key?
Is there an efficient way to save an RDD with saveAsTextFile in such a way that the data gets shuffled into separate directories according to a key? (My end goal is to wrap the result in a multi-partitioned Hive table.) Suppose you have: case class MyData(val0: Int, val1: String, directory_name: String) and an RDD called myrdd with type RDD[MyData]. Suppose that you already have an array of the distinct directory_names, called distinct_directories. A very inefficient way to do this is: distinct_directories.foreach( dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name ) .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(",")) .coalesce(5) .saveAsTextFile("base_dir_name/" + f"$dir_name") ) I tried this solution, and it does not do the multiple myrdd.filters in parallel. I'm guessing partitionBy might be in the efficient solution, if an easy efficient solution exists... Thanks, Arun
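One single-pass technique often suggested for this problem (building on the partitionBy hunch above, though not confirmed in this thread) is a custom MultipleTextOutputFormat that routes each record to a subdirectory named after its key; a sketch using the question's own myrdd and distinct_directories:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// Writes each (key, value) pair into <output>/<key>/part-xxxxx, and drops
// the key from the file contents by substituting NullWritable.
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

// One shuffle and one pass over the data, instead of one filter per key.
myrdd
  .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
  .partitionBy(new HashPartitioner(distinct_directories.length))
  .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String],
    classOf[KeyBasedOutputFormat])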
Re: Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2
Not sure if this will help, but try clearing your jar cache directories (~/.ivy2 for sbt and ~/.m2 for maven). Thanks Best Regards On Wed, Apr 15, 2015 at 9:33 PM, Manoj Samel manojsamelt...@gmail.com wrote: Env - Spark 1.3, Hadoop 2.3, Kerberos. xx.saveAsTextFile(path, codec) gives the following trace. The same works with Spark 1.2 in the same environment. val codec = classOf[some codec class] val a = sc.textFile("/some_hdfs_file") a.saveAsTextFile("/some_other_hdfs_file", codec) fails with the following trace in Spark 1.3, works in Spark 1.2 in the same env: 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7] 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate the provider BC at javax.crypto.Cipher.getInstance(Cipher.java:642) at javax.crypto.Cipher.getInstance(Cipher.java:580) some codec calls at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.util.jar.JarException: file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462) at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322) at javax.crypto.JarVerifier.verify(JarVerifier.java:250) at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161) at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187) at javax.crypto.Cipher.getInstance(Cipher.java:638) ...
16 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
saveAsTextFile
I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
RE: saveAsTextFile
The reason for this is as follows: 1. You are saving data on HDFS. 2. HDFS, as a cluster/server-side service, has a single-writer / multiple-reader multithreading model. 3. Hence each thread of execution in Spark has to write to a separate file in HDFS. 4. Moreover, the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that, in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream. If you want fine/detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in foreachRDD and foreach. Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
RE: saveAsTextFile
Nope sir, it is possible - check my reply earlier. -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 6:35 PM To: Vadim Bichutskiy Cc: user@spark.apache.org Subject: Re: saveAsTextFile You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
RE: saveAsTextFile
Basically you need to unbundle the elements of the RDD and then store them wherever you want - use foreachPartition and then foreach. -----Original Message----- From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:39 PM To: Sean Owen Cc: user@spark.apache.org Subject: Re: saveAsTextFile Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
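A minimal sketch of that foreachRDD/foreachPartition pattern; SinkConnection and the openConnection factory are hypothetical stand-ins for whatever sink you target (a JDBC endpoint, a custom HDFS writer, etc.), not part of any Spark or Hadoop API:

import org.apache.spark.streaming.dstream.DStream

// A stand-in for your real sink connection.
trait SinkConnection {
  def write(record: String): Unit
  def close(): Unit
}

def save(stream: DStream[String], openConnection: () => SinkConnection): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One connection per partition, not per record and not per RDD.
      val conn = openConnection()
      try records.foreach(conn.write)
      finally conn.close()
    }
  }
}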
Re: saveAsTextFile
Just copy the files? It shouldn't matter that much where they are, as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? No idea how that is done, but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
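For the Redshift route, the usual pattern (not spelled out in this thread) is to leave each batch directory on S3 and issue a COPY from that prefix over JDBC; a sketch in which the cluster endpoint, credentials, table name, and S3 prefix are all hypothetical:

import java.sql.DriverManager

val url = "jdbc:postgresql://my-cluster.example.redshift.amazonaws.com:5439/mydb"
val conn = DriverManager.getConnection(url, "myuser", "mypassword")
try {
  // Redshift loads every object under the prefix in parallel, so the
  // part-xxxxx files can stay exactly where saveAsTextFile put them.
  conn.createStatement().execute(
    """COPY my_table
      |FROM 's3://my-bucket/output/batch-1429203180000/'
      |CREDENTIALS 'aws_access_key_id=AKIA...;aws_secret_access_key=...'
      |DELIMITER ','""".stripMargin)
} finally {
  conn.close()
}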
RE: saveAsTextFile
Also, to juggle even further the multithreading model of both Spark and HDFS, you can even publish the data from Spark first to a message broker, e.g. Kafka, from where a predetermined number (from 1 to infinity) of parallel consumers will retrieve and store it in HDFS in one or more finely controlled files and directories. From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:45 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: saveAsTextFile Thanks Evo for your detailed explanation. On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote: The reason for this is as follows: 1. You are saving data on HDFS. 2. HDFS, as a cluster/server-side service, has a single-writer / multiple-reader multithreading model. 3. Hence each thread of execution in Spark has to write to a separate file in HDFS. 4. Moreover, the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that, in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream. If you want fine/detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in foreachRDD and foreach. Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
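A sketch of the publish side of that design with the standard Kafka producer client; the broker address and topic name are hypothetical:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def publish(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One producer per partition; a pooled/shared producer would be
      // preferable in production code.
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      try records.foreach(r => producer.send(new ProducerRecord[String, String]("my-topic", r)))
      finally producer.close()
    }
  }
}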
Re: saveAsTextFile
Copy should be doable, but I'm not sure how to specify a prefix for the directory while keeping the filename (i.e. part-0) fixed in the copy command. On Apr 16, 2015, at 1:51 PM, Sean Owen so...@cloudera.com wrote: Just copy the files? It shouldn't matter that much where they are, as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? No idea how that is done, but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
Thanks Evo for your detailed explanation. On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote: The reason for this is as follows: 1. You are saving data on HDFS. 2. HDFS, as a cluster/server-side service, has a single-writer / multiple-reader multithreading model. 3. Hence each thread of execution in Spark has to write to a separate file in HDFS. 4. Moreover, the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that, in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream. If you want fine/detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in foreachRDD and foreach. Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2
Env - Spark 1.3, Hadoop 2.3, Kerberos. xx.saveAsTextFile(path, codec) gives the following trace. The same works with Spark 1.2 in the same environment. val codec = classOf[some codec class] val a = sc.textFile("/some_hdfs_file") a.saveAsTextFile("/some_other_hdfs_file", codec) fails with the following trace in Spark 1.3, works in Spark 1.2 in the same env: 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7] 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate the provider BC at javax.crypto.Cipher.getInstance(Cipher.java:642) at javax.crypto.Cipher.getInstance(Cipher.java:580) some codec calls at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.util.jar.JarException: file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462) at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322) at javax.crypto.JarVerifier.verify(JarVerifier.java:250) at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161) at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187) at javax.crypto.Cipher.getInstance(Cipher.java:638) ... 16 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: Spark permission denied error when invoking saveAsTextFile
Ignore the question. There was a Hadoop setting that needed to be set to get it working. -- Kannan On Wed, Apr 1, 2015 at 1:37 PM, Kannan Rajah kra...@maprtech.com wrote: Running a simple word count job in standalone mode as a non-root user from spark-shell. The Spark master and worker services are running as the root user. The problem is that the _temporary directory under /user/krajah/output2/_temporary/0 is being created with root permission even when running the job as the non-root user - krajah in this case. The higher-level directories are getting created with the right permission though. There was a similar question posted a long time back, but there is no answer: http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E *Wrong permission for child directory* drwxr-xr-x - root root 0 2015-04-01 11:20 /user/krajah/output2/_temporary/0/_temporary *Right permission for parent directories* hadoop fs -ls -R /user/krajah/my_output drwxr-xr-x - krajah krajah 1 2015-04-01 11:46 /user/krajah/my_output/_temporary drwxr-xr-x - krajah krajah 3 2015-04-01 11:46 /user/krajah/my_output/_temporary/0 *Job and Stacktrace* scala> val file = sc.textFile("/user/krajah/junk.txt") scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) scala> counts.saveAsTextFile("/user/krajah/count2") java.io.IOException: Error: Permission denied at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:17) at $iwC$$iwC$$iwC.<init>(<console>:22) at $iwC$$iwC.<init>(<console>:24) at $iwC.<init>(<console>:26) at <init>(<console>:28) at .<init>(<console>:32) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) -- Kannan
Spark permission denied error when invoking saveAsTextFile
Running a simple word count job in standalone mode as a non-root user from spark-shell. The Spark master and worker services are running as the root user. The problem is that the _temporary directory under /user/krajah/output2/_temporary/0 is being created with root permission even when running the job as the non-root user - krajah in this case. The higher-level directories are getting created with the right permission though. There was a similar question posted a long time back, but there is no answer: http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E *Wrong permission for child directory* drwxr-xr-x - root root 0 2015-04-01 11:20 /user/krajah/output2/_temporary/0/_temporary *Right permission for parent directories* hadoop fs -ls -R /user/krajah/my_output drwxr-xr-x - krajah krajah 1 2015-04-01 11:46 /user/krajah/my_output/_temporary drwxr-xr-x - krajah krajah 3 2015-04-01 11:46 /user/krajah/my_output/_temporary/0 *Job and Stacktrace* scala> val file = sc.textFile("/user/krajah/junk.txt") scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) scala> counts.saveAsTextFile("/user/krajah/count2") java.io.IOException: Error: Permission denied at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:17) at $iwC$$iwC$$iwC.<init>(<console>:22) at $iwC$$iwC.<init>(<console>:24) at $iwC.<init>(<console>:26) at <init>(<console>:28) at .<init>(<console>:32) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) -- Kannan
Pyspark saveAsTextFile exceptions
Hi Team, I'm getting the below exception when saving the results into Hadoop. *Code:* rdd.saveAsTextFile("hdfs://localhost:9000/home/rajesh/data/result.rdd") Could you please help me resolve this issue? 15/03/13 17:19:31 INFO spark.SparkContext: Starting job: saveAsTextFile at NativeMethodAccessorImpl.java:-2 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Got job 6 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) with 4 output partitions (allowLocal=false) 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Final stage: Stage 10(saveAsTextFile at NativeMethodAccessorImpl.java:-2) 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Missing parents: List() 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting Stage 10 (MappedRDD[31] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which has no missing parents 15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(98240) called with curMem=203866, maxMem=280248975 15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 95.9 KB, free 267.0 MB) 15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(59150) called with curMem=302106, maxMem=280248975 15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 57.8 KB, free 266.9 MB) 15/03/13 17:19:31 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:57655 (size: 57.8 KB, free: 267.2 MB) 15/03/13 17:19:31 INFO storage.BlockManagerMaster: Updated info of block broadcast_9_piece0 15/03/13 17:19:31 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:838 15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 10 (MappedRDD[31] at saveAsTextFile at NativeMethodAccessorImpl.java:-2) 15/03/13 17:19:31 INFO scheduler.TaskSchedulerImpl: Adding task set 10.0 with 4 tasks 15/03/13 17:19:31 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 10.0 (TID 8, localhost, PROCESS_LOCAL, 1375 bytes) 15/03/13 17:19:31 INFO executor.Executor: Running task 0.0 in stage 10.0 (TID 8) 15/03/13 17:19:31 INFO executor.Executor: Fetching http://10.0.2.15:54815/files/sftordd_pickle with timestamp 1426247370763 15/03/13 17:19:31 INFO util.Utils: Fetching http://10.0.2.15:54815/files/sftordd_pickle to /tmp/fetchFileTemp7846328782039551224.tmp 15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class 15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class 15/03/13 17:19:31 INFO Configuration.deprecation: mapred.working.dir is deprecated.
Instead, use mapreduce.job.working.dir terminate called after throwing an instance of 'std::invalid_argument' what(): stoi 15/03/13 17:19:31 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/rajesh/spark-1.2.0/python/pyspark/worker.py", line 90, in main command = pickleSer._read_with_length(infile) File "/home/rajesh/spark-1.2.0/python/pyspark/serializers.py", line 145, in _read_with_length length = read_int(stream) File "/home/rajesh/spark-1.2.0/python/pyspark/serializers.py", line 511, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Subprocess exited with status 134 at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:161) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327
Re: saveAsTextFile extremely slow near finish
Is your data skewed? Could it be that there are a few keys with a huge number of records? You might consider outputting (recordA, count) (recordB, count) instead of recordA recordA recordA ... You could do this with: input = sc.textFile pairsCounts = input.map{x => (x, 1)}.reduceByKey{_ + _} sorted = pairsCounts.sortByKey sorted.saveAsTextFile On Mon, Mar 9, 2015 at 12:31 PM, mingweili0x m...@spokeo.com wrote: I'm basically running a sort using Spark. The Spark program will read from HDFS, sort on composite keys, and then save the partitioned result back to HDFS. Pseudo code is like this: input = sc.textFile pairs = input.mapToPair sorted = pairs.sortByKey values = sorted.values values.saveAsTextFile Input size is ~160G, and I made 1000 partitions, specified in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. MapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress and the last few tasks just took forever and never finished. Cluster setup: 8 nodes; on each node: 15gb memory, 8 cores. Running parameters: --executor-memory 12G --conf spark.cores.max=60 Thank you for any help.
Re: saveAsTextFile extremely slow near finish
Don't you think 1000 partitions is too few for 160GB of data? Also, you could try using KryoSerializer and enabling RDD compression. Thanks Best Regards On Mon, Mar 9, 2015 at 11:01 PM, mingweili0x m...@spokeo.com wrote: I'm basically running a sort using Spark. The Spark program will read from HDFS, sort on composite keys, and then save the partitioned result back to HDFS. Pseudo code is like this: input = sc.textFile pairs = input.mapToPair sorted = pairs.sortByKey values = sorted.values values.saveAsTextFile Input size is ~160G, and I made 1000 partitions, specified in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. MapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress and the last few tasks just took forever and never finished. Cluster setup: 8 nodes; on each node: 15gb memory, 8 cores. Running parameters: --executor-memory 12G --conf spark.cores.max=60 Thank you for any help.
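A sketch of wiring up those two suggestions on a 1.x SparkConf; the app name, input path, and partition count are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("composite-key-sort")
  // Kryo is faster and more compact than Java serialization for shuffle data.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Compress serialized RDD partitions to reduce shuffle and storage I/O.
  .set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)

// More, smaller partitions than 1000 for ~160 GB keeps each task's write
// short; 2-3 partitions per core is a common starting point.
val input = sc.textFile("hdfs:///some/input", 4000)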
Re: saveAsTextFile extremely slow near finish
This is more of an aside, but why repartition this data instead of letting it define partitions naturally? You will end up with a similar number. On Mar 9, 2015 5:32 PM, mingweili0x m...@spokeo.com wrote: I'm basically running a sort using Spark. The Spark program will read from HDFS, sort on composite keys, and then save the partitioned result back to HDFS. Pseudo code is like this: input = sc.textFile pairs = input.mapToPair sorted = pairs.sortByKey values = sorted.values values.saveAsTextFile Input size is ~160G, and I made 1000 partitions, specified in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. MapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress and the last few tasks just took forever and never finished. Cluster setup: 8 nodes; on each node: 15gb memory, 8 cores. Running parameters: --executor-memory 12G --conf spark.cores.max=60 Thank you for any help.
saveAsTextFile extremely slow near finish
I'm basically running a sort using Spark. The Spark program will read from HDFS, sort on composite keys, and then save the partitioned result back to HDFS. Pseudo code is like this: input = sc.textFile pairs = input.mapToPair sorted = pairs.sortByKey values = sorted.values values.saveAsTextFile Input size is ~160G, and I made 1000 partitions, specified in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. MapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress and the last few tasks just took forever and never finished. Cluster setup: 8 nodes; on each node: 15gb memory, 8 cores. Running parameters: --executor-memory 12G --conf spark.cores.max=60 Thank you for any help.
Re: saveAsTextFile of RDD[Array[Any]]
If you have `RDD[Array[Any]]` you can do rdd.map(_.mkString("\t")), or with some other delimiter, to make it an `RDD[String]`, and then call `saveAsTextFile`.
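A self-contained sketch of that one-liner; the sample data and output path are illustrative:

import org.apache.spark.SparkContext

def saveArrays(sc: SparkContext): Unit = {
  val rdd = sc.parallelize(Seq[Array[Any]](Array(1, "a", 2.5), Array(2, "b", 3.0)))
  // mkString renders every element via toString, yielding an RDD[String]
  // that saveAsTextFile can write directly.
  rdd.map(_.mkString("\t")).saveAsTextFile("/tmp/arrays-out")
}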
Re: performance of saveAsTextFile moving files from _temporary
Upon completion of the 2-hour part of the run, the files did not exist in the output directory? One thing that is done serially is deleting any remaining files from _temporary, so perhaps there was a lot of data remaining in _temporary but the committed data had already been moved. I am, unfortunately, not aware of other issues that would cause this to be so slow. On Tue, Jan 27, 2015 at 6:54 PM, Josh Walton j...@openbookben.com wrote: I'm not sure how to confirm how the moving is happening, however, one of the jobs I was talking about, with 9k files of 4mb each, just completed. The Spark UI showed the job being complete after ~2 hours. The last four hours of the job was just moving the files from _temporary to their final destination. The tasks for the write were definitely shown as complete, and no logging is happening on the master or workers. The last line of my Java code logs, but the job sits there as the moving of files happens. On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com wrote: This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4mb files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere? I don't remember this being an issue with Map Reduce jobs and Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.
Re: performance of saveAsTextFile moving files from _temporary
TL;DR: Extend FileOutputCommitter to eliminate the _temporary storage. There are some implementations to be found online, typically called DirectOutputCommitter, f.i. this Spark pull request: https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c. Tell Spark to use your shiny new committer when writing to an object store and all will be well. Aaron is on the right track, but this renaming is bottlenecked by the object storage system itself, irrespective of being executed on the driver or the executors. Object stores (S3, Google, Azure, Amplidata :P...) do not have a native rename (it is implemented as a server-side copy operation, thus its duration is proportional to the object size). By default, Hadoop (and thus also Spark) uses a rename from the temporary to the final output path to enable both retrying and speculative execution, because HDFS is a single-writer system, so multiple job attempts cannot write to the final output concurrently. Object stores do allow multiple concurrent writes to the same output, which is exactly what makes a native rename nigh impossible. The solution is to enable this concurrent writing instead of renaming to the final output, by using a custom OutputCommitter which does not use a temporary location. Thomas Demoor skype: demoor.thomas mobile: +32 497883833 On Wed, Jan 28, 2015 at 3:54 AM, Josh Walton j...@openbookben.com wrote: I'm not sure how to confirm how the moving is happening, however, one of the jobs I was talking about, with 9k files of 4mb each, just completed. The Spark UI showed the job being complete after ~2 hours. The last four hours of the job was just moving the files from _temporary to their final destination. The tasks for the write were definitely shown as complete, and no logging is happening on the master or workers. The last line of my Java code logs, but the job sits there as the moving of files happens. On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com wrote: This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4mb files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere?
I don't remember this being an issue with Map Reduce jobs and Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.
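A sketch of such a committer for the old mapred API that saveAsTextFile uses, modeled on the DirectOutputCommitter implementations linked above; the wiring key is the standard mapred one, but verify the behavior (especially around failed or speculative task attempts, which can leave partial output behind) before relying on it:

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}
import org.apache.spark.SparkContext

// No temporary directory: tasks write straight to the final location,
// so nothing needs to be renamed (or server-side copied) at commit time.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

def configure(sc: SparkContext): Unit =
  sc.hadoopConfiguration.set("mapred.output.committer.class",
    classOf[DirectOutputCommitter].getName)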
performance of saveAsTextFile moving files from _temporary
We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4mb files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere? I don't remember this being an issue with Map Reduce jobs and Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.
Re: performance of saveAsTextFile moving files from _temporary
This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4mb files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere? I don't remember this being an issue with Map Reduce jobs and Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.