Hi Team,

Could you please answer the below:

When we are asynchronously calling a cassandra DB in Keyed ProcessStream
operator and before receiving response from DB , will the operator process
next event of same key ?

As per below code we understood, it will process the next event of same
key, is it accurate understanding or we are doing it wrong ?

Thank you for answering.
```

@Slf4j
@Builder
public class SCKeyedProcessFunction
    extends KeyedProcessFunction<UUID, EventWithAssociations,
EventWithAssociations> {

  private OutputTag<EventWithAssociations> existingEvents;

  private transient ValueState<SCCache> cachestate;

  @Override
  public void processElement(
      EventWithAssociations eventWithAssociations,
      KeyedProcessFunction<UUID, EventWithAssociations,
EventWithAssociations>.Context
          context,
      Collector<EventWithAssociations> collector)
      throws Exception {
    log.info(
        "Processing event in SCKeyedProcessFunction : {}",
        eventWithAssociations.getTsvSefsEvent().getPayloadTxt());
    // Check cache
    try {
      SCCache scCache = cachestate.value();

      if (scCache == null) {
        // Cache miss: Query Cassandra asynchronously
        log.info(
            "scCache is null for System_Id : {}",
            EventWithAssociations.getEvent().getSystemId());
        CompletableFuture<Cache> future =
queryCassandraAsync(eventWithAssociations);

        future.thenAccept(
            scEvent -> {
              try {
                if (scEvent != null) {
                  log.info("scEvent is received from DB : {}", scEvent);
                  // Update cache and process the event if necessary
                  cachestate.update(scEvent);
                  processEventWithCache(eventWithAssociations,
scEvent, collector, context);
                } else {
                  log.info("This is first event hence collecting it :
{}", scEvent);
                  // If no result from Cassandra, collect the event
                  collector.collect(eventWithAssociations);
                }
              } catch (Exception e) {
                throw new RuntimeException("Error updating cache or
processing event", e);
              }
            });
      } else {
        // Cache hit: Process event with cached value
        processEventWithCache(eventWithAssociations, scCache,
collector, context);
      }
    } catch (Exception e) {
      log.error("Error in SCKeyedProcessFunction asyncInvoke: {}",
e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

Reply via email to