Hello Nicholas, I sincerely apologise.
Thanks

On Wed, May 15, 2019 at 11:34 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> This kind of question is for the User list, or for something like Stack
> Overflow. It's not on topic here.
>
> The dev list (i.e. this list) is for discussions about the development of
> Spark itself.
>
> On Wed, May 15, 2019 at 1:50 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>
>> Can anyone help me? I am confused. :(
>>
>> On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Developers,
>>>
>>> I have a question about a Spark join I am doing.
>>>
>>> I have a full load of data from an RDBMS stored in HDFS, let's say:
>>>
>>> val historyDF = spark.read.parquet("/home/test/transaction-line-item")
>>>
>>> and I am getting the changed data at a separate HDFS path, let's say:
>>>
>>> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>>>
>>> Now I would like to take the rows from deltaDF, ignore only those records
>>> in historyDF, and write the result to some MySQL table.
>>>
>>> Once I am done writing to the MySQL table, I would like to overwrite
>>> /home/test/transaction-line-item. But I can't just overwrite it, because of
>>> lazy evaluation and the DAG structure, unless I write somewhere else first
>>> and then write back as overwrite.
>>>
>>> val syncDataDF = historyDF
>>>   .join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
>>>     Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
>>>   .filter(deltaDF.col("sys_change_column").isNull)
>>>   .drop(deltaDF.col("sys_change_column"))
>>>
>>> val mergedDataDF = syncDataDF.union(deltaDF)
>>>
>>> I believe this can be done with only the join, without doing the union.
>>> Please suggest the best approach.
>>>
>>> As I can't write mergedDataDF back to the path of historyDF (because I am
>>> reading from there), what I am doing is writing to a temp path and then
>>> reading from there and writing back! Which is a bad idea; I need a
>>> suggestion here...
>>>
>>> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
>>> val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
>>> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
>>>
>>> Please suggest the best approach.
>>>
>>> Thanks
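
For reference, a minimal sketch of the merge described in the thread, using a left_anti join in place of the left_outer join plus isNull filter. The paths, key column TRANSACTION_BY_LINE_ID and CDC column sys_change_column come from the thread; the assumption that deltaDF matches historyDF's schema once the CDC column is dropped is mine and is not confirmed by the thread.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("merge-sketch").getOrCreate()

val historyDF = spark.read.parquet("/home/test/transaction-line-item")
val deltaDF   = spark.read.parquet("/home/test/transaction-line-item-delta")

// History rows whose key does NOT appear in the delta (i.e. the unchanged rows).
// left_anti keeps only historyDF's columns, so no drop is needed afterwards.
val unchangedDF = historyDF.join(
  deltaDF.select("TRANSACTION_BY_LINE_ID"),
  Seq("TRANSACTION_BY_LINE_ID"),
  "left_anti")

// Merged result: unchanged history rows plus the new/changed rows.
// Assumes deltaDF has the same columns as historyDF apart from the CDC column.
val mergedDF = unchangedDF.unionByName(deltaDF.drop("sys_change_column"))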
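
If only the changed rows need to go to MySQL, a JDBC write could look like the sketch below. It reuses deltaDF from the sketch above; the JDBC URL, table name and credentials are placeholders, since the thread does not give them.

import java.util.Properties

val jdbcProps = new Properties()
jdbcProps.setProperty("user", "app_user")          // placeholder
jdbcProps.setProperty("password", "app_password")  // placeholder
jdbcProps.setProperty("driver", "com.mysql.cj.jdbc.Driver")

// Append the changed rows to a MySQL table; URL and table name are hypothetical.
deltaDF.drop("sys_change_column")
  .write
  .mode("append")
  .jdbc("jdbc:mysql://mysql-host:3306/reporting", "transaction_line_item", jdbcProps)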
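
On the overwrite question: a Parquet overwrite of a path that the same plan is still reading from will typically fail with "Cannot overwrite a path that is also being read from", so the detour through a temporary location described in the thread is one workable pattern. A sketch, continuing from the first sketch (reusing spark and mergedDF) and using the temp path mentioned in the thread:

import org.apache.spark.sql.SaveMode

val tempPath    = "/home/test/transaction-line-item-temp/"
val historyPath = "/home/test/transaction-line-item"

// Materialise the merged result somewhere else first...
mergedDF.write.mode(SaveMode.Overwrite).parquet(tempPath)

// ...then re-read it and overwrite the original history location.
spark.read.parquet(tempPath)
  .write
  .mode(SaveMode.Overwrite)
  .parquet(historyPath)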