Re: An alternative logic to collaborative filtering works fine but we are facing run time issues in executing the job
Hi Balakumar,

Two things. First, it looks like your cluster is running out of memory and then eventually out of disk, most likely while materializing the dataframe to write (what volume of data does the join produce?). Second, your job is running in local mode, so it can only use the resources of the master node. Try running the job in YARN mode, and if the issue persists, try increasing the disk volumes.

Best regards,
Ankit Khettry
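A minimal sketch of the change being suggested here, assuming the job is submitted to a YARN cluster with spark-submit (the app name is taken from the original code; everything else is illustrative):

import org.apache.spark.sql.SparkSession

// Do not hard-code a local master; let spark-submit pick the cluster manager,
// e.g. spark-submit --master yarn --deploy-mode cluster --class ... app.jar
val ss = SparkSession
  .builder
  .appName("join_association")
  .getOrCreate()

With the master supplied at submit time, the same build can run locally for testing and on the cluster for real workloads.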
An alternative logic to collaborative filtering works fine but we are facing run time issues in executing the job
Hi,

While running the following Spark code on a cluster with the configuration below, the work is split into 3 job IDs.

CLUSTER CONFIGURATION
3 NODE CLUSTER
NODE 1 - 64 GB, 16 CORES
NODE 2 - 64 GB, 16 CORES
NODE 3 - 64 GB, 16 CORES

At job ID 2 the job gets stuck at stage 51 of 254 and then starts consuming disk space. I am not sure why this is happening, and my work is completely ruined. Could someone help me with this?

I have attached a screenshot of the Spark stages that are stuck, for reference.

Please let me know if you have any more questions about the setup and code.

Thanks

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

def main(args: Array[String]) {

  Logger.getLogger("org").setLevel(Level.ERROR)

  val ss = SparkSession
    .builder
    .appName("join_association")
    .master("local[*]")
    .getOrCreate()

  import ss.implicits._

  val dframe = ss.read
    .option("inferSchema", value = true)
    .option("delimiter", ",")
    .csv("in/matrimony.txt")

  dframe.show()
  dframe.printSchema()

  // Left frame
  val dfLeft = dframe.withColumnRenamed("_c1", "left_data")

  // Right frame
  val dfRight = dframe.withColumnRenamed("_c1", "right_data")

  // Self-join on _c0, keeping only rows whose left and right values differ
  val joined = dfLeft.join(dfRight, dfLeft.col("_c0") === dfRight.col("_c0"))
    .filter(col("left_data") =!= col("right_data"))

  joined.show()

  val result = joined.select(col("left_data"), col("right_data") as "similar_ids")

  result.write.csv("/output")

  ss.stop()
}

--
Regards
Balakumar Seetharaman
Re: [External Sender] How to use same SparkSession in another app?
Why not save the dataframe to persistent storage (S3/HDFS) in the first application and read it back in the second?

On Tue, Apr 16, 2019 at 8:58 PM Rishikesh Gawade wrote:
> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the dataframes belonging to that session. Is it possible to use the
> same SparkSession in another app?
> Thanks,
> Rishikesh

--
Card Machine Learning (ML) Team, Capital One
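A minimal sketch of that approach, with each application owning its own SparkSession; the bucket, paths, and app names below are illustrative, not from the thread:

import org.apache.spark.sql.SparkSession

// First application: compute the dataframe and persist it to shared storage.
val producer = SparkSession.builder.appName("producer").getOrCreate()
val df = producer.read.parquet("s3a://some-bucket/input")          // hypothetical input
df.write.mode("overwrite").parquet("s3a://some-bucket/shared/df")  // hand-off location

// Second application: build its own session and read the persisted data back.
val consumer = SparkSession.builder.appName("consumer").getOrCreate()
val df2 = consumer.read.parquet("s3a://some-bucket/shared/df")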
Re: How to use same SparkSession in another app?
Hi,

Not possible. What are you really trying to do? Why do you need to share dataframes? They're nothing but metadata describing a distributed computation (there is no data inside them), so what would be the purpose of such sharing?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, Apr 16, 2019 at 1:57 PM Rishikesh Gawade wrote:
> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the dataframes belonging to that session. Is it possible to use the
> same SparkSession in another app?
> Thanks,
> Rishikesh
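A small illustration of the point that a DataFrame is only a lazily evaluated plan, not a container of data (a sketch run against a local session; names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("plan-demo").master("local[*]").getOrCreate()

// This builds a query plan only; no computation has happened yet.
val df = spark.range(1000000L).filter("id % 2 = 0")

df.explain()          // prints the logical/physical plan; still nothing executed
println(df.count())   // only an action like count() triggers the computation

Since the DataFrame is just this plan plus a reference to its owning session, there is nothing meaningful to hand to a different application.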
Re: Reading RDD by (key, data) from s3
You can't; SparkContext is a singleton object. You have to use the Hadoop library or an AWS client to read the files on S3.
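A hedged sketch of what that could look like (in Scala, though the original question was in PySpark): each record carries an S3 path, and the object is read on the executors with the Hadoop FileSystem API instead of calling sc.textFile inside a transformation. The bucket and key names are illustrative, and the s3a connector (hadoop-aws) plus credentials are assumed to be configured.

import java.net.URI
import java.util.zip.GZIPInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import scala.io.Source

val spark = SparkSession.builder.appName("s3-per-key-read").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (key, s3Path) pairs pointing at gzipped JSON objects.
val s3Keys = sc.parallelize(Seq(
  ("k1", "s3a://some-bucket/data/part-0001.json.gz"),
  ("k2", "s3a://some-bucket/data/part-0002.json.gz")
))

// Open each object inside the executors via the Hadoop FileSystem API.
val data = s3Keys.mapValues { path =>
  val conf = new Configuration()
  val fs = FileSystem.get(new URI(path), conf)
  val in = fs.open(new Path(path))
  try Source.fromInputStream(new GZIPInputStream(in), "UTF-8").mkString
  finally in.close()
}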
Dynamic executor scaling spark/Kubernetes
Hello,

Is dynamic executor scaling on Kubernetes available in the latest release of Spark? I mean scaling the executors based on the workload, as opposed to preallocating a fixed number of executors for a Spark job.

Thanks,
Purna
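For reference, a sketch of the kind of configuration the question refers to: Spark's dynamic allocation settings, which grow and shrink the executor count with the pending workload instead of fixing spark.executor.instances up front. Whether these settings take effect on Kubernetes depends on the Spark release in use; the values below are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("dynamic-allocation-demo")
  // Scale executors with demand rather than preallocating a fixed count.
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.initialExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  // Classic dynamic allocation also relies on an external shuffle service
  // (spark.shuffle.service.enabled) to preserve shuffle data when executors go away.
  .getOrCreate()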
How to use same SparkSession in another app?
Hi.

I wish to use a SparkSession created by one app in another app, so that I can use the dataframes belonging to that session. Is it possible to use the same SparkSession in another app?

Thanks,
Rishikesh
Reading RDD by (key, data) from s3
Hi,

I am trying to read gzipped JSON data from S3. My idea would be to do something like:

data = s3_keys.mapValues(lambda key: s3_read_data(key))

For that I thought about using sc.textFile in place of s3_read_data, but that wouldn't work. Any idea how to achieve a solution here?

Cheers,
Gorka
K8s-Spark client mode : Executor image not able to download application jar from driver
Environment:
Spark: 2.4.0
Kubernetes: 1.14

Query: Does the application jar need to be part of both the driver and the executor image?

Invocation point (from Java code):

sparkLaunch = new SparkLauncher()
    .setMaster(LINUX_MASTER)
    .setAppResource(LINUX_APP_RESOURCE)
    .setConf("spark.app.name", APP_NAME)
    .setMainClass(MAIN_CLASS)
    .setConf("spark.executor.instances", EXECUTOR_COUNT)
    .setConf("spark.kubernetes.container.image", CONTAINER_IMAGE)
    .setConf("spark.kubernetes.driver.pod.name", DRIVER_POD_NAME)
    .setConf("spark.kubernetes.container.image.pullSecrets", REGISTRY_SECRET)
    .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", SERVICE_ACCOUNT_NAME)
    .setConf("spark.driver.host", SERVICE_NAME + "." + NAMESPACE + ".svc.cluster.local")
    .setConf("spark.driver.port", DRIVER_PORT)
    .setDeployMode("client");

Scenario: I am trying to run Spark on K8s in client mode. When I put the application jar in both the driver and the executor image, the program works fine. But if I put the application jar in the driver image only, I get the following error:

2019-04-16 06:36:44 INFO Executor:54 - Fetching file:/opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar with timestamp 1555396592768
2019-04-16 06:36:44 INFO Utils:54 - Copying /opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar to /var/data/spark-d24c8fbc-4fe7-4968-9310-f891a097d1e7/spark-31ba5cbb-3132-408c-991a-795
2019-04-16 06:36:44 ERROR Executor:91 - Exception in task 0.1 in stage 0.0 (TID 2)
java.nio.file.NoSuchFileException: /opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar
    at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
    at java.base/sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:548)
    at java.base/sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:254)
    at java.base/java.nio.file.Files.copy(Files.java:1294)
    at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:664)
    at org.apache.spark.util.Utils$.copyFile(Utils.scala:635)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:719)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:805)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:797)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:797)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:369)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
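The log shows each executor resolving the file: path locally, which only exists inside the driver image. One option sometimes used in this situation (not from this thread, and only a sketch) is to point the app resource at a location the executor pods can reach over the network, such as HDFS or HTTP, so the executors fetch the jar themselves instead of expecting it on their local filesystem. The class name, URLs, and image tag below are hypothetical.

import org.apache.spark.launcher.SparkLauncher

val launcher = new SparkLauncher()
  .setMaster("k8s://https://kubernetes.default.svc")   // illustrative master URL
  .setDeployMode("client")
  .setMainClass("com.example.Main")                     // hypothetical main class
  // Jar hosted where executor pods can fetch it, rather than a file: path in the driver image.
  .setAppResource("hdfs://namenode:8020/apps/reno-spark-codebase-0.1.0.jar")
  .setConf("spark.kubernetes.container.image", "my-registry/spark:2.4.0")

Alternatively, keeping the jar baked into both the driver and the executor images, as already tried above, avoids the fetch entirely.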