Can anyone help me? I am confused. :(

On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
> Hello Spark Developers,
>
> I have a question on a Spark join I am doing.
>
> I have a full load of data from an RDBMS stored at HDFS, let's say:
>
> val historyDF = spark.read.parquet("/home/test/transaction-line-item")
>
> and I am getting the changed data at a separate HDFS path, let's say:
>
> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>
> Now I would like to take the rows from deltaDF, drop only the matching
> records from historyDF, and write the result to a MySQL table.
>
> Once I am done writing to the MySQL table, I would like to overwrite
> /home/test/transaction-line-item. But I can't just overwrite it, because
> of lazy evaluation and the DAG structure, unless I write somewhere else
> first and then write back as overwrite.
>
> val syncDataDF = historyDF.join(
>     deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
>     Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
>   .filter(deltaDF.col("sys_change_column").isNull)
>   .drop(deltaDF.col("sys_change_column"))
>
> val mergedDataDF = syncDataDF.union(deltaDF)
>
> I believe this can be done with a join alone, without the union. Please
> suggest the best approach.
>
> I can't write mergedDataDF back to the path of historyDF, because I am
> only reading from there. What I am doing is writing to a temp path and
> then reading from there and writing back, which is a bad idea. I need a
> suggestion here...
>
> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
> val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
>
> Please suggest the best approach.
>
> Thanks
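A note on the join above: the left_outer + isNull + drop pattern can be expressed more directly as a left_anti join. A minimal sketch, assuming deltaDF has historyDF's schema plus the extra sys_change_column (note the union also needs that column dropped so the two schemas line up):

    // Keep only the history rows whose key does NOT appear in the delta.
    val syncDataDF = historyDF.join(
      deltaDF.select("TRANSACTION_BY_LINE_ID"),
      Seq("TRANSACTION_BY_LINE_ID"),
      "left_anti")

    // Append all changed rows; drop the change-tracking column so both
    // sides of the union have the same columns.
    val mergedDataDF = syncDataDF.union(deltaDF.drop("sys_change_column"))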
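As for doing the merge with a join alone and no union: a full_outer join plus coalesce can produce the same merged set in one pass. A sketch under the same schema assumption; one caveat is that coalesce falls back to the history value wherever the delta holds a genuine null:

    import org.apache.spark.sql.functions.{coalesce, col}

    val valueCols = historyDF.columns.filterNot(_ == "TRANSACTION_BY_LINE_ID")

    // For every key, prefer the delta's row; fall back to the history row
    // where the key exists only in the full load.
    val mergedDataDF = historyDF.as("h")
      .join(deltaDF.drop("sys_change_column").as("d"),
        col("h.TRANSACTION_BY_LINE_ID") === col("d.TRANSACTION_BY_LINE_ID"),
        "full_outer")
      .select(
        coalesce(col("d.TRANSACTION_BY_LINE_ID"), col("h.TRANSACTION_BY_LINE_ID"))
          .as("TRANSACTION_BY_LINE_ID") +:
        valueCols.map(c => coalesce(col(s"d.$c"), col(s"h.$c")).as(c)): _*)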
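On the overwrite problem: one workaround that avoids the manual temp-path round trip is Dataset.checkpoint, which eagerly materializes the result to a checkpoint directory and truncates the lineage, so the final write no longer depends on the files it is replacing. A sketch, where the checkpoint directory path is a hypothetical choice:

    import org.apache.spark.sql.SaveMode

    // Must be a reliable (e.g. HDFS) location; this path is hypothetical.
    spark.sparkContext.setCheckpointDir("/home/test/spark-checkpoints")

    // checkpoint() is eager by default: it computes mergedDataDF and persists
    // it under the checkpoint dir, cutting the plan's dependency on the
    // original parquet files.
    val materializedDF = mergedDataDF.checkpoint()

    // The plan no longer reads /home/test/transaction-line-item, so the
    // overwrite does not destroy its own input.
    materializedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")

This still writes the data twice (once to the checkpoint dir, once to the final path), much like the temp-path approach, but Spark manages the intermediate copy and the job can recover it if executors are lost mid-write.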