I also looked into MemoryStream as a workaround, though I do see that it is not recommended for production use. Then again, since I am converting it to a Dataset immediately, I can't see a problem with the mutation.
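Just to make the comparison concrete, here is a quick sanity check (a sketch, assuming an existing SparkSession named spark and the DataError case class from my pipeline):

import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlC = spark.sqlContext
import spark.implicits._

// emptyDataFrame is a batch frame, so it can't be unioned with a stream
spark.emptyDataFrame.isStreaming            // false

// a MemoryStream with nothing added to it is empty but genuinely streaming
MemoryStream[DataError].toDS().isStreaming  // true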
Could you let me know if this is a better idea than dropping in an empty file (which is error prone)? Even better, is there a cleaner way to create an empty stream?

val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}

Cheers,
Arun

On Mon, Nov 5, 2018 at 2:41 PM Arun Manivannan <a...@arunma.com> wrote:

> Hi,
>
> This is going to come off as a silly question, with a stream being
> unbounded, but this is a problem that I have (created for myself).
>
> I am trying to build an ETL pipeline and I have a bunch of stages:
>
> val pipelineStages = List(
>   new AddRowKeyStage(EvergreenSchema),
>   new WriteToHBaseStage(hBaseCatalog),
>   new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
>   new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
>   new DataTypeValidatorStage(EvergreenSchema),
>   new DataTypeCastStage(EvergreenSchema)
> )
>
> *I would like to collect the errors at each of these stages into a
> different stream.* I am using a Writer monad for this, and I have made
> provisions so that the "collection" part of the monad is also a DataFrame.
> Now, I would like to do:
>
> val validRecords = pipelineStages.foldLeft(initDf) {
>   case (dfWithErrors, stage) =>
>     for {
>       df <- dfWithErrors
>       applied <- stage.apply(df)
>     } yield applied
> }
>
> Now, the tricky bit is this:
>
> val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
>
> The "empty" of the fold must be an empty stream. With Spark batch, I can
> always use "emptyDataFrame", but I have no clue how to achieve this with
> Spark streaming. Unfortunately, "emptyDataFrame" is not "isStreaming", and
> therefore I won't be able to union the errors together.
>
> I'd appreciate it if you could give me some pointers.
>
> Cheers,
> Arun
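P.S. In case it helps to see the pieces together, here is a stripped-down sketch of the wiring I'm after. It is not my actual code: PipelineStage, DataError, and the (errors, value) pair (standing in for the Writer) are illustrative, and each stage is assumed to return both its output frame and the rows it rejected.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// hypothetical error record collected by every stage
case class DataError(rowKey: String, stage: String, message: String)

// hypothetical stage contract: transform the frame, report rejected rows
trait PipelineStage {
  def apply(df: DataFrame)(implicit spark: SparkSession): (Dataset[DataError], DataFrame)
}

// the empty-but-streaming seed for the fold
def emptyErrorStream(implicit spark: SparkSession): Dataset[DataError] = {
  implicit val sqlC = spark.sqlContext
  import spark.implicits._
  MemoryStream[DataError].toDS()
}

def runPipeline(stages: List[PipelineStage], source: DataFrame)
               (implicit spark: SparkSession): (Dataset[DataError], DataFrame) =
  stages.foldLeft((emptyErrorStream, source)) {
    case ((errorsSoFar, df), stage) =>
      val (stageErrors, nextDf) = stage(df)
      // both sides are streaming Datasets, so the union is legal
      (errorsSoFar.union(stageErrors), nextDf)
  }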