I also looked into MemoryStream as a workaround, but I see that it is not
recommended for production use. Then again, since I convert it to a Dataset
immediately and never add any data to it, I couldn't see a problem with the
mutability.
Could you let me know whether this is a better idea than reading from an
empty file (which is error prone)? Better still, is there a cleaner way to
create an empty stream?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val emptyErrorStream = (spark: SparkSession) => {
  import spark.implicits._                    // Encoder[DataError]
  implicit val sqlContext = spark.sqlContext  // MemoryStream needs an implicit SQLContext
  MemoryStream[DataError].toDS()
}
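
Nothing ever calls addData on that MemoryStream, so the resulting Dataset is
genuinely streaming but permanently empty; a quick sanity check (sketch):

val errors = emptyErrorStream(spark)
assert(errors.isStreaming)  // true, unlike spark.emptyDataFrame
// Since no addData(...) is ever issued, every micro-batch of `errors` is
// empty, and unioning it with the real error streams is a no-op.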
Cheers,
Arun
On Mon, Nov 5, 2018 at 2:41 PM Arun Manivannan <[email protected]> wrote:
> Hi,
>
> This is going to come off as a silly question, given that a stream is
> unbounded, but this is a problem that I have (created for myself).
>
> I am trying to build an ETL pipeline, and I have a bunch of stages:
>
> val pipelineStages = List(
> new AddRowKeyStage(EvergreenSchema),
> new WriteToHBaseStage(hBaseCatalog),
> new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
> new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
> new DataTypeValidatorStage(EvergreenSchema),
> new DataTypeCastStage(EvergreenSchema)
> )
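>
> Each stage has (roughly) the following shape, so the transformed frame and
> any errors it produced travel together (a simplified sketch; the real
> stages carry more than this):
>
> trait PipelineStage {
>   def apply(df: DataFrame): Writer[Dataset[DataError], DataFrame]
> }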
>
> *I would like to collect the errors at each of these stages into a
> different stream.* I am using a Writer monad for this, and I have made
> provisions for the "collection" (log) part of the monad to be a DataFrame
> as well. Now, I would like to do something like this:
>
> val validRecords = pipelineStages.foldLeft(initDf) {
>   case (dfWithErrors, stage) =>
>     for {
>       df      <- dfWithErrors
>       applied <- stage.apply(df)
>     } yield applied
> }
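>
> For the for-comprehension to compile, the Writer needs a Semigroup over the
> log type that unions the error Datasets. Mine is roughly this (a sketch,
> assuming cats; the alias ErrorsAnd is just an illustrative name):
>
> implicit val errorsSemigroup: Semigroup[Dataset[DataError]] =
>   Semigroup.instance(_ union _)
>
> type ErrorsAnd[A] = Writer[Dataset[DataError], A]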
>
> Now, the tricky bit is this:
>
> val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
>
>
> The "empty" of the fold must be an empty stream. With Spark batch, I can
> always use an "emptyDataFrame" but I have no clue on how to achieve this in
> Spark streaming. Unfortunately, "emptyDataFrame" is not "isStreaming" and
> therefore I won't be able to union the errors together.
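>
> To make the failure concrete (a sketch; errorStream stands for any
> streaming Dataset of errors):
>
> spark.emptyDataFrame.isStreaming  // false
> // Starting a query over errorStream.toDF().union(spark.emptyDataFrame)
> // fails with an AnalysisException along the lines of "Union between
> // streaming and batch DataFrames/Datasets is not supported".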
>
> I'd appreciate it if you could give me some pointers.
>
> Cheers,
> Arun
>