Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5964#discussion_r186700920
--- Diff:
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
---
@@ -85,7 +85,7 @@ public void onFailure(Throwable t) {
}
};
this.cluster = builder.getCluster();
- this.session = cluster.connect();
+ this.session =
cluster.connect(configuration.getString("keyspace", null));
--- End diff --
a cleaner approach would be to add a `protected Session
createSession(Cluster cluster)` method. The default implementation returns
`cluster.connect()`, and the pojo sink `cluster.connect(keyspace)`.
---