Hi,
This is going to come off as a silly question, given that a stream is unbounded,
but it is a problem that I have (created for myself).
I am trying to build an ETL pipeline out of a bunch of stages:
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
I would like to collect the errors from each of these stages into a
separate error stream. I am using a Writer monad for this, and I have
arranged things so that the "log" part of the monad is also a DataFrame.
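In case it helps, this is roughly how the Writer side is wired up (a
minimal sketch assuming cats; the actual trait and names in my code differ):

import cats.Semigroup
import cats.data.Writer
import org.apache.spark.sql.DataFrame

// Errors accumulate by unioning DataFrames; flatMap on Writer only
// needs a Semigroup for the log, so no "empty" is required at this point.
implicit val dfSemigroup: Semigroup[DataFrame] =
  Semigroup.instance(_ union _)

// Hypothetical stage shape: the value is the transformed DataFrame,
// the log is a DataFrame of rows that failed the stage.
trait PipelineStage {
  def apply(df: DataFrame): Writer[DataFrame, DataFrame]
}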
Now, I would like to do:

val validRecords = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df      <- dfWithErrors    // output of the previous stage
      applied <- stage.apply(df) // run the stage; its errors are unioned into the log
    } yield applied
}
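At the end of the fold, both streams come back out of the Writer (again
assuming cats, where run yields the (log, value) pair):

// errorStream: the unioned errors from every stage
// cleanDf:     the records that survived the whole pipeline
val (errorStream, cleanDf) = validRecords.run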
Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
The "empty" of the fold must be an empty stream. With Spark batch, I can
always use an "emptyDataFrame" but I have no clue on how to achieve this in
Spark streaming. Unfortunately, "emptyDataFrame" is not "isStreaming" and
therefore I won't be able to union the errors together.
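To make the failure concrete (streamingErrors below stands in for any
streaming DataFrame of error rows):

val emptyErrors = spark.emptyDataFrame
emptyErrors.isStreaming  // false

// Union fails at analysis time with something like:
// org.apache.spark.sql.AnalysisException: Union between streaming and
// batch DataFrames/Datasets is not supported
streamingErrors.union(emptyErrors)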
I'd appreciate it if you could give me some pointers.
Cheers,
Arun