Hello everyone,
Jira: https://issues.apache.org/jira/browse/BEAM-2466
My Branch: https://github.com/kyle-winkelman/beam/tree/kafka-streams

I have taken an initial pass at creating a Kafka Streams Runner. It passes
nearly all of the @ValidatesRunner tests. I am hoping to find someone who has
experience writing a Runner to take a look at it and give me some feedback
before I open a PR.

I am using the Kafka Streams DSL
<https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html>,
so a PCollection is equivalent to a KStream
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html>.

ParDo:
- implements Transformer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/Transformer.html>
passed to KStream#transform
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-org.apache.kafka.streams.kstream.Named-java.lang.String...->
- outputs KStream<TupleTag<?>, WindowedValue<?>>, then uses KStream#branch
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...->
to handle ParDo.MultiOutput
- schedules a Punctuator
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/Punctuator.html>
to periodically start/finish bundles

GroupByKey:
- KStream#repartition
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#repartition-->
the data to groupOnlyByKey
- groupAlsoByWindow, similarly to ParDo, runs ReduceFn instead of DoFn

Flatten:
- KStream#merge
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream->

Combine:
- not as a composite, but as primitive GroupByKey/ParDo

Composite Transforms:
- inlining (I believe)

Side Inputs:
- write data to a topic with the key being the StateNamespace.stringKey()
- read the topic into a GlobalKTable
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/GlobalKTable.html>
and materialize it into a KeyValueStore
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/state/KeyValueStore.html>
- in ParDo, access the KeyValueStore via the ProcessorContext
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html>,
and use the WindowFn to get the Side Input Window and its
StateNamespace.stringKey() to look up the value

Source API:
- overridden by SplittableDoFn, via Read.SPLITTABLE_DOFN_PREFERRED_RUNNERS

Impulse:
- if the topic does not exist, create it in Kafka and use a KafkaProducer
<https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
to generate the initial record
- generate a KStream from the above topic

SplittableDoFn:
- overridden by SplittableParDo/SplittableParDoViaKeyedWorkItems
- extends Stateful Processing, runs ProcessFn instead of DoFn, and the
Punctuator fires timers as expected by the ProcessFn
- writes watermarks to a separate topic to be aggregated, read into a
GlobalKTable, and materialized into a KeyValueStore

Stateful Processing:
- StateInternals and TimerInternals are materialized in KeyValueStores
- extends ParDo, with a StatefulDoFnRunner; the Punctuator advances
TimerInternals (via the watermarks KeyValueStore) and fires timers

Thanks,
Kyle Winkelman
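P.S. In case it helps review, here is a minimal, self-contained sketch of the
tag-then-branch pattern I described for ParDo. It is not code from the branch:
the topic names, the String stand-ins for TupleTag/WindowedValue, and the
routing rule are all illustrative.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ParDoSketch {
  // stand-ins for Beam TupleTags; the real runner keys by TupleTag<?>
  static final String MAIN = "main";
  static final String OTHER = "other";

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input =
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

    // "DoFn" step: re-key every element with the tag of the output it belongs to
    KStream<String, String> tagged = input.transform(
        () -> new Transformer<String, String, KeyValue<String, String>>() {
          @Override public void init(ProcessorContext context) {}
          @Override public KeyValue<String, String> transform(String key, String value) {
            // illustrative routing rule: long values go to the side output
            return KeyValue.pair(value.length() > 3 ? OTHER : MAIN, value);
          }
          @Override public void close() {}
        });

    // branch on the tag key: one downstream KStream per output
    @SuppressWarnings("unchecked")
    KStream<String, String>[] branches =
        tagged.branch((tag, v) -> MAIN.equals(tag), (tag, v) -> OTHER.equals(tag));
    branches[0].to("main-output", Produced.with(Serdes.String(), Serdes.String()));
    branches[1].to("other-output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }
}
```

The actual runner keys the intermediate stream by TupleTag<?> with
WindowedValue<?> values, but the shape is the same: one transform, then one
branch predicate per output tag.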
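P.S. A minimal sketch of the Flatten-as-merge mapping, since it is a one-liner
in the DSL (topic names are illustrative, not taken from the branch):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FlattenSketch {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
    // each input topic stands in for one PCollection being flattened
    KStream<String, String> left = builder.stream("left", consumed);
    KStream<String, String> right = builder.stream("right", consumed);
    // Flatten == KStream#merge: interleave both streams, no ordering guarantee
    left.merge(right).to("flattened", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }
}
```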
