Repository: flink
Updated Branches:
  refs/heads/master 0df8e0797 -> 1809cad6d


[FLINK-6225] [cassandra] Add a CassandraAppendTableSink.

This closes #3748.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1809cad6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1809cad6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1809cad6

Branch: refs/heads/master
Commit: 1809cad6d05522d6185a69ca14ddc275d5ebbbf1
Parents: 0df8e07
Author: Jing Fan <[email protected]>
Authored: Thu Apr 20 17:54:36 2017 -0700
Committer: Fabian Hueske <[email protected]>
Committed: Wed Nov 1 14:53:09 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  44 ++++-
 .../flink-connector-cassandra/pom.xml           |   6 +
 .../cassandra/CassandraAppendTableSink.java     |  88 ++++++++++
 .../connectors/cassandra/CassandraRowSink.java  |  42 +++++
 .../cassandra/CassandraRowWriteAheadSink.java   | 162 +++++++++++++++++++
 .../connectors/cassandra/CassandraSink.java     |  40 ++++-
 .../cassandra/CassandraConnectorITCase.java     |  75 ++++++++-
 .../java/typeutils/runtime/RowSerializer.java   |   7 +
 tools/maven/suppressions.xml                    |   2 +-
 9 files changed, 454 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 43542f3..dfa7954 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -492,7 +492,8 @@ The following table lists the `TableSink`s which are provided with Flink.
 
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
-| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes tables to a JDBC database.
+| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
 | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
 | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.
 
@@ -583,6 +584,47 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t
 
 {% top %}
 
+### CassandraAppendTableSink
+
+The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details.
+
+The `CassandraAppendTableSink` inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as an upsert query, in which case replayed rows simply overwrite their earlier versions.
+
+To use the `CassandraAppendTableSink`, you have to add the Cassandra connector dependency (<code>flink-connector-cassandra</code>) to your project. The example below shows how to use the `CassandraAppendTableSink`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+ClusterBuilder builder = ... // configure Cassandra cluster connection
+
+CassandraAppendTableSink sink = new CassandraAppendTableSink(
+  builder,
+  // the query must match the schema of the table
+  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
+
+Table table = ...
+table.writeToSink(sink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val builder: ClusterBuilder = ... // configure Cassandra cluster connection
+
+val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
+  builder,
+  // the query must match the schema of the table
+  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
+
+val table: Table = ???
+table.writeToSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Define a TableSource
 --------------------
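The at-least-once guarantee in the new docs section is safe in practice because Cassandra INSERTs are upserts keyed by primary key: replaying a record after a failure overwrites the same row. A JDK-only toy model of that reasoning (not part of this commit; names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: a Cassandra table as "primary key -> value". Replaying an
// insert after a failure overwrites the same row, so at-least-once
// delivery with an upsert query converges to the same final state.
public class UpsertIdempotence {

    public static Map<String, Integer> replay(String[] keys, int[] values) {
        Map<String, Integer> table = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            table.put(keys[i], values[i]); // upsert: a duplicate key overwrites in place
        }
        return table;
    }

    public static void main(String[] args) {
        // record ("a", 1) is delivered twice, e.g. after a checkpoint recovery
        Map<String, Integer> t = replay(new String[]{"a", "b", "a"}, new int[]{1, 2, 1});
        System.out.println(t.size()); // 2 distinct rows despite 3 inserts
    }
}
```

This is also why a non-idempotent query (e.g. one using a counter column) would not be safe under the same at-least-once delivery.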
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index c97b43f..3c1d3e1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -197,5 +197,11 @@ under the License.
                                </exclusion>
                        </exclusions>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
        </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
new file mode 100644
index 0000000..395ff9a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * An {@link AppendStreamTableSink} to write an append stream Table to a Cassandra table.
+ */
+public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
+
+       private final ClusterBuilder builder;
+       private final String cql;
+       private String[] fieldNames;
+       private TypeInformation[] fieldTypes;
+       private final Properties properties;
+
+       public CassandraAppendTableSink(ClusterBuilder builder, String cql) {
+               this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+               this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+               this.properties = new Properties();
+       }
+
+       public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties) {
+               this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+               this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+       }
+
+       @Override
+       public TypeInformation<Row> getOutputType() {
+               return new RowTypeInfo(fieldTypes);
+       }
+
+       @Override
+       public String[] getFieldNames() {
+               return this.fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return this.fieldTypes;
+       }
+
+       @Override
+       public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+               CassandraAppendTableSink cassandraTableSink = new CassandraAppendTableSink(this.builder, this.cql, this.properties);
+               cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
+               cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
+               Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+                       "Number of provided field names and types does not match.");
+               return cassandraTableSink;
+       }
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               try {
+                       CassandraSink.addSink(dataStream)
+                               .setClusterBuilder(this.builder)
+                               .setQuery(this.cql)
+                               .build();
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}
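The `configure` method above follows the TableSink copy-on-configure contract: it returns a configured copy carrying the schema instead of mutating the receiver. A minimal JDK-only sketch of that pattern (class and method names are illustrative, not the Flink API):

```java
// Sketch of the copy-on-configure pattern: configure() returns a new sink
// carrying the schema, leaving the original instance untouched. Class and
// method names are illustrative, not the Flink TableSink API.
public class ConfigurableSink {

    private final String query;
    private String[] fieldNames; // set only on configured copies

    public ConfigurableSink(String query) {
        this.query = query;
    }

    public ConfigurableSink configure(String[] fieldNames) {
        ConfigurableSink copy = new ConfigurableSink(this.query);
        copy.fieldNames = fieldNames.clone();
        return copy;
    }

    public String[] getFieldNames() {
        return fieldNames;
    }

    public static void main(String[] args) {
        ConfigurableSink sink = new ConfigurableSink("INSERT INTO t (id) VALUES (?)");
        ConfigurableSink configured = sink.configure(new String[]{"id"});
        System.out.println(sink.getFieldNames() == null);       // true: original untouched
        System.out.println(configured.getFieldNames().length);  // 1
    }
}
```

Keeping the unconfigured instance immutable lets the planner reuse it to derive multiple configured sinks safely.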

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
new file mode 100644
index 0000000..fbbeb96
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.types.Row;
+
+/**
+ * A SinkFunction to write Row records into a Cassandra table.
+ */
+public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
+
+       private final int rowArity;
+
+       public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
+               super(insertQuery, builder);
+               this.rowArity = rowArity;
+       }
+
+       @Override
+       protected Object[] extract(Row record) {
+               Object[] al = new Object[rowArity];
+               for (int i = 0; i < rowArity; i++) {
+                       al[i] = record.getField(i);
+               }
+               return al;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
new file mode 100644
index 0000000..6b3d418
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra table. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
+ * if a checkpoint is completed.
+ */
+public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
+       private static final long serialVersionUID = 1L;
+
+       protected transient Cluster cluster;
+       protected transient Session session;
+
+       private final String insertQuery;
+       private transient PreparedStatement preparedStatement;
+
+       private ClusterBuilder builder;
+
+       private int arity;
+       private transient Object[] fields;
+
+       protected CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+               super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+               this.insertQuery = insertQuery;
+               this.builder = builder;
+               ClosureCleaner.clean(builder, true);
+       }
+
+       public void open() throws Exception {
+               super.open();
+               if (!getRuntimeContext().isCheckpointingEnabled()) {
+                       throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+               }
+               cluster = builder.getCluster();
+               session = cluster.connect();
+               preparedStatement = session.prepare(insertQuery);
+
+               arity = ((RowSerializer) serializer).getArity();
+               fields = new Object[arity];
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               try {
+                       if (session != null) {
+                               session.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+               try {
+                       if (cluster != null) {
+                               cluster.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+
+       @Override
+       protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
+               final AtomicInteger updatesCount = new AtomicInteger(0);
+               final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+               final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+               FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+                       @Override
+                       public void onSuccess(ResultSet resultSet) {
+                               updatesConfirmed.incrementAndGet();
+                               if (updatesCount.get() > 0) { // only set if all updates have been sent
+                                       if (updatesCount.get() == updatesConfirmed.get()) {
+                                               synchronized (updatesConfirmed) {
+                                                       updatesConfirmed.notifyAll();
+                                               }
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void onFailure(Throwable throwable) {
+                               if (exception.compareAndSet(null, throwable)) {
+                                       LOG.error("Error while sending value.", throwable);
+                                       synchronized (updatesConfirmed) {
+                                               updatesConfirmed.notifyAll();
+                                       }
+                               }
+                       }
+               };
+
+               //set values for prepared statement
+               int updatesSent = 0;
+               for (Row value : values) {
+                       for (int x = 0; x < arity; x++) {
+                               fields[x] = value.getField(x);
+                       }
+                       //insert values and send to cassandra
+                       BoundStatement s = preparedStatement.bind(fields);
+                       s.setDefaultTimestamp(timestamp);
+                       ResultSetFuture result = session.executeAsync(s);
+                       updatesSent++;
+                       if (result != null) {
+                               //add callback to detect errors
+                               Futures.addCallback(result, callback);
+                       }
+               }
+               updatesCount.set(updatesSent);
+
+               synchronized (updatesConfirmed) {
+                       while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+                               updatesConfirmed.wait();
+                       }
+               }
+
+               if (exception.get() != null) {
+                       LOG.warn("Sending a value failed.", exception.get());
+                       return false;
+               } else {
+                       return true;
+               }
+       }
+}
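The `sendValues` method above fires all writes asynchronously and then blocks on a shared monitor until every callback has confirmed, or a failure is recorded. A self-contained sketch of that wait/notify confirmation pattern, with a plain executor standing in for the Cassandra driver's `ResultSetFuture` callbacks (assumption: callbacks may run on arbitrary threads):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the confirmation pattern in sendValues(): fire n asynchronous
// writes, count acknowledgements in callbacks, and block on a shared monitor
// until all are confirmed or a failure is recorded.
public class ConfirmAll {

    public static boolean sendAll(int n) throws InterruptedException {
        final AtomicInteger confirmed = new AtomicInteger(0);
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                // a real sink would execute the bound statement here
                confirmed.incrementAndGet();
                synchronized (confirmed) {
                    confirmed.notifyAll(); // wake the waiting caller
                }
            });
        }

        synchronized (confirmed) {
            // no lost wakeup: each notifyAll needs this monitor, which we
            // hold between checking the condition and calling wait()
            while (failure.get() == null && confirmed.get() < n) {
                confirmed.wait();
            }
        }
        pool.shutdown();
        return failure.get() == null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAll(20)); // true: all 20 writes confirmed
    }
}
```

Because the notification happens inside the same monitor the caller holds between its condition check and `wait()`, the final acknowledgement cannot be missed.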

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 29c4b21..3543378 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +33,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 
@@ -205,12 +207,16 @@ public class CassandraSink<IN> {
         * @param <IN>  input type
         * @return CassandraSinkBuilder, to further configure the sink
         */
-       public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+       public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
                TypeInformation<IN> typeInfo = input.getType();
                if (typeInfo instanceof TupleTypeInfo) {
-                       DataStream<T> tupleInput = (DataStream<T>) input;
+                       DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
                        return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
                }
+               if (typeInfo instanceof RowTypeInfo) {
+                       DataStream<Row> rowInput = (DataStream<Row>) input;
+                       return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+               }
                if (typeInfo instanceof PojoTypeInfo) {
                        return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
                }
@@ -391,6 +397,36 @@ public class CassandraSink<IN> {
        }
 
        /**
+        * Builder for a {@link CassandraRowSink}.
+        */
+       public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+               public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+                       super(input, typeInfo, serializer);
+               }
+
+               @Override
+               protected void sanityCheck() {
+                       super.sanityCheck();
+                       if (query == null || query.length() == 0) {
+                               throw new IllegalArgumentException("Query must not be null or empty.");
+                       }
+               }
+
+               @Override
+               protected CassandraSink<Row> createSink() throws Exception {
+                       return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
+               }
+
+               @Override
+               protected CassandraSink<Row> createWriteAheadSink() throws Exception {
+                       return committer == null
+                               ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
+                               : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
+               }
+       }
+
+       /**
         * Builder for a {@link CassandraPojoSink}.
         * @param <IN>
         */
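The change to `addSink` above dispatches on the stream's type information, trying `TupleTypeInfo`, then the new `RowTypeInfo`, then `PojoTypeInfo`. A stripped-down sketch of that instanceof dispatch (stand-in classes only; the real code inspects Flink's `TypeInformation` hierarchy and returns builders, not strings):

```java
// Stripped-down sketch of the instanceof dispatch added to addSink():
// the builder is picked from the stream's runtime type information, with
// the new Row branch between Tuple and POJO.
public class SinkDispatch {

    interface TypeInfo {}
    static class TupleTypeInfo implements TypeInfo {}
    static class RowTypeInfo implements TypeInfo {}
    static class PojoTypeInfo implements TypeInfo {}

    public static String chooseBuilder(TypeInfo typeInfo) {
        if (typeInfo instanceof TupleTypeInfo) {
            return "CassandraTupleSinkBuilder";
        }
        if (typeInfo instanceof RowTypeInfo) { // branch added by this commit
            return "CassandraRowSinkBuilder";
        }
        if (typeInfo instanceof PojoTypeInfo) {
            return "CassandraPojoSinkBuilder";
        }
        throw new IllegalArgumentException("No sink found for type: " + typeInfo);
    }

    public static void main(String[] args) {
        System.out.println(chooseBuilder(new RowTypeInfo()));   // CassandraRowSinkBuilder
        System.out.println(chooseBuilder(new TupleTypeInfo())); // CassandraTupleSinkBuilder
    }
}
```

Ordering matters only in that each branch must be checked before any more general fallback; here the branches are mutually exclusive.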

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f52a42c..f1b598f 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -35,15 +35,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
@@ -58,6 +60,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -83,11 +86,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
        private static EmbeddedCassandraService cassandra;
 
+       private static final String HOST = "127.0.0.1";
+
+       private static final int PORT = 9042;
+
        private static ClusterBuilder builder = new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
-                               .addContactPoint("127.0.0.1")
+                               .addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
                                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
                                .withoutJMXReporting()
                                .withoutMetrics().build();
@@ -108,10 +115,16 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        private int tableID;
 
        private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+       private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+
+       private static final String[] FIELD_NAMES = {"id", "counter", "batch_id"};
+       private static final TypeInformation[] FIELD_TYPES = {
+               BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
        static {
                for (int i = 0; i < 20; i++) {
                        collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+                       rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
                }
        }
 
@@ -245,7 +258,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                        list.add(x);
                }
 
-               for (Row s : result) {
+               for (com.datastax.driver.core.Row s : result) {
                        list.remove(new Integer(s.getInt("counter")));
                }
                Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -260,7 +273,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                        list.add(x);
                }
 
-               for (Row s : result) {
+               for (com.datastax.driver.core.Row s : result) {
                        list.remove(new Integer(s.getInt("counter")));
                }
                Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -278,7 +291,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                        list.add(x);
                }
 
-               for (Row s : result) {
+               for (com.datastax.driver.core.Row s : result) {
                        list.remove(new Integer(s.getInt("counter")));
                }
                Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -300,7 +313,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
                ArrayList<Integer> actual = new ArrayList<>();
                ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
-               for (Row s : result) {
+
+               for (com.datastax.driver.core.Row s : result) {
                        actual.add(s.getInt("counter"));
                }
 
@@ -380,6 +394,22 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        @Test
+       public void testCassandraRowAtLeastOnceSink() throws Exception {
+               CassandraRowSink sink = new CassandraRowSink(FIELD_TYPES.length, injectTableName(INSERT_DATA_QUERY), builder);
+
+               sink.open(new Configuration());
+
+               for (Row value : rowCollection) {
+                       sink.send(value);
+               }
+
+               sink.close();
+
+               ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+               Assert.assertEquals(20, rs.all().size());
+       }
+
+       @Test
        public void testCassandraPojoAtLeastOnceSink() throws Exception {
                session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 
@@ -398,6 +428,35 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        }
 
        @Test
+       public void testCassandraTableSink() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+               StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
+
+               DataStreamSource<Row> source = env.fromCollection(rowCollection);
+
+               tEnv.registerDataStreamInternal("testFlinkTable", source);
+
+               tEnv.sql("select * from testFlinkTable").writeToSink(
+                       new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
+
+               env.execute();
+               ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+
+               // validate that all input was correctly written to Cassandra
+               List<Row> input = new ArrayList<>(rowCollection);
+               List<com.datastax.driver.core.Row> output = rs.all();
+               for (com.datastax.driver.core.Row o : output) {
+                       Row cmp = new Row(3);
+                       cmp.setField(0, o.getString(0));
+                       cmp.setField(1, o.getInt(2));
+                       cmp.setField(2, o.getInt(1));
+                       Assert.assertTrue("Row " + cmp + " was written to Cassandra but not in input.", input.remove(cmp));
+               }
+               Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
+       }
+
+       @Test
        public void testCassandraBatchFormats() throws Exception {
                OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
                sink.configure(new Configuration());
@@ -465,10 +524,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                sink.close();
 
                ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-               List<Row> rows = rs.all();
+               List<com.datastax.driver.core.Row> rows = rs.all();
                Assert.assertEquals(scalaTupleCollection.size(), rows.size());
 
-               for (Row row : rows) {
+               for (com.datastax.driver.core.Row row : rows) {
                        scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
                }
                Assert.assertEquals(0, scalaTupleCollection.size());
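The integration tests above all validate unordered Cassandra output the same way: copy the input into a mutable list, remove each row read back, and assert the list ends up empty. A minimal, dependency-free sketch of that validate-by-removal pattern (hypothetical string data standing in for `rowCollection` and the driver's `ResultSet`; no Flink or Cassandra classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValidateByRemoval {
	public static void main(String[] args) {
		// "input" stands in for the collection written to the sink; "output"
		// for the rows read back, possibly reordered (Cassandra guarantees
		// no particular ordering for a full-table SELECT).
		List<String> input = new ArrayList<>(Arrays.asList("a,1", "b,2", "c,3"));
		List<String> output = Arrays.asList("c,3", "a,1", "b,2");

		for (String o : output) {
			// Every row read back must match exactly one input row.
			if (!input.remove(o)) {
				throw new AssertionError("Row " + o + " was written but not in input.");
			}
		}
		// Nothing from the input may be missing.
		if (!input.isEmpty()) {
			throw new AssertionError("The input was not completely written: " + input);
		}
		System.out.println("ok");
	}
}
```

This checks both directions at once: no spurious rows (each removal must succeed) and no lost rows (the list drains to empty). It tolerates reordering but not duplicates, which suits the exactly-once assertions in `testCassandraTableSink`.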

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index bd08b04..7f9cc21 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -50,11 +50,14 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
        private final TypeSerializer<Object>[] fieldSerializers;
 
+       private final int arity;
+
        private transient boolean[] nullMask;
 
        @SuppressWarnings("unchecked")
        public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
                this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+               this.arity = fieldSerializers.length;
                this.nullMask = new boolean[fieldSerializers.length];
        }
 
@@ -135,6 +138,10 @@ public final class RowSerializer extends TypeSerializer<Row> {
                return -1;
        }
 
+       public int getArity() {
+               return arity;
+       }
+
        @Override
        public void serialize(Row record, DataOutputView target) throws IOException {
                int len = fieldSerializers.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d897137..5d5c455 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.
 
                <!-- Cassandra connectors have to use guava directly -->
                <suppress
-                       files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraTupleWriteAheadSink.java"
+                       files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
                        checks="IllegalImport"/>
                <!-- Kinesis producer has to use guava directly -->
                <suppress
