Hi Team,

I have a streaming pipeline (built using Apache Beam with the Spark runner) that
consumes timestamped events from an unbounded source (a Kinesis stream),
batches them into FixedWindows of 5 minutes each, and then writes all events in
a window to one or more files, depending on the shards.
We are trying to achieve the following with Apache Beam constructs:
1.       Create a PCollectionView from an unbounded source and pass it as a
side input to our main pipeline.
2.       Have a hook method that is invoked once per window, so that we can
perform some operational activities per window.
3.       Stop the stream processor gracefully from an external system.

Approaches that we tried for 1):
*        Create a PCollectionView from an unbounded source and pass it as a
side input to our main pipeline.
*        The input PCollection goes through a FixedWindows transform.
*        We created a custom CombineFn that combines all inputs for a window
and produces a single-value PCollection.
*        The output of the Window transform goes to the custom CombineFn, and
we create a PCollectionView from it (using
Combine.globally(fn).asSingletonView()), as this output is passed as a
side input to our main pipeline.
o   We get the following exception (when running with the streaming option set
to true):
*        java.lang.IllegalStateException: No TransformEvaluator registered for 
UNBOUNDED transform View.CreatePCollectionView

     *   We noticed that the Spark runner doesn't support side inputs in
streaming mode:
        *   https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 (View.CreatePCollectionView.class is not added to the EVALUATORS map)
        *   https://issues.apache.org/jira/browse/BEAM-2112
        *   https://issues.apache.org/jira/browse/BEAM-1564
We would therefore like to understand more about the BEAM-1564 ticket.
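
For reference, the shape of what we tried for 1) is roughly the sketch below.
It is a simplified illustration, not our real code: the class name
SideInputSketch, the CountEventsFn combiner (which just counts events), and the
Create.timestamped(...) source standing in for the Kinesis read are all
placeholders made up for this example.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SideInputSketch {

  /** Placeholder for the custom CombineFn: counts the events in a window. */
  static class CountEventsFn extends Combine.CombineFn<String, Long, Long> {
    @Override
    public Long createAccumulator() {
      return 0L;
    }

    @Override
    public Long addInput(Long acc, String event) {
      return acc + 1;
    }

    @Override
    public Long mergeAccumulators(Iterable<Long> accs) {
      long total = 0;
      for (Long a : accs) {
        total += a;
      }
      return total;
    }

    @Override
    public Long extractOutput(Long acc) {
      return acc;
    }
  }

  /** Wires the transforms together; it does not run the pipeline. */
  static PCollection<String> build(Pipeline p) {
    // Assumption: a small in-memory source replaces the Kinesis stream here.
    PCollection<String> events =
        p.apply("ReadEvents", Create.timestamped(
                TimestampedValue.of("a", new Instant(0L)),
                TimestampedValue.of("b", new Instant(1000L))))
            .apply("FixedWindow5Min",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));

    // One combined value per window, exposed as a singleton view.
    PCollectionView<Long> perWindow =
        events.apply("CombinePerWindow",
            Combine.globally(new CountEventsFn()).asSingletonView());

    // Main path reads the view for the window of the current element.
    return events.apply("UseSideInput",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element() + ":" + c.sideInput(perWindow));
          }
        }).withSideInputs(perWindow));
  }
}
```

With the real unbounded source and the streaming option set to true, it is the
step that creates the view (asSingletonView()) that fails on the Spark runner
with the exception quoted above.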

Approaches that we tried for 2):
We tried to implement the operational activities in extractOutput() of the
CombineFn, as extractOutput() is called once per window. We haven't tested this
yet, as it is blocked by issue 1).
Are there any other recommended approaches to implement this feature?
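
To make the idea for 2) concrete, here is a minimal sketch of a CombineFn that
calls a hook from extractOutput(). The names WindowHookSketch,
CollectWithHookFn and onWindowOutput() are invented for this example, and the
hook only logs; our real operational activities would go there instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.apache.beam.sdk.transforms.Combine;

public class WindowHookSketch {

  /** Hypothetical CombineFn: gathers a window's events and runs a hook on output. */
  static class CollectWithHookFn
      extends Combine.CombineFn<String, List<String>, List<String>> {

    private static final Logger LOG = Logger.getLogger("WindowHookSketch");

    @Override
    public List<String> createAccumulator() {
      return new ArrayList<>();
    }

    @Override
    public List<String> addInput(List<String> acc, String event) {
      acc.add(event);
      return acc;
    }

    @Override
    public List<String> mergeAccumulators(Iterable<List<String>> accs) {
      List<String> merged = new ArrayList<>();
      for (List<String> acc : accs) {
        merged.addAll(acc);
      }
      return merged;
    }

    @Override
    public List<String> extractOutput(List<String> acc) {
      // Hook point: invoked when the combined value for a window is produced.
      onWindowOutput(acc);
      return acc;
    }

    /** Placeholder hook; assumed to be safe to call more than once. */
    void onWindowOutput(List<String> events) {
      LOG.info("Window produced " + events.size() + " events");
    }
  }
}
```

One caveat we are aware of: extractOutput() can be invoked more than once for
the same window (for example if a trigger fires several times or a bundle is
retried), so the hook would need to be idempotent.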

We are looking for recommended approaches to implement feature 3).

Many Thanks,
Viswa.



