Hello everyone,


Jira:

https://issues.apache.org/jira/browse/BEAM-2466



My Branch:

https://github.com/kyle-winkelman/beam/tree/kafka-streams



I have taken an initial pass at creating a Kafka Streams Runner. It passes
nearly all of the @ValidatesRunner tests. I am hoping to find someone who
has experience writing a Runner to take a look at it and give me some
feedback before I open a PR.



I am using the Kafka Streams DSL
<https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html>,
so a PCollection is equivalent to a KStream
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html>.



ParDo:

  - implements Transformer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/Transformer.html>
and is supplied to KStream#transform
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-org.apache.kafka.streams.kstream.Named-java.lang.String...->

  - outputs KStream<TupleTag<?>, WindowedValue<?>> then uses a
KStream#branch
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...->
to handle ParDo.MultiOutput

  - schedules a Punctuator
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/Punctuator.html>
to periodically start/finish bundles
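
To make the multi-output step concrete, here is a rough plain-Java model of
the routing, not the code from my branch. It has no Kafka dependency: String
tags stand in for TupleTag<?>, String values stand in for WindowedValue<?>,
and the names MultiOutputSketch, Tagged and branchByTag are made up for
illustration. As with KStream#branch, a record that matches no branch is
dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of splitting one tagged stream into one stream per tag.
// Hypothetical names; Strings stand in for TupleTag<?> and WindowedValue<?>.
final class MultiOutputSketch {

  /** One output record of the ParDo step: (tag, value). */
  static final class Tagged {
    final String tag;
    final String value;

    Tagged(String tag, String value) {
      this.tag = tag;
      this.value = value;
    }
  }

  /** Routes each record to the branch for its tag; unknown tags are dropped. */
  static Map<String, List<String>> branchByTag(List<Tagged> output, List<String> tags) {
    Map<String, List<String>> branches = new LinkedHashMap<>();
    for (String tag : tags) {
      branches.put(tag, new ArrayList<>());
    }
    for (Tagged record : output) {
      List<String> branch = branches.get(record.tag);
      if (branch != null) {
        branch.add(record.value);
      }
    }
    return branches;
  }

  public static void main(String[] args) {
    List<Tagged> output = Arrays.asList(
        new Tagged("main", "a"), new Tagged("errors", "b"), new Tagged("main", "c"));
    System.out.println(branchByTag(output, Arrays.asList("main", "errors")));
  }
}
```

In the runner itself each branch would be a KStream selected by a predicate
on the TupleTag key rather than an in-memory list.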



GroupByKey:

  - KStream#repartition
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#repartition-->
the data to groupOnlyByKey

  - groupAlsoByWindow works similarly to ParDo, but runs a ReduceFn instead
of a DoFn



Flatten:

  - KStream#merge
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream->



Combine:

  - not as composite, but as primitive GroupByKey/ParDo



Composite Transforms:

  - inlining (I believe)



Side Inputs:

  - write data to a topic with the key being the StateNamespace.stringKey()

  - read topic into a GlobalKTable
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/GlobalKTable.html>
and materialize it into a KeyValueStore
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/state/KeyValueStore.html>

  - in ParDo, access the KeyValueStore via the ProcessorContext
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html>,
then use the WindowFn to get the Side Input Window and use its
StateNamespace.stringKey() to look up the value
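
The lookup path above can be modelled roughly as follows. This is only a
sketch under assumptions: a HashMap stands in for the KeyValueStore, fixed
windows stand in for whatever WindowFn the side input uses, and windowKey
produces a made-up key format rather than the real
StateNamespace.stringKey() output. The class and method names are
hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a windowed side-input lookup.
// A HashMap stands in for the KeyValueStore behind the GlobalKTable.
final class SideInputLookupSketch {
  private final Map<String, String> store = new HashMap<>();
  private final long windowSizeMillis;

  SideInputLookupSketch(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  /** Hypothetical key format; the runner would use StateNamespace.stringKey(). */
  static String windowKey(long startMillis, long endMillis) {
    return "window[" + startMillis + "," + endMillis + ")";
  }

  /** Maps a main-input timestamp to the key of the fixed window containing it. */
  String sideInputKeyFor(long timestampMillis) {
    long start = timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    return windowKey(start, start + windowSizeMillis);
  }

  /** Models writing the side-input value for the window starting at windowStartMillis. */
  void writeSideInput(long windowStartMillis, String value) {
    store.put(windowKey(windowStartMillis, windowStartMillis + windowSizeMillis), value);
  }

  /** Returns the side-input value for the element's window, or null if none was written. */
  String lookup(long elementTimestampMillis) {
    return store.get(sideInputKeyFor(elementTimestampMillis));
  }

  public static void main(String[] args) {
    SideInputLookupSketch sideInput = new SideInputLookupSketch(1000L);
    sideInput.writeSideInput(0L, "first");
    sideInput.writeSideInput(1000L, "second");
    System.out.println(sideInput.lookup(999L));
    System.out.println(sideInput.lookup(1000L));
  }
}
```

A null result here corresponds to a side input window that has no value in
the store yet; how the runner should wait for it is not covered by this
sketch.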



Source API:

  - overridden by SplittableDoFn, via Read.SPLITTABLE_DOFN_PREFERRED_RUNNERS



Impulse:

  - if the topic does not exist, create it in Kafka and use a KafkaProducer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
to generate the initial record

  - generate a KStream from the above topic



SplittableDoFn:

  - overridden by SplittableParDo/SplittableParDoViaKeyedWorkItems

  - extends Stateful Processing: runs ProcessFn instead of DoFn, and the
Punctuator fires timers as expected by the ProcessFn

  - write watermarks to a separate topic, where they are aggregated, read
into a GlobalKTable and materialized into a KeyValueStore



Stateful Processing:

  - StateInternals and TimerInternals are materialized in KeyValueStores

  - extends ParDo with a statefulDoFnRunner; the Punctuator advances
TimerInternals (via the watermarks KeyValueStore) and fires timers
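
As a rough illustration of what each punctuation does with timers, here is a
plain-Java model, not the actual TimerInternals code. Assumptions: a
PriorityQueue stands in for the timer KeyValueStore, the watermark is passed
in directly instead of being read from the watermarks store, and a timer is
treated as eligible once its timestamp is at or before the watermark (the
exact boundary condition in the real runner may differ). All names are
hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of firing event-time timers when the watermark advances.
final class TimerFiringSketch {

  /** A pending event-time timer. */
  static final class Timer {
    final String id;
    final long timestampMillis;

    Timer(String id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  private static final Comparator<Timer> BY_TIMESTAMP =
      Comparator.comparingLong(t -> t.timestampMillis);

  private final PriorityQueue<Timer> pending = new PriorityQueue<>(BY_TIMESTAMP);
  private long watermarkMillis = Long.MIN_VALUE;

  void setTimer(String id, long timestampMillis) {
    pending.add(new Timer(id, timestampMillis));
  }

  /**
   * Called once per punctuation: moves the watermark forward (never backward)
   * and returns the ids of all timers that became eligible, earliest first.
   */
  List<String> advanceWatermark(long newWatermarkMillis) {
    watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek().timestampMillis <= watermarkMillis) {
      fired.add(pending.poll().id);
    }
    return fired;
  }

  public static void main(String[] args) {
    TimerFiringSketch timers = new TimerFiringSketch();
    timers.setTimer("a", 10L);
    timers.setTimer("b", 5L);
    timers.setTimer("c", 20L);
    System.out.println(timers.advanceWatermark(10L));
    System.out.println(timers.advanceWatermark(25L));
  }
}
```

In the runner, each fired timer would be handed to the statefulDoFnRunner
inside the current bundle instead of being returned as a list.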


Thanks,

Kyle Winkelman
