Hey Gregory,

1) There is no special support for ResultSet in Flink, so you have to manually parse it and emit records of the type you want.
2) In your asyncInvoke you don't need to submit to the executorService since the client is already async and returns a future. You can use the code in run() directly.

– Ufuk

On Sun, Dec 24, 2017 at 1:41 PM, Gregory Melekh <gmel...@interwise.com> wrote:
> Hello Dev team.
>
> I have StreamExecutionEnvironment job that consumes from kafka simple cql
> select queries. I try to handle this queries asynchronically using following
> code:
>
> public class GenericCassandraReader extends RichAsyncFunction {
>     private static final Logger logger =
>             LoggerFactory.getLogger(GenericCassandraReader.class);
>     private ExecutorService executorService;
>
>     private final Properties props;
>     private Session client;
>
>     public ExecutorService getExecutorService() {
>         return executorService;
>     }
>
>     public GenericCassandraReader(Properties props, ExecutorService
>             executorService) {
>         super();
>         this.props = props;
>         this.executorService = executorService;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         client = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
>                 .withPort(Integer.parseInt(props.getProperty("cqlPort"))).build()
>                 .connect(props.getProperty("keyspace"));
>
>     }
>
>     @Override
>     public void close() throws Exception {
>         client.close();
>         synchronized (GenericCassandraReader.class) {
>             try {
>                 if (!getExecutorService().awaitTermination(1000,
>                         TimeUnit.MILLISECONDS)) {
>                     getExecutorService().shutdownNow();
>                 }
>             } catch (InterruptedException e) {
>                 getExecutorService().shutdownNow();
>             }
>         }
>     }
>
>     @Override
>     public void asyncInvoke(final UserDefinedType input, final
>             AsyncCollector<ResultSet> asyncCollector) throws Exception {
>         getExecutorService().submit(new Runnable() {
>             @Override
>             public void run() {
>                 ListenableFuture<ResultSet> resultSetFuture =
>                         client.executeAsync(input.query);
>
>                 Futures.addCallback(resultSetFuture, new
>                         FutureCallback<ResultSet>() {
>
>                     public void onSuccess(ResultSet resultSet) {
>                         asyncCollector.collect(Collections.singleton(resultSet));
>                     }
>
>                     public void onFailure(Throwable t) {
>                         asyncCollector.collect(t);
>                     }
>                 });
>             }
>         });
>     }
> }
>
> each response of this code provides Cassandra ResultSet with different amount
> of fields .
> Any Ideas for handling Cassandra ResultSet in Flink or should I use another
> technics to reach my goal ?
> Thanks for any help in advance!
>
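
To make points 1) and 2) above a bit more concrete, here is a rough, self-contained sketch of the shape I have in mind. It deliberately uses only the JDK so it runs on its own: FakeRow, toRecords and this asyncInvoke signature are hypothetical stand-ins, not Flink or Cassandra driver API, and CompletableFuture stands in for the ListenableFuture returned by executeAsync. The idea is to attach the callback directly to the future (no executorService.submit) and to flatten each row into a generic record keyed by column name, so results with different sets of fields still share one output type. In the real job you would read the column names and values from the driver's ResultSet/Row objects instead; treat that mapping as an assumption to check against your driver version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ResultSetFlattening {

    /** Hypothetical stand-in for one result row: column names and values in matching order. */
    static final class FakeRow {
        final List<String> columnNames;
        final List<Object> values;

        FakeRow(List<String> columnNames, List<Object> values) {
            this.columnNames = columnNames;
            this.values = values;
        }
    }

    /** Turns rows with arbitrary columns into uniform records keyed by column name. */
    static List<Map<String, Object>> toRecords(List<FakeRow> rows) {
        List<Map<String, Object>> records = new ArrayList<>();
        for (FakeRow row : rows) {
            Map<String, Object> record = new LinkedHashMap<>(); // keeps column order
            for (int i = 0; i < row.columnNames.size(); i++) {
                record.put(row.columnNames.get(i), row.values.get(i));
            }
            records.add(record);
        }
        return records;
    }

    /**
     * Stand-in for asyncInvoke: the query future is already asynchronous, so the
     * callback is registered directly on it instead of being submitted to an executor.
     * onRecords / onError play the role of the two collect(...) calls on the collector.
     */
    static void asyncInvoke(CompletableFuture<List<FakeRow>> queryFuture,
                            Consumer<List<Map<String, Object>>> onRecords,
                            Consumer<Throwable> onError) {
        queryFuture.whenComplete((rows, error) -> {
            if (error != null) {
                onError.accept(error);
            } else {
                onRecords.accept(toRecords(rows));
            }
        });
    }

    public static void main(String[] args) {
        // Simulates the client completing the future some time after asyncInvoke returned.
        CompletableFuture<List<FakeRow>> pending = new CompletableFuture<>();
        asyncInvoke(pending,
                records -> System.out.println("records: " + records),
                error -> System.out.println("failed: " + error));

        // Two rows with different columns, as in the "different amount of fields" case.
        pending.complete(Arrays.asList(
                new FakeRow(Arrays.asList("id", "name"), Arrays.<Object>asList(1, "alice")),
                new FakeRow(Arrays.asList("id", "age"), Arrays.<Object>asList(2, 42))));
    }
}
```

With this shape the function's output type would be something like Map<String, Object> rather than ResultSet, and close() would no longer have an executor to shut down.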