Hello Dev team.

I have StreamExecutionEnvironment job that consumes from kafka simple cql 
select queries. I try to handle this queries asynchronically using following 
code:

public class GenericCassandraReader extends RichAsyncFunction {
private static final Logger logger = 
LoggerFactory.getLogger(GenericCassandraReader.class);
private ExecutorService executorService;

private final Properties props;
private Session client;

public ExecutorService getExecutorService() {
    return executorService;
}

public GenericCassandraReader(Properties props, ExecutorService 
executorService) {
    super();
    this.props = props;
    this.executorService = executorService;
}

@Override
public void open(Configuration parameters) throws Exception {
    client = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
            .withPort(Integer.parseInt(props.getProperty("cqlPort"))).build()
            .connect(props.getProperty("keyspace"));

}

@Override
public void close() throws Exception {
    client.close();
    synchronized (GenericCassandraReader.class) {
        try {
            if (!getExecutorService().awaitTermination(1000, 
TimeUnit.MILLISECONDS)) {
                getExecutorService().shutdownNow();
            }
        } catch (InterruptedException e) {
            getExecutorService().shutdownNow();
        }
    }
}

@Override
public void asyncInvoke(final UserDefinedType input, final 
AsyncCollector<ResultSet> asyncCollector) throws Exception {
    getExecutorService().submit(new Runnable() {
        @Override
        public void run() {
            ListenableFuture<ResultSet> resultSetFuture = 
client.executeAsync(input.query);

            Futures.addCallback(resultSetFuture, new 
FutureCallback<ResultSet>() {

                public void onSuccess(ResultSet resultSet) {
                    asyncCollector.collect(Collections.singleton(resultSet));
                }

                public void onFailure(Throwable t) {
                    asyncCollector.collect(t);
                }
            });
        }
    });
}
}

each response of this code provides Cassandra ResultSet with different amount 
of fields .
Any Ideas for handling Cassandra ResultSet in Flink or should I use another 
technics to reach my goal ?
Thanks for any help in advance!

Reply via email to