Hi,

This is going to come off as a silly question, since a stream is unbounded, but this is a problem that I have (created for myself).

I am trying to build an ETL pipeline out of a list of stages:

```scala
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
```

*I would like to collect the errors at each of these stages into a different stream.* I am using a Writer monad for this, and I have made provisions so that the "collection" part of the monad is also a DataFrame. Now I would like to fold the stages over the source DataFrame:

```scala
val validRecords = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
  for {
    df      <- dfWithErrors
    applied <- stage.apply(df)
  } yield applied
}
```

The tricky bit is the seed of the fold:

```scala
val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
```

The "empty" of the fold must be an empty stream. With Spark batch I can always use `emptyDataFrame`, but I have no clue how to achieve this in Spark Structured Streaming. Unfortunately, `emptyDataFrame` is not `isStreaming`, and therefore I won't be able to union the errors together.

I'd appreciate it if you could give me some pointers.

Cheers,
Arun
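
P.S. For anyone unfamiliar with the pattern, here is a minimal, Spark-free sketch of the Writer fold described above, with the error "stream" stood in by a plain `List[String]` and integers in place of DataFrames. All names here (`Writer`, `Stage`, the demo stages) are illustrative only, not the real pipeline types:

```scala
// A minimal Writer monad: pairs a value with an accumulated error log.
final case class Writer[A](log: List[String], value: A) {
  def flatMap[B](f: A => Writer[B]): Writer[B] = {
    val next = f(value)
    Writer(log ++ next.log, next.value) // logs are concatenated (the "union")
  }
  def map[B](f: A => B): Writer[B] = Writer(log, f(value))
}

// A "stage" transforms the data and may emit errors into the log.
trait Stage { def apply(df: Int): Writer[Int] }

object Demo {
  val double: Stage = df => Writer(Nil, df * 2)
  val checked: Stage = df =>
    if (df > 3) Writer(List(s"value $df too large"), df) else Writer(Nil, df)

  def run(): Writer[Int] = {
    val stages = List(double, checked)

    // Seed the fold with an EMPTY log, just as initDf needs an empty error stream.
    val init = Writer(List.empty[String], 2)

    stages.foldLeft(init) { case (acc, stage) =>
      for {
        df      <- acc
        applied <- stage.apply(df)
      } yield applied
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running this prints `Writer(List(value 4 too large),4)`: the errors from every stage end up concatenated in the log, while the value threads through the stages. The Spark question is precisely about what plays the role of `List.empty` when the log is a streaming DataFrame.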
I am trying to build an ETL pipeline and I have a bunch of stages. val pipelineStages = List( new AddRowKeyStage(EvergreenSchema), new WriteToHBaseStage(hBaseCatalog), new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols), new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols), new DataTypeValidatorStage(EvergreenSchema), new DataTypeCastStage(EvergreenSchema) ) *I would like to collect the errors at each of these stages into a different stream. *I am using a WriterMonad for this. I have made provisions that the "collection" part of the Monad is also a DataFrame. Now, I would like to do a : val validRecords = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) => for { df <- dfWithErrors applied <- stage.apply(df) } yield applied } Now, the tricky bit is this : val initDf = Writer(*DataFrameOps.emptyErrorStream(spark)*, sourceRawDf) The "empty" of the fold must be an empty stream. With Spark batch, I can always use an "emptyDataFrame" but I have no clue on how to achieve this in Spark streaming. Unfortunately, "emptyDataFrame" is not "isStreaming" and therefore I won't be able to union the errors together. Appreciate if you could give me some pointers. Cheers, Arun