Adding the DEV mailing list to see if this is a defect in ConnectedComponents or if they can recommend a solution.
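
From the traces below, the failure comes from ConnectedComponents calling FileSystem.get(sc.hadoopConfiguration) to delete the previous checkpoint: that resolves the cluster's default file system (file:///) rather than the file system that owns the s3n:// checkpoint path. A minimal sketch of the change I would expect to fix it (untested; sc, checkpointDir, and iteration refer to the names in the snippet quoted below):

import org.apache.hadoop.fs.Path

// Resolve the FileSystem from the Path itself, so the implementation
// registered for the path's scheme (s3n, hdfs, file, ...) is used
// instead of the default file system from the Hadoop configuration.
val oldCheckpoint = new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}")
oldCheckpoint
  .getFileSystem(sc.hadoopConfiguration)
  .delete(oldCheckpoint, true)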
Thanks
Ankur

On Thu, Jan 5, 2017 at 1:10 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

> Yes, I did try it out, and it chooses the local file system even though my
> checkpoint location starts with s3n://
>
> I am not sure how I can make it load the S3FileSystem.
>
> On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <felixcheun...@hotmail.com> wrote:
>
>> Right, I'd agree, it seems to be only with delete.
>>
>> Could you by chance run just the delete to see if it fails?
>>
>> FileSystem.get(sc.hadoopConfiguration)
>>   .delete(new Path(somepath), true)
>>
>> ------------------------------
>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>> *Sent:* Thursday, January 5, 2017 10:05:03 AM
>> *To:* Felix Cheung
>> *Cc:* u...@spark.apache.org
>> *Subject:* Re: Spark GraphFrame ConnectedComponents
>>
>> Yes, it works to read the vertices and edges data from the S3 location,
>> and it is also able to write the checkpoint files to S3. It only fails
>> when deleting the data, and that is because it tries to use the default
>> file system. I tried looking up how to update the default file system
>> but could not find anything in that regard.
>>
>> Thanks
>> Ankur
>>
>> On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <felixcheun...@hotmail.com> wrote:
>>
>>> From the stack it looks to be an error from the explicit call to
>>> hadoop.fs.FileSystem.
>>>
>>> Is the URL scheme for s3n registered?
>>> Does it work when you try to read from s3 from Spark?
>>>
>>> _____________________________
>>> From: Ankur Srivastava <ankur.srivast...@gmail.com>
>>> Sent: Wednesday, January 4, 2017 9:23 PM
>>> Subject: Re: Spark GraphFrame ConnectedComponents
>>> To: Felix Cheung <felixcheun...@hotmail.com>
>>> Cc: <u...@spark.apache.org>
>>>
>>> This is the exact trace from the driver logs:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
>>> s3n://<checkpoint-folder>/8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
>>> expected: file:///
>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
>>> at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
>>> at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
>>> at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
>>> at GraphTest.main(GraphTest.java:31) ----------- Application Class
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> And I am running Spark v1.6.2 and GraphFrames v0.3.0-spark1.6-s_2.10.
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am rerunning the pipeline to generate the exact trace; I have the
>>>> below part of the trace from the last run:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
>>>> s3n://<folder-path>, expected: file:///
>>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>>>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
>>>> at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)
>>>>
>>>> Also, I think the error is happening in this part of the code
>>>> (ConnectedComponents.scala:339). I am referring to the code at
>>>> https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala
>>>>
>>>> if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
>>>>   // TODO: remove this after DataFrame.checkpoint is implemented
>>>>   val out = s"${checkpointDir.get}/$iteration"
>>>>   ee.write.parquet(out)
>>>>   // may hit S3 eventually consistent issue
>>>>   ee = sqlContext.read.parquet(out)
>>>>
>>>>   // remove previous checkpoint
>>>>   if (iteration > checkpointInterval) {
>>>>     FileSystem.get(sc.hadoopConfiguration)
>>>>       .delete(new Path(s"${checkpointDir.get}/${iteration - checkpointInterval}"), true)
>>>>   }
>>>>
>>>>   System.gc() // hint Spark to clean shuffle directories
>>>> }
>>>>
>>>> Thanks
>>>> Ankur
>>>>
>>>> On Wed, Jan 4, 2017 at 5:19 PM, Felix Cheung <felixcheun...@hotmail.com> wrote:
>>>>
>>>>> Do you have more of the exception stack?
>>>>>
>>>>> ------------------------------
>>>>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>>>>> *Sent:* Wednesday, January 4, 2017 4:40:02 PM
>>>>> *To:* u...@spark.apache.org
>>>>> *Subject:* Spark GraphFrame ConnectedComponents
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to use the ConnectedComponents algorithm of GraphFrames,
>>>>> but by default it needs a checkpoint directory. As I am running my
>>>>> Spark cluster with S3 as the DFS and do not have access to an HDFS
>>>>> file system, I tried using an S3 directory as the checkpoint
>>>>> directory, but I run into the below exception:
>>>>>
>>>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS:
>>>>> s3n://<folder-path>, expected: file:///
>>>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
>>>>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
>>>>>
>>>>> If I set the checkpoint interval to -1 to avoid checkpointing, the
>>>>> driver just hangs after 3 or 4 iterations.
>>>>>
>>>>> Is there some way I can set the default FileSystem to S3 for Spark,
>>>>> or any other option?
>>>>>
>>>>> Thanks
>>>>> Ankur
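
P.S. Until the delete call is fixed, one stopgap I am looking at is overriding the default file system so that FileSystem.get(conf) resolves to S3 instead of the local file system. A sketch of what I am trying (not verified to work end to end; the bucket name is a placeholder, and the s3n credentials are assumed to be configured already):

// Make FileSystem.get(sc.hadoopConfiguration) return the s3n file
// system rather than file:/// (fs.default.name is the deprecated
// Hadoop 1.x spelling of the same setting).
sc.hadoopConfiguration.set("fs.defaultFS", "s3n://my-bucket")

// The same can be set at submit time:
//   spark-submit --conf spark.hadoop.fs.defaultFS=s3n://my-bucket ...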