Hello Dev team.
I have StreamExecutionEnvironment job that consumes from kafka simple cql
select queries. I try to handle this queries asynchronically using following
code:
public class GenericCassandraReader extends RichAsyncFunction {
private static final Logger logger =
LoggerFactory.getLogger(GenericCassandraReader.class);
private ExecutorService executorService;
private final Properties props;
private Session client;
public ExecutorService getExecutorService() {
return executorService;
}
public GenericCassandraReader(Properties props, ExecutorService
executorService) {
super();
this.props = props;
this.executorService = executorService;
}
@Override
public void open(Configuration parameters) throws Exception {
client = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
.withPort(Integer.parseInt(props.getProperty("cqlPort"))).build()
.connect(props.getProperty("keyspace"));
}
@Override
public void close() throws Exception {
client.close();
synchronized (GenericCassandraReader.class) {
try {
if (!getExecutorService().awaitTermination(1000,
TimeUnit.MILLISECONDS)) {
getExecutorService().shutdownNow();
}
} catch (InterruptedException e) {
getExecutorService().shutdownNow();
}
}
}
@Override
public void asyncInvoke(final UserDefinedType input, final
AsyncCollector<ResultSet> asyncCollector) throws Exception {
getExecutorService().submit(new Runnable() {
@Override
public void run() {
ListenableFuture<ResultSet> resultSetFuture =
client.executeAsync(input.query);
Futures.addCallback(resultSetFuture, new
FutureCallback<ResultSet>() {
public void onSuccess(ResultSet resultSet) {
asyncCollector.collect(Collections.singleton(resultSet));
}
public void onFailure(Throwable t) {
asyncCollector.collect(t);
}
});
}
});
}
}
each response of this code provides Cassandra ResultSet with different amount
of fields .
Any Ideas for handling Cassandra ResultSet in Flink or should I use another
technics to reach my goal ?
Thanks for any help in advance!