I also looked into MemoryStream as a workaround, but I see that it is
not recommended for production use. Then again, since I convert it to a
Dataset immediately, I can't see how mutation would be a problem.

Could you let me know whether this is a better idea than dropping in an
empty file (which is error-prone)? Even better, is there a cleaner way to
create an empty stream?

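import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val emptyErrorStream = (spark: SparkSession) => {
  import spark.implicits._              // Encoder, assuming DataError is a case class
  implicit val sqlC = spark.sqlContext  // MemoryStream needs an implicit SQLContext
  MemoryStream[DataError].toDS()        // no data is ever added, so the stream stays empty
}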
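
For comparison, here is a rough and untested sketch of one alternative that
avoids MemoryStream: start from Spark's built-in "rate" source and filter out
every row. The DataError case class and its two fields below are only a
hypothetical stand-in for my real error type, and the function name is made
up for this example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical stand-in for the real DataError type (fields are assumptions).
case class DataError(rowKey: String, message: String)

// Builds a streaming Dataset[DataError] that never emits a row.
def emptyErrorStreamFromRate(spark: SparkSession): Dataset[DataError] = {
  import spark.implicits._
  spark.readStream
    .format("rate")                 // built-in source that generates (timestamp, value) rows
    .load()
    .select(lit("").as("rowKey"), lit("").as("message")) // reshape to DataError's columns
    .as[DataError]
    .filter(_ => false)             // drop every row, leaving an empty stream
}
```

The result still reports isStreaming as true, so it should be possible to
union it with the error streams of the later stages. The rate source keeps
generating rows that are thrown away immediately, so this has a small cost.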


Cheers,
Arun

On Mon, Nov 5, 2018 at 2:41 PM Arun Manivannan <a...@arunma.com> wrote:

> Hi,
>
> This is going to come off as a silly question, given that a stream is
> unbounded, but this is a problem that I have (created for myself).
>
> I am trying to build an ETL pipeline and I have a bunch of stages.
>
> val pipelineStages = List(
>   new AddRowKeyStage(EvergreenSchema),
>   new WriteToHBaseStage(hBaseCatalog),
>   new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
>   new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
>   new DataTypeValidatorStage(EvergreenSchema),
>   new DataTypeCastStage(EvergreenSchema)
> )
>
> *I would like to collect the errors at each of these stages into a
> different stream.* I am using a Writer monad for this. I have made
> provisions so that the "collection" part of the monad is also a DataFrame.
> Now, I would like to do this:
>
> val validRecords = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
>   for {
>     df <- dfWithErrors
>     applied <- stage.apply(df)
>   } yield applied
> }
>
> Now, the tricky bit is this:
>
> val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)
>
>
> The "empty" of the fold must be an empty stream. With Spark batch, I can
> always use an "emptyDataFrame", but I have no idea how to achieve this in
> Spark streaming. Unfortunately, "emptyDataFrame" is not "isStreaming", and
> therefore I won't be able to union the errors together.
>
> I would appreciate it if you could give me some pointers.
>
> Cheers,
> Arun
>
