Hello Dev team. I have a StreamExecutionEnvironment job that consumes simple CQL SELECT queries from Kafka. I am trying to handle these queries asynchronously with the following code:

public class GenericCassandraReader extends RichAsyncFunction<UserDefinedType, ResultSet> {

    private static final Logger logger = LoggerFactory.getLogger(GenericCassandraReader.class);

    private ExecutorService executorService;
    private final Properties props;
    private Session client;

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public GenericCassandraReader(Properties props, ExecutorService executorService) {
        super();
        this.props = props;
        this.executorService = executorService;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open one Cassandra session per parallel instance of the function.
        client = Cluster.builder()
                .addContactPoint(props.getProperty("cqlHost"))
                .withPort(Integer.parseInt(props.getProperty("cqlPort")))
                .build()
                .connect(props.getProperty("keyspace"));
    }

    @Override
    public void close() throws Exception {
        client.close();
        synchronized (GenericCassandraReader.class) {
            try {
                // Wait up to one second for pending tasks, then force shutdown.
                if (!getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                    getExecutorService().shutdownNow();
                }
            } catch (InterruptedException e) {
                getExecutorService().shutdownNow();
            }
        }
    }

    @Override
    public void asyncInvoke(final UserDefinedType input,
                            final AsyncCollector<ResultSet> asyncCollector) throws Exception {
        getExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                // Run the incoming CQL query without blocking the operator thread.
                ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(input.query);

                Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
                    public void onSuccess(ResultSet resultSet) {
                        asyncCollector.collect(Collections.singleton(resultSet));
                    }

                    public void onFailure(Throwable t) {
                        asyncCollector.collect(t);
                    }
                });
            }
        });
    }
}

Each response from this code is a Cassandra ResultSet with a different number of fields. Any ideas for handling the Cassandra ResultSet in Flink, or should I use another technique to reach my goal? Thanks in advance for any help!
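
P.S. To make the question more concrete, one option I am considering is to stop emitting the ResultSet itself and instead convert every row into a name -> value map, so the downstream operator always sees the same type no matter how many columns a query selected. Below is a minimal sketch of that idea using only plain Java collections. `RowFlattener`, `toMap` and `toMaps` are hypothetical names of my own, not part of Flink or the Cassandra driver. As far as I understand, with the DataStax 3.x driver the column names would come from `row.getColumnDefinitions().getName(i)` and the values from `row.getObject(i)`, but I have not verified this against my driver version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink or the Cassandra driver):
 * turns rows of varying width into name -> value maps.
 */
final class RowFlattener {

    private RowFlattener() {
    }

    /** Converts one row, given its column names and values in the same order. */
    static Map<String, Object> toMap(List<String> columnNames, List<?> values) {
        if (columnNames.size() != values.size()) {
            throw new IllegalArgumentException(
                    "Expected " + columnNames.size() + " values but got " + values.size());
        }
        // LinkedHashMap keeps the columns in the order the query selected them.
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.size(); i++) {
            row.put(columnNames.get(i), values.get(i));
        }
        return row;
    }

    /** Converts every row of one query result that shares the same column names. */
    static List<Map<String, Object>> toMaps(List<String> columnNames,
                                            List<? extends List<?>> rows) {
        List<Map<String, Object>> result = new ArrayList<>(rows.size());
        for (List<?> values : rows) {
            result.add(toMap(columnNames, values));
        }
        return result;
    }
}
```

With something like this, the function would be declared with `Map<String, Object>` as its output type, and `onSuccess` would pass the converted list to `asyncCollector.collect(...)` rather than wrapping the ResultSet in a singleton collection. I am not sure whether this is the recommended approach in Flink, hence the question.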