Hi,
I am writing a simple Kafka Streams app. A producer sends a sequence of
paths and values, for example
path /0,      value 1
path /0/1,    value 2
path /0/1/2,  value 3
and Kafka Streams takes that input and builds a KTable store from it.

There is one rule: if the parent path does not exist, the child cannot be
inserted. So if /0/1 is not in the store, an insert of /0/1/2 should be
filtered out.
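
The rule can be sketched in plain Scala, independent of Kafka. `PathRule`,
`parentOf` and `canInsert` are hypothetical names used only for illustration.
The sketch assumes that a top-level path such as /0 has no parent and is
always allowed, which matches what the program below does when the parent is
the empty string:

```scala
object PathRule {
  // Parent of "/0/1/2" is "/0/1"; a top-level path such as "/0" has no parent.
  def parentOf(path: String): Option[String] = {
    val idx = path.lastIndexOf("/")
    if (idx <= 0) None else Some(path.substring(0, idx))
  }

  // Assumption: `existing` stands in for the keys currently in the store.
  def canInsert(existing: Set[String], path: String): Boolean =
    parentOf(path).forall(p => existing.contains(p))
}
```

With only /0 present, `PathRule.canInsert(Set("/0"), "/0/1")` is true and
`PathRule.canInsert(Set("/0"), "/0/1/2")` is false.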

I use the following program to process it. The paths are sent as /0, /0/1,
/0/1/2, ....., /0/1/../8.

The filter depends on the KTable store, which is built downstream of the
filtered stream. When the filter checks whether a path's parent exists, it
could be that the parent has already passed the filter but is not in the
store yet, so the filter thinks the parent does not exist. This is more of
an asynchronous-processing problem: the parent is not fully processed
(written to the store) when the next element starts processing (in the
filter).
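
To make the suspected effect concrete, here is a small simulation in plain
Scala (no Kafka involved). It compares a check that sees every accepted path
immediately with one where an accepted path only becomes visible after `lag`
further records. `StoreLag`, `immediate`, `lagging` and the `lag` parameter
are all assumptions made for illustration; they do not model how Kafka
Streams actually schedules store writes.

```scala
object StoreLag {
  // Parent of "/0/1/2" is "/0/1"; a top-level path such as "/0" has no parent.
  def parentOf(path: String): Option[String] = {
    val idx = path.lastIndexOf("/")
    if (idx <= 0) None else Some(path.substring(0, idx))
  }

  // Check and insert in one step: an accepted path is visible to the next record.
  def immediate(paths: Seq[String]): Vector[String] =
    paths.foldLeft(Vector.empty[String]) { (accepted, path) =>
      if (parentOf(path).forall(p => accepted.contains(p))) accepted :+ path
      else accepted
    }

  // Lagging store: a path accepted at index `at` is visible to the record at
  // index `i` only when i - at > lag.
  def lagging(paths: Seq[String], lag: Int): Vector[String] =
    paths.zipWithIndex
      .foldLeft(Vector.empty[(String, Int)]) { case (accepted, (path, i)) =>
        val visible = accepted.collect { case (p, at) if i - at > lag => p }
        if (parentOf(path).forall(p => visible.contains(p))) accepted :+ (path -> i)
        else accepted
      }
      .map(_._1)
}
```

For the input /0, /0/1, /0/1/2, `StoreLag.immediate` keeps all three paths,
while `StoreLag.lagging` with a lag of 1 keeps only /0. If a lag like this is
the real cause, doing the check and the insert in the same processing step
against the same store would avoid it; in Kafka Streams that would presumably
mean a state store attached to a custom processor or transformer rather than
a store queried from outside the topology.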

Another problem is that the parent key and the child key are different, so
the arrival order can differ from the order in which the producer sent the
records, which also causes the child to be filtered out. The producer sends
/0, /0/1, /0/1/2, ... but the broker gets them as /0, /0/1/2, /0/1, ...;
then all the following paths are filtered out, because /0/1/2 never gets a
chance to be created.
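
Kafka only preserves order within a single partition, and by default the
producer picks the partition from the record key, so records with different
keys may end up on different partitions. One possible workaround, sketched
here under assumptions, is to key every record by the root segment of its
path (carrying the full path in the value) so that a whole tree stays on one
partition. `RootKey`, `rootOf` and `partitionFor` are hypothetical names;
`partitionFor` is a simple hash stand-in, not Kafka's actual default
partitioner.

```scala
object RootKey {
  // Root segment of a path: "/0/1/2" -> "/0", and "/0" -> "/0".
  def rootOf(path: String): String = {
    val second = path.indexOf("/", 1)
    if (second < 0) path else path.substring(0, second)
  }

  // Stand-in partitioner (assumption): non-negative hash modulo partition count.
  def partitionFor(key: String, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)
}
```

Keyed this way, /0, /0/1 and /0/1/2 all share the key /0 and therefore map to
the same partition, so their relative order is kept. The trade-off is that one
large tree cannot be spread across partitions.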

Any thoughts on how to solve this?

Thanks,
Nan


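import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{IntegerSerializer, Serdes, StringSerializer}
import org.apache.kafka.common.serialization.Serdes.{IntegerSerde, StringSerde}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Materialized, Reducer}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}

val streamProperties = new Properties()
  streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1")
  streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG, "important-test-client")
  streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde].getName)
  streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[IntegerSerde].getName)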

  val streamBuilder = new StreamsBuilder()
  val topic = "input"

  val inputStream = streamBuilder.stream[String, Integer](topic)

  val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes, Array[Byte]]](topic + "_store")
    .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())

  val reducer = new Reducer[Integer](){
    override def apply(value1: Integer, value2: Integer): Integer = {
      value2
    }
  }

  // The value is not important; only the key matters.
  val ktable = inputStream.filter(filter).groupByKey().reduce(reducer, materialized)

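  // Accept a record only if its parent path is already in the store.
  def filter(key: String, value: Integer): Boolean = {
    println("===current store===, checking key " + key + " value: " + value)
    store.all().forEachRemaining(x => println(x.key))
    // Compute the index on the trimmed key so it matches the string being cut.
    val trimmed = key.trim()
    val parent = trimmed.substring(0, trimmed.lastIndexOf("/"))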
    if(parent == "") {
      true
    } else {
      if (store.get(parent) == null) {
        println("not found parent" + parent)
        false
      } else {
        true
      }
    }
  }

  val topology = streamBuilder.build()
  val streams = new KafkaStreams(topology, streamProperties)
  streams.start()

  // Crude wait so that the store is queryable before it is looked up.
  Thread.sleep(6000)
  val storeName = ktable.queryableStoreName()
  val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Integer])


val senderProperties = new Properties
  senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
  senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
  val producer = new KafkaProducer[String, Integer](senderProperties)


  // Sends /0, /0/1, ..., /0/1/2/3/4/5/6/7/8 with values 1 to 9.
  for(j <- 1 until 10) {
    val path = for(i <- 0 until j) yield {
      "/" + i
    }
    producer.send(new ProducerRecord(topic, path.mkString(""), j))
  }

  Thread.sleep(3000)
  println("====show final store state====")
  store.all().forEachRemaining(x => println(x.key))

Thread.sleep(10000000)


output:
===current store===, checking key /0 value: 1
===current store===, checking key /0/1 value: 2
/0
===current store===, checking key /0/1/2/3 value: 4
/0/1
/0
not found parent/0/1/2
===current store===, checking key /0/1/2 value: 3
/0/1
/0
===current store===, checking key /0/1/2/3/4/5 value: 6
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4
===current store===, checking key /0/1/2/3/4 value: 5
/0/1
/0
/0/1/2
not found parent/0/1/2/3
===current store===, checking key /0/1/2/3/4/5/6 value: 7
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5
===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6/7
===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6
====show final store state====
/0/1
/0
/0/1/2
