Hi, I am writing a simple Streams app. All it does is this: a producer sends a sequence of paths and values, for example path /0, value 1; path /0/1, value 2; path /0/1/2, value 3; and Kafka Streams takes that input and produces a KTable store.
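The (path, value) sequence described above is what the producer loop in the program builds. Here is a minimal standalone sketch of that generation with no Kafka dependency. PathSequence and pairs are illustrative names and are not part of the original program.

```scala
object PathSequence {
  // Builds the (path, value) pairs the producer loop sends: for each j in 1 until n,
  // the path is "/0/1/.../(j-1)" and the value is j.
  def pairs(n: Int): Seq[(String, Int)] =
    (1 until n).map { j =>
      val path = (0 until j).map(i => "/" + i).mkString("")
      (path, j)
    }

  def main(args: Array[String]): Unit = {
    pairs(4).foreach { case (path, value) => println(s"path $path, value $value") }
    // path /0, value 1
    // path /0/1, value 2
    // path /0/1/2, value 3
  }
}
```

With n = 10, as in the program's loop, the last path is /0/1/2/3/4/5/6/7/8 with value 9.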
There is a rule: if the parent path does not exist, the child cannot be inserted. So if /0/1 is not there, an insert of /0/1/2 should be filtered out. I use the program below to process it. The paths are sent as /0, /0/1, /0/1/2, ..., /0/1/../9.

The filter depends on the KTable store, but that store is built after (downstream of) the filter. When the filter checks whether a path's parent exists, the parent may already have passed the filter without being in the store yet, so the filter thinks the parent does not exist. This is more of an asynchronous-processing problem: the parent is not fully done (written to the store) when the next element starts processing (the filter).

Another problem is that the parent key and the child key are different (so they can land in different partitions, and Kafka only guarantees ordering within a partition). As a result, the order in which the paths are processed can differ from the order the producer sent them, which also causes children to be filtered out. The producer sends /0, /0/1, /0/1/2, ... but the streams app processes them as /0, /0/1/2, /0/1, ..., and then all the following paths are filtered out, because /0/1/2 never gets a chance to be created.

Any thoughts on how to solve this?

Thanks,
Nan

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{IntegerSerializer, Serdes, StringSerializer}
import org.apache.kafka.common.serialization.Serdes.{IntegerSerde, StringSerde}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Materialized, Reducer}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}

val streamProperties = new Properties()
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1")
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG, "important-test-client")
streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde].getName)
streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[IntegerSerde].getName)

val streamBuilder = new StreamsBuilder()
val topic = "input"
val inputStream = streamBuilder.stream[String, Integer](topic)

val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes, Array[Byte]]](topic + "_store")
  .withKeySerde(Serdes.String())
  .withValueSerde(Serdes.Integer())

// The value is not important; only the key matters. Keep the latest value.
val reducer = new Reducer[Integer]() {
  override def apply(value1: Integer, value2: Integer): Integer = {
    value2
  }
}
val ktable = inputStream.filter(filter).groupByKey().reduce(reducer, materialized)

// Make sure the parent exists.
// `store` is the interactive-query store assigned further down, after streams.start();
// no records are sent before that assignment, so it is set by the time filter() runs.
def filter(key: String, value: Integer): Boolean = {
  println("===current store===, checking key " + key + " value: " + value)
  val it = store.all()
  try it.forEachRemaining(x => println(x.key)) finally it.close() // store iterators must be closed
  val trimmed = key.trim()
  val parent = trimmed.substring(0, trimmed.lastIndexOf("/"))
  if (parent == "") {
    true
  } else {
    if (store.get(parent) == null) {
      println("not found parent" + parent)
      false
    } else {
      true
    }
  }
}

val topology = streamBuilder.build()
val streams = new KafkaStreams(topology, streamProperties)
streams.start()
Thread.sleep(6000)

val storeName = ktable.queryableStoreName()
val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Integer])

val senderProperties = new Properties
senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
val producer = new KafkaProducer[String, Integer](senderProperties)

// Sends /0 -> 1, /0/1 -> 2, ..., /0/1/2/3/4/5/6/7/8 -> 9
for (j <- 1 until 10) {
  val path = for (i <- 0 until j) yield { "/" + i }
  producer.send(new ProducerRecord(topic, path.mkString(""), j))
}
Thread.sleep(3000)

println("====show final store state====")
val finalIt = store.all()
try finalIt.forEachRemaining(x => println(x.key)) finally finalIt.close()
Thread.sleep(10000000)

output:

===current store===, checking key /0 value: 1
===current store===, checking key /0/1 value: 2
/0
===current store===, checking key /0/1/2/3 value: 4
/0/1
/0
not found parent/0/1/2
===current store===, checking key /0/1/2 value: 3
/0/1
/0
===current store===, checking key /0/1/2/3/4/5 value: 6
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4
===current store===, checking key /0/1/2/3/4 value: 5
/0/1
/0
/0/1/2
not found parent/0/1/2/3
===current store===, checking key /0/1/2/3/4/5/6 value: 7
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5
===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6/7
===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6
====show final store state====
/0/1
/0
/0/1/2
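The ordering part of the problem can be reproduced without Kafka. Below is a minimal standalone sketch that applies the same parent-exists rule to an in-memory set. The set is updated synchronously, so the store-lag issue is removed entirely. ParentFilterSimulation, parentOf and run are hypothetical names used only for illustration. Fed the send order, all nine paths are kept. Fed the processing order from the output above, only /0, /0/1 and /0/1/2 are kept, the same as the final store state. That suggests the out-of-order processing alone is enough to produce this particular result.

```scala
import scala.collection.mutable

object ParentFilterSimulation {
  // Same rule as filter(): the parent of "/0/1/2" is "/0/1", and a top-level path
  // such as "/0" has parent "". Assumes every path starts with "/".
  def parentOf(path: String): String = {
    val p = path.trim
    p.substring(0, p.lastIndexOf("/"))
  }

  // Applies the rule to keys in the given order. Accepted keys are added to the
  // in-memory set immediately, so there is no lag between "passed the filter"
  // and "visible in the store".
  def run(keys: Seq[String]): Set[String] = {
    val store = mutable.LinkedHashSet.empty[String]
    for (key <- keys) {
      val parent = parentOf(key)
      if (parent.isEmpty || store.contains(parent)) {
        store += key
      } else {
        println(s"not found parent $parent, dropping $key")
      }
    }
    store.toSet
  }

  def main(args: Array[String]): Unit = {
    // Send order from the producer loop.
    val sent = (1 until 10).map(j => (0 until j).map(i => "/" + i).mkString(""))
    // Processing order as seen in the output above.
    val observed = Seq(
      "/0", "/0/1", "/0/1/2/3", "/0/1/2", "/0/1/2/3/4/5", "/0/1/2/3/4",
      "/0/1/2/3/4/5/6", "/0/1/2/3/4/5/6/7/8", "/0/1/2/3/4/5/6/7")
    println(run(sent).size)             // 9: every path survives in send order
    println(run(observed).toSeq.sorted) // only /0, /0/1 and /0/1/2 survive
  }
}
```

Because a rejected path is never retried, one child processed before its parent is enough to drop that child and its whole subtree.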