Hello Spark Developers,

I have a question about a Spark join I am doing.

I have a full load of data from an RDBMS stored on HDFS, let's say:

val historyDF = spark.read.parquet("/home/test/transaction-line-item")

and I am getting the changed data at a separate HDFS path, let's say:

val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")

Now I would like to take the rows from deltaDF, drop only the
corresponding records from historyDF, and write the result to a MySQL
table.

Once I am done writing to the MySQL table, I would like to overwrite
/home/test/transaction-line-item with the merged data. But I can't
simply overwrite it: because of lazy evaluation and the DAG structure,
the job would still be reading from that path, so I would have to write
somewhere else first and then write back as an overwrite.

val syncDataDF = historyDF
  .join(
    deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
    Seq("TRANSACTION_BY_LINE_ID"),
    "left_outer")
  .filter(deltaDF.col("sys_change_column").isNull)
  .drop(deltaDF.col("sys_change_column"))

val mergedDataDF = syncDataDF.union(deltaDF)

I believe this can be done with a join alone, without the union.
Please suggest the best approach.
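
For reference, the "keep only history rows that have no match in the
delta" step can presumably also be written as a left anti join, which
removes the need for the null filter and the drop. The following is
only a sketch: the small in-memory DataFrames, the "payload" column and
the object name are hypothetical stand-ins for the real Parquet data,
and the union is still used here to append the delta rows.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained sketch; the real job would read Parquet instead.
object AntiJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("anti-join-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for historyDF and deltaDF.
    val historyDF = Seq((1L, "a", "old"), (2L, "b", "old"), (3L, "c", "old"))
      .toDF("TRANSACTION_BY_LINE_ID", "payload", "sys_change_column")
    val deltaDF = Seq((2L, "b2", "U"), (4L, "d", "I"))
      .toDF("TRANSACTION_BY_LINE_ID", "payload", "sys_change_column")

    // left_anti keeps only the history rows whose key does not appear in the
    // delta, and returns only the columns of the left side.
    val syncDataDF =
      historyDF.join(deltaDF, Seq("TRANSACTION_BY_LINE_ID"), "left_anti")

    // union matches columns by position, so both sides need the same layout.
    val mergedDataDF = syncDataDF.union(deltaDF)

    mergedDataDF.orderBy("TRANSACTION_BY_LINE_ID").show()
    spark.stop()
  }
}
```

Unlike the left_outer version, this does not rely on sys_change_column
being non-null in the delta rows.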

I can't write mergedDataDF back to the historyDF path, because I am
still reading from there. What I am doing now is writing to a temp
path, then reading from there and writing back, which seems like a bad
idea. I need suggestions here.


// Write the merged data to a temp path, read it back, then overwrite the original path
mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
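
As a possible variation on the temp-path round trip above, the second
read and write could perhaps be replaced by a directory swap through
the Hadoop FileSystem API. This is a sketch under assumptions, not a
tested recommendation: the paths live in a local scratch directory, the
sample rows and object name are made up, and the delete-then-rename
step is not atomic, so a failure in between would leave the target path
missing.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical sketch: materialize to a temp path, then swap directories.
object SwapPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("swap-path-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Scratch locations standing in for the real HDFS paths.
    val base = java.nio.file.Files.createTempDirectory("swap-sketch").toString
    val targetPath = new Path(s"$base/transaction-line-item")
    val tempPath = new Path(s"$base/transaction-line-item-temp")

    // Made-up history data written to the target path.
    Seq((1L, "a"), (2L, "b")).toDF("id", "payload")
      .write.parquet(targetPath.toString)

    // The merged result still depends on files under targetPath.
    val mergedDataDF = spark.read.parquet(targetPath.toString)
      .union(Seq((3L, "c")).toDF("id", "payload"))

    // Step 1: fully materialize the result somewhere else.
    mergedDataDF.write.mode(SaveMode.Overwrite).parquet(tempPath.toString)

    // Step 2: swap directories instead of reading and writing the data again.
    val fs = targetPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.delete(targetPath, true) // recursive delete of the old data
    if (!fs.rename(tempPath, targetPath)) {
      sys.error(s"Could not rename $tempPath to $targetPath")
    }

    spark.read.parquet(targetPath.toString).orderBy("id").show()
    spark.stop()
  }
}
```

The data is written only once this way, but any DataFrame that was
created from the old files should not be reused after the swap.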


Please suggest the best approach.


Thanks
