Hi,

This is going to come off as a silly question, since a stream is unbounded,
but this is a problem that I have (created for myself).

I am trying to build an ETL pipeline, and I have a bunch of stages:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
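
For context, each stage is essentially a function from a DataFrame to a
Writer of (errors, result). A simplified sketch of the shape (the trait and
names here are illustrative, not my exact code):

import cats.data.Writer
import org.apache.spark.sql.DataFrame

// Each stage transforms the input and logs any rejected records
// as a DataFrame alongside the transformed result.
trait DataStage {
  def apply(df: DataFrame): Writer[DataFrame, DataFrame]
}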

I would like to collect the errors at each of these stages into a
different stream. I am using a Writer monad for this, and I have set it up
so that the "collection" part of the Writer is also a DataFrame. Now, I
would like to do:

val validRecords = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df      <- dfWithErrors
      applied <- stage.apply(df)
    } yield applied
}
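
The flatMap in that for-comprehension needs a Semigroup to combine the
error logs, and since the log is a DataFrame, mine is essentially a union.
A minimal sketch, assuming cats (the instance is my own; cats does not
ship one for DataFrame):

import cats.Semigroup
import org.apache.spark.sql.DataFrame

// Writer's flatMap uses this to accumulate errors across stages:
// combine two error logs by unioning them.
implicit val dataFrameSemigroup: Semigroup[DataFrame] =
  Semigroup.instance((left, right) => left.union(right))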

Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

The "empty" of the fold must be an empty stream. With Spark batch, I can
always use an "emptyDataFrame" but I have no clue on how to achieve this in
Spark streaming.  Unfortunately, "emptyDataFrame"  is not "isStreaming" and
therefore I won't be able to union the errors together.
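
Concretely, this is the failure I run into (rough sketch; errorStream here
stands for the streaming DataFrame of errors, and the exact message may
vary by Spark version):

val empty = spark.emptyDataFrame  // empty.isStreaming == false
errorStream.union(empty)          // AnalysisException: Union between
                                  // streaming and batch DataFrames/Datasets
                                  // is not supported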

I'd appreciate it if you could give me some pointers.

Cheers,
Arun
