Hi Team, could you please answer the question below?
When we call a Cassandra DB asynchronously in a KeyedProcessFunction operator, will the operator process the next event for the same key before the response from the DB is received? Based on the code below, our understanding is that it will process the next event for the same key. Is that understanding accurate, or are we doing it wrong? Thank you for answering.

```
@Slf4j
@Builder
public class SCKeyedProcessFunction
    extends KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations> {

  private OutputTag<EventWithAssociations> existingEvents;
  private transient ValueState<SCCache> cachestate;

  @Override
  public void processElement(
      EventWithAssociations eventWithAssociations,
      KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations>.Context context,
      Collector<EventWithAssociations> collector)
      throws Exception {
    log.info(
        "Processing event in SCKeyedProcessFunction : {}",
        eventWithAssociations.getTsvSefsEvent().getPayloadTxt());
    // Check cache
    try {
      SCCache scCache = cachestate.value();
      if (scCache == null) {
        // Cache miss: Query Cassandra asynchronously
        log.info(
            "scCache is null for System_Id : {}",
            EventWithAssociations.getEvent().getSystemId());
        CompletableFuture<Cache> future = queryCassandraAsync(eventWithAssociations);
        future.thenAccept(
            scEvent -> {
              try {
                if (scEvent != null) {
                  log.info("scEvent is received from DB : {}", scEvent);
                  // Update cache and process the event if necessary
                  cachestate.update(scEvent);
                  processEventWithCache(eventWithAssociations, scEvent, collector, context);
                } else {
                  log.info("This is first event hence collecting it : {}", scEvent);
                  // If no result from Cassandra, collect the event
                  collector.collect(eventWithAssociations);
                }
              } catch (Exception e) {
                throw new RuntimeException("Error updating cache or processing event", e);
              }
            });
      } else {
        // Cache hit: Process event with cached value
        processEventWithCache(eventWithAssociations, scCache, collector, context);
      }
    } catch (Exception e) {
      log.error("Error in SCKeyedProcessFunction asyncInvoke: {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }