Hi Team,
Could you please answer the below:
When we are asynchronously calling a cassandra DB in Keyed ProcessStream
operator and before receiving response from DB , will the operator process
next event of same key ?
As per below code we understood, it will process the next event of same
key, is it accurate understanding or we are doing it wrong ?
Thank you for answering.
```
@Slf4j
@Builder
public class SCKeyedProcessFunction
extends KeyedProcessFunction<UUID, EventWithAssociations,
EventWithAssociations> {
private OutputTag<EventWithAssociations> existingEvents;
private transient ValueState<SCCache> cachestate;
@Override
public void processElement(
EventWithAssociations eventWithAssociations,
KeyedProcessFunction<UUID, EventWithAssociations,
EventWithAssociations>.Context
context,
Collector<EventWithAssociations> collector)
throws Exception {
log.info(
"Processing event in SCKeyedProcessFunction : {}",
eventWithAssociations.getTsvSefsEvent().getPayloadTxt());
// Check cache
try {
SCCache scCache = cachestate.value();
if (scCache == null) {
// Cache miss: Query Cassandra asynchronously
log.info(
"scCache is null for System_Id : {}",
EventWithAssociations.getEvent().getSystemId());
CompletableFuture<Cache> future =
queryCassandraAsync(eventWithAssociations);
future.thenAccept(
scEvent -> {
try {
if (scEvent != null) {
log.info("scEvent is received from DB : {}", scEvent);
// Update cache and process the event if necessary
cachestate.update(scEvent);
processEventWithCache(eventWithAssociations,
scEvent, collector, context);
} else {
log.info("This is first event hence collecting it :
{}", scEvent);
// If no result from Cassandra, collect the event
collector.collect(eventWithAssociations);
}
} catch (Exception e) {
throw new RuntimeException("Error updating cache or
processing event", e);
}
});
} else {
// Cache hit: Process event with cached value
processEventWithCache(eventWithAssociations, scCache,
collector, context);
}
} catch (Exception e) {
log.error("Error in SCKeyedProcessFunction asyncInvoke: {}",
e.getMessage(), e);
throw new RuntimeException(e);
}
}