Sergey Menshikov created KAFKA-9598:
---------------------------------------

             Summary: RocksDB exception when grouping dynamically appearing topics into a KTable
                 Key: KAFKA-9598
                 URL: https://issues.apache.org/jira/browse/KAFKA-9598
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.0, 2.2.0
            Reporter: Sergey Menshikov
         Attachments: exception-details.txt

A Streams application consumes a number of topics selected by a whitelist regex. The 
topics appear dynamically: the Debezium MongoDB source connector creates them from 
dynamically appearing MongoDB collections.

The development setup runs on the Debezium Docker images (Debezium 0.9 and Debezium 
1.0, i.e. Kafka 2.2.0 and 2.4.0), with a single instance each of Kafka, Connect, and 
the Streams consumer app.

Because the MongoDB connector provides only deltas of the changes, the code collects 
the full record for each key by building a KTable, which is then converted back into a 
KStream for further joins with other KTables and GlobalKTables.
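The reduce step described above folds each delta into the record accumulated so far for its key. The merge helper used in this application (`mergeNodes`) is not shown in the report, so the following is only a hypothetical sketch of a delta-merging reducer. It assumes plain `Map<String, Object>` values as a stand-in for Jackson's `JsonNode` and does not model Debezium's actual change-event format; the class name `DeltaMergeSketch` is illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeltaMergeSketch {

    // Hypothetical stand-in for mergeNodes (Map instead of JsonNode):
    // overlays newValue onto aggValue. Nested maps are merged recursively;
    // any other value in the delta replaces the previous one.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> mergeNodes(Map<String, Object> aggValue,
                                                 Map<String, Object> newValue) {
        Map<String, Object> merged = new HashMap<>(aggValue); // never mutate the aggregate in place
        for (Map.Entry<String, Object> e : newValue.entrySet()) {
            Object old = merged.get(e.getKey());
            Object upd = e.getValue();
            if (old instanceof Map && upd instanceof Map) {
                merged.put(e.getKey(),
                        mergeNodes((Map<String, Object>) old, (Map<String, Object>) upd));
            } else {
                merged.put(e.getKey(), upd);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Mimics what reduce() does for a single key: the first value seeds
        // the aggregate, and every later delta is merged into it in order.
        List<Map<String, Object>> deltas = List.of(
                Map.of("name", "run-1", "status", Map.of("state", "new")),
                Map.of("status", Map.of("state", "done", "code", 0)),
                Map.of("owner", "alice"));
        Map<String, Object> record = deltas.get(0);
        for (int i = 1; i < deltas.size(); i++) {
            record = mergeNodes(record, deltas.get(i));
        }
        System.out.println(record);
    }
}
```

Returning a fresh map rather than mutating `aggValue` keeps the sketch safe to use with immutable inputs such as `Map.of`.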

The following piece of code results in the exception when a new topic is added:

 
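{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import java.util.regex.Pattern;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// builder (StreamsBuilder), config, stringSerde, jsonSerde and mergeNodes are defined elsewhere in the app
Pattern tResultPattern =
    Pattern.compile(config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
KStream<String, JsonNode> tResultsTempStream =
    builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
KTable<String, JsonNode> tResultsTempTable = tResultsTempStream
    .groupByKey(Grouped.with(stringSerde, jsonSerde))
    .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue)); // mergeNodes is a JSON traverse/merge procedure
KStream<String, JsonNode> tResults = tResultsTempTable.toStream();
{code}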

{code}
kconsumer_1 | Exception in thread "split-reader-client3-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-REDUCE-STATE-STORE-0000000032 at location /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032
...
kconsumer_1 | Caused by: org.rocksdb.RocksDBException: lock : /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032/LOCK: No locks available
{code}

I checked that store 10_0 contains tr[0-9a-fA-F]\{32} records.

More details about the exception are in the attached file (exception-details.txt).

The exception no longer occurs when I route the records through an intermediate topic instead:

 
{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import java.util.regex.Pattern;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// builder, config, stringSerde, jsonSerde, trTransformer and mergeNodes are defined elsewhere in the app
Pattern tResultPattern =
    Pattern.compile(config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
KStream<String, JsonNode> tResultsTempStream =
    builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
// trTransformer adds the topic name into the value JSON; in the previous snippet
// this was done later in the pipeline, after grouping/streaming
tResultsTempStream.transform(trTransformer::new)
    .to(config.getProperty("tr_intermediate_topic_name"), Produced.with(stringSerde, jsonSerde));
KStream<String, JsonNode> tResultsTempStream2 =
    builder.stream(config.getProperty("tr_intermediate_topic_name"), Consumed.with(stringSerde, jsonSerde));
KTable<String, JsonNode> tResultsTempTable = tResultsTempStream2
    .groupByKey(Grouped.with(stringSerde, jsonSerde))
    .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue));
KStream<String, JsonNode> tResults = tResultsTempTable.toStream();
{code}
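Both snippets subscribe through the same `mongodb_source_prefix + ".tr[0-9a-fA-F]{32}"` pattern, which can be checked in isolation with `java.util.regex`. This is a minimal sketch, assuming a hypothetical prefix `dbserver1.mydb` and whole-name matching via `Matcher.matches()`; it only shows which topic names the pattern accepts and does not reproduce the RocksDB exception. The `strictTrPattern` variant is an illustrative alternative, not something from the report.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternCheck {

    // Built the same way as in the snippets: prefix + ".tr" + 32 hex digits.
    public static Pattern trPattern(String prefix) {
        return Pattern.compile(prefix + ".tr[0-9a-fA-F]{32}");
    }

    // Illustrative stricter variant: quote the prefix and escape the separator
    // dot so that both are matched literally.
    public static Pattern strictTrPattern(String prefix) {
        return Pattern.compile(Pattern.quote(prefix) + "\\.tr[0-9a-fA-F]{32}");
    }

    public static void main(String[] args) {
        String prefix = "dbserver1.mydb";                 // hypothetical mongodb_source_prefix
        String hex = "0123456789abcdef0123456789abcdef";  // 32 hex digits
        List<String> topics = List.of(
                prefix + ".tr" + hex,        // intended collection topic
                prefix + "_tr" + hex,        // accepted by the loose pattern: '.' is unescaped
                prefix + ".tr" + hex + "x",  // rejected by both: matches() needs the whole name
                prefix + ".other");          // rejected by both
        for (String t : topics) {
            System.out.printf("%-60s loose=%b strict=%b%n", t,
                    trPattern(prefix).matcher(t).matches(),
                    strictTrPattern(prefix).matcher(t).matches());
        }
    }
}
```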
 

 

If building a KTable from multiple whitelisted (pattern-subscribed) topics is outside 
the scope of Kafka Streams, it would make sense to mention this in the docs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
