Hi there,
I have a flink streaming app where my source is Kafka and a custom sink to
Cassandra(I can't use standard C* sink that comes with flink as I have
customized auth to C*). I'm currently have the following:
messageStream
.rebalance()
.map( s-> {
return mapper.readValue(s, JsonNode.class);)
.filter(//filter some messages)
.map(
(MapFunction<JsonNode, String>) message -> {
getDbSession.execute("QUERY_TO_EXEC")
})
private static Session getDbSession() {
if(dbSession == null && store!=null) {
dbSession = getSession();
}
return dbSession;
}
1. Is this the right way to add a custom sink? As you can see, I have
dbSession as a class variable here and I'm storing its state.
2. This setup works fine in a standalone flink (java -jar MyJar.jar).
When I run using flink with YARN on EMR I get a NPE at the session
which is kind of weird.
Thanks