Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5964#discussion_r186673807
--- Diff:
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
---
@@ -50,18 +51,31 @@
* @param clazz Class instance
*/
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
- this(clazz, builder, null);
+ this(clazz, builder, null, null);
}
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder,
@Nullable MapperOptions options) {
+ this(clazz, builder, options, null);
+ }
+
+ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder,
String keyspace) {
+ this(clazz, builder, null, keyspace);
+ }
+
+ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder,
@Nullable MapperOptions options, String keyspace) {
super(builder);
this.clazz = clazz;
this.options = options;
+ this.keyspace = keyspace;
}
@Override
public void open(Configuration configuration) {
+ if (keyspace != null && !keyspace.isEmpty()) {
+ configuration.setString("keyspace", keyspace);
--- End diff --
This call has no effect, as the configuration is never used anywhere, and
thus it can be removed.
---