Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3748#discussion_r147456818
--- Diff:
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
---
@@ -375,6 +381,34 @@ protected void sanityCheck() {
}
/**
+ * Builder for a {@link CassandraRowSink}.
+ */
+ public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+ public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+ super(input, typeInfo, serializer);
+ }
+
+ @Override
+ protected void sanityCheck() {
+ super.sanityCheck();
+ if (query == null || query.length() == 0) {
+ throw new IllegalArgumentException("Query must not be null or empty.");
+ }
+ }
+
+ @Override
+ protected CassandraSink<Row> createSink() throws Exception {
+ return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
+
+ }
+
+ @Override
+ protected CassandraSink<Row> createWriteAheadSink() throws Exception {
+ throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
--- End diff --
I had a look at `CassandraTupleWriteAheadSink`. It would be straightforward
to copy and adapt it for `Row`.
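
As a rough sketch of the part that would likely change: as far as I can tell, the tuple sink copies each record's fields into an `Object[]` via `getArity()` / `getField(int)` before binding them to the prepared statement, and `Row` offers the same two accessors. The `Row` class below is a hypothetical stand-in so the snippet compiles without Flink, and `extractBindValues` is an illustrative name, not an existing method. The Cassandra calls are only indicated in a comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowFieldExtractionSketch {

    /** Hypothetical stand-in for org.apache.flink.types.Row (only the two accessors used here). */
    public static final class Row {
        private final Object[] fields;

        public Row(Object... fields) {
            this.fields = fields;
        }

        public int getArity() {
            return fields.length;
        }

        public Object getField(int pos) {
            return fields[pos];
        }
    }

    /**
     * Copies the fields of every row into a reusable buffer, one row at a time.
     * Assumption: in an adapted write-ahead sink, the arity would come from the
     * row serializer, and the buffer would be passed to the prepared statement.
     */
    public static List<Object[]> extractBindValues(Iterable<Row> rows, int arity) {
        Object[] fields = new Object[arity];
        List<Object[]> bound = new ArrayList<>();
        for (Row row : rows) {
            for (int i = 0; i < arity; i++) {
                fields[i] = row.getField(i);
            }
            // An adapted sink would bind `fields` to the prepared statement here
            // and execute it asynchronously; this sketch just records a copy.
            bound.add(fields.clone());
        }
        return bound;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row("a", 1), new Row("b", 2));
        for (Object[] values : extractBindValues(rows, 2)) {
            System.out.println(Arrays.toString(values));
        }
    }
}
```

The checkpoint-commit and callback handling should carry over unchanged if this assumption holds; only the arity lookup and the generic bound would need to move from tuple types to `Row`.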
---