http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
new file mode 100644
index 0000000..a3d002e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only 
commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink<IN extends Tuple> extends 
GenericWriteAheadSink<IN> {
+       private static final long serialVersionUID = 1L;
+
+       protected transient Cluster cluster;
+       protected transient Session session;
+
+       private final String insertQuery;
+       private transient PreparedStatement preparedStatement;
+
+       private ClusterBuilder builder;
+
+       private transient Object[] fields;
+
+       protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter 
committer) throws Exception {
+               super(committer, serializer, 
UUID.randomUUID().toString().replace("-", "_"));
+               this.insertQuery = insertQuery;
+               this.builder = builder;
+               ClosureCleaner.clean(builder, true);
+       }
+
+       public void open() throws Exception {
+               super.open();
+               if (!getRuntimeContext().isCheckpointingEnabled()) {
+                       throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
+               }
+               cluster = builder.getCluster();
+               session = cluster.connect();
+               preparedStatement = session.prepare(insertQuery);
+
+               fields = new Object[((TupleSerializer<IN>) 
serializer).getArity()];
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               try {
+                       if (session != null) {
+                               session.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+               try {
+                       if (cluster != null) {
+                               cluster.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+
+       @Override
+       protected boolean sendValues(Iterable<IN> values, long timestamp) 
throws Exception {
+               final AtomicInteger updatesCount = new AtomicInteger(0);
+               final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+               final AtomicReference<Throwable> exception = new 
AtomicReference<>();
+
+               FutureCallback<ResultSet> callback = new 
FutureCallback<ResultSet>() {
+                       @Override
+                       public void onSuccess(ResultSet resultSet) {
+                               updatesConfirmed.incrementAndGet();
+                               if (updatesCount.get() > 0) { // only set if 
all updates have been sent
+                                       if (updatesCount.get() == 
updatesConfirmed.get()) {
+                                               synchronized (updatesConfirmed) 
{
+                                                       
updatesConfirmed.notifyAll();
+                                               }
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void onFailure(Throwable throwable) {
+                               if (exception.compareAndSet(null, throwable)) {
+                                       LOG.error("Error while sending value.", 
throwable);
+                                       synchronized (updatesConfirmed) {
+                                               updatesConfirmed.notifyAll();
+                                       }
+                               }
+                       }
+               };
+
+               //set values for prepared statement
+               int updatesSent = 0;
+               for (IN value : values) {
+                       for (int x = 0; x < value.getArity(); x++) {
+                               fields[x] = value.getField(x);
+                       }
+                       //insert values and send to cassandra
+                       BoundStatement s = preparedStatement.bind(fields);
+                       s.setDefaultTimestamp(timestamp);
+                       ResultSetFuture result = session.executeAsync(s);
+                       updatesSent++;
+                       if (result != null) {
+                               //add callback to detect errors
+                               Futures.addCallback(result, callback);
+                       }
+               }
+               updatesCount.set(updatesSent);
+
+               synchronized (updatesConfirmed) {
+                       while (exception.get() == null && updatesSent != 
updatesConfirmed.get()) {
+                               updatesConfirmed.wait();
+                       }
+               }
+
+               if (exception.get() != null) {
+                       LOG.warn("Sending a value failed.", exception.get());
+                       return false;
+               } else {
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
new file mode 100644
index 0000000..9fd3b4e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+
+import java.io.Serializable;
+
+/**
+ * This class is used to configure a {@link com.datastax.driver.core.Cluster} 
after deployment.
+ * The cluster represents the connection that will be established to Cassandra.
+ */
+public abstract class ClusterBuilder implements Serializable {
+
+       public Cluster getCluster() {
+               return buildCluster(Cluster.builder());
+       }
+
+       /**
+        * Configures the connection to Cassandra.
+        * The configuration is done by calling methods on the builder object
+        * and finalizing the configuration with build().
+        *
+        * @param builder connection builder
+        * @return configured connection
+        */
+       protected abstract Cluster buildCluster(Cluster.Builder builder);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
new file mode 100644
index 0000000..e66b8b3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the Cassandra Input-/OutputFormats in 
the Batch API.
+ * 
+ * The example assumes that a table exists in a local cassandra database, 
according to the following query: 
+ * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, 
strings));
+ */
+public class BatchExample {
+       private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+       private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+       /*
+        *      table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+        */
+       public static void main(String[] args) throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               ArrayList<Tuple2<Integer, String>> collection = new 
ArrayList<>(20);
+               for (int i = 0; i < 20; i++) {
+                       collection.add(new Tuple2<>(i, "string " + i));
+               }
+
+               DataSet<Tuple2<Integer, String>> dataSet = 
env.fromCollection(collection);
+
+               dataSet.output(new CassandraOutputFormat<Tuple2<Integer, 
String>>(INSERT_QUERY, new ClusterBuilder() {
+                       @Override
+                       protected Cluster buildCluster(Builder builder) {
+                               return 
builder.addContactPoints("127.0.0.1").build();
+                       }
+               }));
+
+               env.execute("Write");
+
+               DataSet<Tuple2<Integer, String>> inputDS = env
+                       .createInput(new CassandraInputFormat<Tuple2<Integer, 
String>>(SELECT_QUERY, new ClusterBuilder() {
+                               @Override
+                               protected Cluster buildCluster(Builder builder) 
{
+                                       return 
builder.addContactPoints("127.0.0.1").build();
+                               }
+                       }), TupleTypeInfo.of(new TypeHint<Tuple2<Integer, 
String>>() {
+                       }));
+
+               inputDS.print();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
new file mode 100644
index 0000000..2bb6fd1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.TestEnvironment;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, 
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CassandraConnectorITCase.class);
+       private static File tmpDir;
+
+       private static final boolean EMBEDDED = true;
+
+       private static EmbeddedCassandraService cassandra;
+
+       private static ClusterBuilder builder = new ClusterBuilder() {
+               @Override
+               protected Cluster buildCluster(Cluster.Builder builder) {
+                       return builder
+                               .addContactPoint("127.0.0.1")
+                               .withQueryOptions(new 
QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                               .withoutJMXReporting()
+                               .withoutMetrics().build();
+               }
+       };
+
+       private static Cluster cluster;
+       private static Session session;
+
+       private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE 
flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+       private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE 
flink;";
+       private static final String CREATE_TABLE_QUERY = "CREATE TABLE 
flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+       private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+       private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test 
(id, counter, batch_id) VALUES (?, ?, ?)";
+       private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
+
+       private static final ArrayList<Tuple3<String, Integer, Integer>> 
collection = new ArrayList<>(20);
+
+       static {
+               for (int i = 0; i < 20; i++) {
+                       collection.add(new 
Tuple3<>(UUID.randomUUID().toString(), i, 0));
+               }
+       }
+
+       private static class EmbeddedCassandraService {
+               CassandraDaemon cassandraDaemon;
+
+               public void start() throws IOException {
+                       this.cassandraDaemon = new CassandraDaemon();
+                       this.cassandraDaemon.init(null);
+                       this.cassandraDaemon.start();
+               }
+
+               public void stop() {
+                       this.cassandraDaemon.stop();
+               }
+       }
+
+       private static LocalFlinkMiniCluster flinkCluster;
+
+       // 
------------------------------------------------------------------------
+       //  Cluster Setup (Cassandra & Flink)
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void startCassandra() throws IOException {
+
+               // check if we should run this test, current Cassandra version 
requires Java >= 1.8
+               org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
+
+               // generate temporary files
+               tmpDir = CommonTestUtils.createTempDirectory();
+               ClassLoader classLoader = 
CassandraConnectorITCase.class.getClassLoader();
+               File file = new 
File(classLoader.getResource("cassandra.yaml").getFile());
+               File tmp = new File(tmpDir.getAbsolutePath() + File.separator + 
"cassandra.yaml");
+               
+               assertTrue(tmp.createNewFile());
+
+               try (
+                       BufferedWriter b = new BufferedWriter(new 
FileWriter(tmp));
+
+                       //copy cassandra.yaml; inject absolute paths into 
cassandra.yaml
+                       Scanner scanner = new Scanner(file);
+               ) {
+                       while (scanner.hasNextLine()) {
+                               String line = scanner.nextLine();
+                               line = line.replace("$PATH", "'" + 
tmp.getParentFile());
+                               b.write(line + "\n");
+                               b.flush();
+                       }
+               }
+
+
+               // Tell cassandra where the configuration files are.
+               // Use the test configuration file.
+               System.setProperty("cassandra.config", 
tmp.getAbsoluteFile().toURI().toString());
+
+               if (EMBEDDED) {
+                       cassandra = new EmbeddedCassandraService();
+                       cassandra.start();
+               }
+
+               try {
+                       Thread.sleep(1000 * 10);
+               } catch (InterruptedException e) { //give cassandra a few 
seconds to start up
+               }
+
+               cluster = builder.getCluster();
+               session = cluster.connect();
+
+               session.execute(CREATE_KEYSPACE_QUERY);
+               session.execute(CREATE_TABLE_QUERY);
+       }
+
+       @BeforeClass
+       public static void startFlink() throws Exception {
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
4);
+
+               flinkCluster = new LocalFlinkMiniCluster(config);
+               flinkCluster.start();
+       }
+
+       @AfterClass
+       public static void stopFlink() {
+               if (flinkCluster != null) {
+                       flinkCluster.stop();
+                       flinkCluster = null;
+               }
+       }
+
+       @AfterClass
+       public static void closeCassandra() {
+               if (session != null) {
+                       session.executeAsync(DROP_KEYSPACE_QUERY);
+                       session.close();
+               }
+
+               if (cluster != null) {
+                       cluster.close();
+               }
+
+               if (cassandra != null) {
+                       cassandra.stop();
+               }
+
+               if (tmpDir != null) {
+                       //noinspection ResultOfMethodCallIgnored
+                       tmpDir.delete();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test preparation & cleanup
+       // 
------------------------------------------------------------------------
+
+       @Before
+       public void initializeExecutionEnvironment() {
+               TestStreamEnvironment.setAsContext(flinkCluster, 4);
+               new TestEnvironment(flinkCluster, 4, false).setAsContext();
+       }
+
+       @After
+       public void deleteSchema() throws Exception {
+               session.executeAsync(CLEAR_TABLE_QUERY);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Exactly-once Tests
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, 
Integer>> createSink() throws Exception {
+               return new CassandraTupleWriteAheadSink<>(
+                       INSERT_DATA_QUERY,
+                       TypeExtractor.getForObject(new Tuple3<>("", 0, 
0)).createSerializer(new ExecutionConfig()),
+                       builder,
+                       new CassandraCommitter(builder));
+       }
+
+       @Override
+       protected TupleTypeInfo<Tuple3<String, Integer, Integer>> 
createTypeInfo() {
+               return TupleTypeInfo.getBasicTupleTypeInfo(String.class, 
Integer.class, Integer.class);
+       }
+
+       @Override
+       protected Tuple3<String, Integer, Integer> generateValue(int counter, 
int checkpointID) {
+               return new Tuple3<>(UUID.randomUUID().toString(), counter, 
checkpointID);
+       }
+
+       @Override
+       protected void verifyResultsIdealCircumstances(
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ArrayList<Integer> list = new ArrayList<>();
+               for (int x = 1; x <= 60; x++) {
+                       list.add(x);
+               }
+
+               for (Row s : result) {
+                       list.remove(new Integer(s.getInt("counter")));
+               }
+               Assert.assertTrue("The following ID's were not found in the 
ResultSet: " + list.toString(), list.isEmpty());
+       }
+
+       @Override
+       protected void verifyResultsDataPersistenceUponMissedNotify(
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ArrayList<Integer> list = new ArrayList<>();
+               for (int x = 1; x <= 60; x++) {
+                       list.add(x);
+               }
+
+               for (Row s : result) {
+                       list.remove(new Integer(s.getInt("counter")));
+               }
+               Assert.assertTrue("The following ID's were not found in the 
ResultSet: " + list.toString(), list.isEmpty());
+       }
+
+       @Override
+       protected void verifyResultsDataDiscardingUponRestore(
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
+
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
+               ArrayList<Integer> list = new ArrayList<>();
+               for (int x = 1; x <= 20; x++) {
+                       list.add(x);
+               }
+               for (int x = 41; x <= 60; x++) {
+                       list.add(x);
+               }
+
+               for (Row s : result) {
+                       list.remove(new Integer(s.getInt("counter")));
+               }
+               Assert.assertTrue("The following ID's were not found in the 
ResultSet: " + list.toString(), list.isEmpty());
+       }
+
+       @Test
+       public void testCassandraCommitter() throws Exception {
+               CassandraCommitter cc1 = new CassandraCommitter(builder);
+               cc1.setJobId("job");
+               cc1.setOperatorId("operator");
+
+               CassandraCommitter cc2 = new CassandraCommitter(builder);
+               cc2.setJobId("job");
+               cc2.setOperatorId("operator");
+
+               CassandraCommitter cc3 = new CassandraCommitter(builder);
+               cc3.setJobId("job");
+               cc3.setOperatorId("operator1");
+
+               cc1.createResource();
+
+               cc1.open();
+               cc2.open();
+               cc3.open();
+
+               Assert.assertFalse(cc1.isCheckpointCommitted(0, 1));
+               Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
+               Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
+
+               cc1.commitCheckpoint(0, 1);
+               Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
+               //verify that other sub-tasks aren't affected
+               Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
+               //verify that other tasks aren't affected
+               Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
+
+               Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
+
+               cc1.close();
+               cc2.close();
+               cc3.close();
+
+               cc1 = new CassandraCommitter(builder);
+               cc1.setJobId("job");
+               cc1.setOperatorId("operator");
+
+               cc1.open();
+
+               //verify that checkpoint data is not destroyed within 
open/close and not reliant on internally cached data
+               Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
+               Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
+
+               cc1.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  At-least-once Tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testCassandraTupleAtLeastOnceSink() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               DataStream<Tuple3<String, Integer, Integer>> source = 
env.fromCollection(collection);
+               source.addSink(new CassandraTupleSink<Tuple3<String, Integer, 
Integer>>(INSERT_DATA_QUERY, builder));
+
+               env.execute();
+
+               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               Assert.assertEquals(20, rs.all().size());
+       }
+
+       @Test
+       public void testCassandraPojoAtLeastOnceSink() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               DataStreamSource<Pojo> source = env
+                       .addSource(new SourceFunction<Pojo>() {
+
+                               private boolean running = true;
+                               private volatile int cnt = 0;
+
+                               @Override
+                               public void run(SourceContext<Pojo> ctx) throws 
Exception {
+                                       while (running) {
+                                               ctx.collect(new 
Pojo(UUID.randomUUID().toString(), cnt, 0));
+                                               cnt++;
+                                               if (cnt == 20) {
+                                                       cancel();
+                                               }
+                                       }
+                               }
+
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       });
+
+               source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+
+               env.execute();
+
+               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               Assert.assertEquals(20, rs.all().size());
+       }
+
+       @Test
+       public void testCassandraBatchFormats() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               DataSet<Tuple3<String, Integer, Integer>> dataSet = 
env.fromCollection(collection);
+               dataSet.output(new CassandraOutputFormat<Tuple3<String, 
Integer, Integer>>(INSERT_DATA_QUERY, builder));
+
+               env.execute("Write data");
+
+               DataSet<Tuple3<String, Integer, Integer>> inputDS = 
env.createInput(
+                       new CassandraInputFormat<Tuple3<String, Integer, 
Integer>>(SELECT_DATA_QUERY, builder),
+                       TypeInformation.of(new TypeHint<Tuple3<String, Integer, 
Integer>>(){}));
+
+
+               long count = inputDS.count();
+               Assert.assertEquals(count, 20L);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
new file mode 100644
index 0000000..847d1a0
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class CassandraTupleWriteAheadSinkTest {
+
+       @Test(timeout=20000)
+       public void testAckLoopExitOnException() throws Exception {
+               final AtomicReference<Runnable> runnableFuture = new 
AtomicReference<>();
+
+               final ClusterBuilder clusterBuilder = new ClusterBuilder() {
+                       private static final long serialVersionUID = 
4624400760492936756L;
+
+                       @Override
+                       protected Cluster buildCluster(Cluster.Builder builder) 
{
+                               try {
+                                       BoundStatement boundStatement = 
mock(BoundStatement.class);
+                                       
when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);
+
+                                       PreparedStatement preparedStatement = 
mock(PreparedStatement.class);
+                                       
when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);
+
+                                       ResultSetFuture future = 
mock(ResultSetFuture.class);
+                                       when(future.get()).thenThrow(new 
RuntimeException("Expected exception."));
+
+                                       doAnswer(new Answer<Void>() {
+                                               @Override
+                                               public Void 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                                       synchronized 
(runnableFuture) {
+                                                               
runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0])));
+                                                               
runnableFuture.notifyAll();
+                                                       }
+                                                       return null;
+                                               }
+                                       
}).when(future).addListener(any(Runnable.class), any(Executor.class));
+
+                                       Session session = mock(Session.class);
+                                       
when(session.prepare(anyString())).thenReturn(preparedStatement);
+                                       
when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);
+
+                                       Cluster cluster = mock(Cluster.class);
+                                       
when(cluster.connect()).thenReturn(session);
+                                       return cluster;
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               };
+
+               // Our asynchronous executor thread
+               new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               synchronized (runnableFuture) {
+                                       while (runnableFuture.get() == null) {
+                                               try {
+                                                       runnableFuture.wait();
+                                               } catch (InterruptedException 
e) {
+                                                       // ignore interrupts
+                                               }
+                                       }
+                               }
+                               runnableFuture.get().run();
+                       }
+               }).start();
+
+               CheckpointCommitter cc = mock(CheckpointCommitter.class);
+               final CassandraTupleWriteAheadSink<Tuple0> sink = new 
CassandraTupleWriteAheadSink<>(
+                       "abc",
+                       TupleTypeInfo.of(Tuple0.class).createSerializer(new 
ExecutionConfig()),
+                       clusterBuilder,
+                       cc
+               );
+
+               OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new 
OneInputStreamOperatorTestHarness(sink);
+               
harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", 
true);
+
+               harness.setup();
+               sink.open();
+
+               // we should leave the loop and return false since we've seen 
an exception
+               assertFalse(sink.sendValues(Collections.singleton(new 
Tuple0()), 0L));
+
+               sink.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
new file mode 100644
index 0000000..9b331d6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+@Table(keyspace = "flink", name = "test")
+public class Pojo implements Serializable {
+
+       private static final long serialVersionUID = 1038054554690916991L;
+
+       @Column(name = "id")
+       private String id;
+       @Column(name = "counter")
+       private int counter;
+       @Column(name = "batch_id")
+       private int batch_id;
+
+       public Pojo(String id, int counter, int batch_id) {
+               this.id = id;
+               this.counter = counter;
+               this.batch_id = batch_id;
+       }
+
+       public String getId() {
+               return id;
+       }
+
+       public void setId(String id) {
+               this.id = id;
+       }
+
+       public int getCounter() {
+               return counter;
+       }
+
+       public void setCounter(int counter) {
+               this.counter = counter;
+       }
+
+       public int getBatch_id() {
+               return batch_id;
+       }
+
+       public void setBatch_id(int batch_id) {
+               this.batch_id = batch_id;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
new file mode 100644
index 0000000..e1bcea9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the Pojo Cassandra Sink in the 
Streaming API.
+ * 
+ * Pojo's have to be annotated with datastax annotations to work with this 
sink.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY)
+ */
+public class CassandraPojoSinkExample {
+       private static final ArrayList<Message> messages = new ArrayList<>(20);
+
+       static {
+               for (long i = 0; i < 20; i++) {
+                       messages.add(new Message("cassandra-" + i));
+               }
+       }
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Message> source = env.fromCollection(messages);
+
+               CassandraSink.addSink(source)
+                       .setClusterBuilder(new ClusterBuilder() {
+                               @Override
+                               protected Cluster buildCluster(Builder builder) 
{
+                                       return 
builder.addContactPoint("127.0.0.1").build();
+                               }
+                       })
+                       .build();
+
+               env.execute("Cassandra Sink example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
new file mode 100644
index 0000000..c6345df
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the Tuple Cassandra Sink in the 
Streaming API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following query:
+ * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, 
element2 int)
+ */
+public class CassandraTupleSinkExample {
+       private static final String INSERT = "INSERT INTO test.writetuple 
(element1, element2) VALUES (?, ?)";
+       private static final ArrayList<Tuple2<String, Integer>> collection = 
new ArrayList<>(20);
+
+       static {
+               for (int i = 0; i < 20; i++) {
+                       collection.add(new Tuple2<>("cassandra-" + i, i));
+               }
+       }
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<String, Integer>> source = 
env.fromCollection(collection);
+
+               CassandraSink.addSink(source)
+                       .setQuery(INSERT)
+                       .setClusterBuilder(new ClusterBuilder() {
+                               @Override
+                               protected Cluster buildCluster(Builder builder) 
{
+                                       return 
builder.addContactPoint("127.0.0.1").build();
+                               }
+                       })
+                       .build();
+
+               env.execute("WriteTupleIntoCassandra");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
new file mode 100644
index 0000000..811c410
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.UUID;
+
+/**
+ * This is an example showing the to use the Cassandra Sink (with write-ahead 
log) in the Streaming API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following query:
+ * CREATE TABLE example.values (id text, count int, PRIMARY KEY(id));
+ * 
+ * Important things to note are that checkpointing is enabled, a StateBackend 
is set and the enableWriteAheadLog() call
+ * when creating the CassandraSink.
+ */
+public class CassandraTupleWriteAheadSinkExample {
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(1000);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
1000));
+               env.setStateBackend(new FsStateBackend("file:///" + 
System.getProperty("java.io.tmpdir") + "/flink/backend"));
+
+               CassandraSink<Tuple2<String, Integer>> sink = 
CassandraSink.addSink(env.addSource(new MySource()))
+                       .setQuery("INSERT INTO example.values (id, counter) 
values (?, ?);")
+                       .enableWriteAheadLog()
+                       .setClusterBuilder(new ClusterBuilder() {
+                               @Override
+                               public Cluster buildCluster(Cluster.Builder 
builder) {
+                                       return 
builder.addContactPoint("127.0.0.1").build();
+                               }
+                       })
+                       .build();
+
+               sink.name("Cassandra 
Sink").disableChaining().setParallelism(1).uid("hello");
+
+               env.execute();
+       }
+
+       public static class MySource implements SourceFunction<Tuple2<String, 
Integer>>, Checkpointed<Integer> {
+               private int counter = 0;
+               private boolean stop = false;
+
+               @Override
+               public void run(SourceContext<Tuple2<String, Integer>> ctx) 
throws Exception {
+                       while (!stop) {
+                               Thread.sleep(50);
+                               ctx.collect(new Tuple2<>("" + 
UUID.randomUUID(), 1));
+                               counter++;
+                               if (counter == 100) {
+                                       stop = true;
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       stop = true;
+               }
+
+               @Override
+               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return counter;
+               }
+
+               @Override
+               public void restoreState(Integer state) throws Exception {
+                       this.counter = state;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
new file mode 100644
index 0000000..7524d95
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+@Table(keyspace = "test", name = "message")
+public class Message implements Serializable {
+
+       private static final long serialVersionUID = 1123119384361005680L;
+
+       @Column(name = "body")
+       private String message;
+
+       public Message(String word) {
+               this.message = word;
+       }
+
+       public String getMessage() {
+               return message;
+       }
+
+       public void setMessage(String word) {
+               this.message = word;
+       }
+
+       public boolean equals(Object other) {
+               if (other instanceof Message) {
+                       Message that = (Message) other;
+                       return this.message.equals(that.message);
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               return message.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml 
b/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..0594ea3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -0,0 +1,43 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+commitlog_directory: $PATH/commit'
+data_file_directories:
+    - $PATH/data'
+saved_caches_directory: $PATH/cache'
+listen_address: '127.0.0.1'
+seed_provider:
+    - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
+      parameters:
+          - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
+start_rpc: false
+start_native_transport: true
+native_transport_max_threads: 8

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..a43d556
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target= System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml 
b/flink-connectors/flink-connector-elasticsearch/pom.xml
new file mode 100644
index 0000000..0b78484
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-elasticsearch_2.10</artifactId>
+       <name>flink-connector-elasticsearch</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <elasticsearch.version>1.7.1</elasticsearch.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       
<rerunFailingTestsCount>3</rerunFailingTestsCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
new file mode 100644
index 0000000..ac14ade
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+
+/**
+ * Sink that emits its input elements to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, 
IndexRequestBuilder)}
+ * the sink will create a local {@link Node} for communicating with the
+ * Elasticsearch cluster. When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List, 
IndexRequestBuilder)} a {@link TransportClient} will
+ * be used instead.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will 
fail if no cluster
+ * can be connected to. With the {@code Node Client} the sink will block and 
wait for a cluster
+ * to come online.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch 
when creating
+ * the {@link Node} or {@link TransportClient}. The config keys can be found 
in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this should be 
set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link IndexRequestBuilder}. This is used to 
create an
+ * {@link IndexRequest} from an element that needs to be added to 
Elasticsearch. See
+ * {@link 
org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an 
example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
+       public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = 
"bulk.flush.max.size.mb";
+       public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = 
"bulk.flush.interval.ms";
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSink.class);
+
+       /**
+        * The user specified config map that we forward to Elasticsearch when 
we create the Client.
+        */
+       private final Map<String, String> userConfig;
+
+       /**
+        * The list of nodes that the TransportClient should connect to. This 
is null if we are using
+        * an embedded Node to get a Client.
+        */
+       private final List<TransportAddress> transportNodes;
+
+       /**
+        * The builder that is used to construct an {@link IndexRequest} from 
the incoming element.
+        */
+       private final IndexRequestBuilder<T> indexRequestBuilder;
+
+       /**
+        * The embedded Node that is used to communicate with the Elasticsearch 
cluster. This is null
+        * if we are using a TransportClient.
+        */
+       private transient Node node;
+
+       /**
+        * The Client that was either retrieved from a Node or is a 
TransportClient.
+        */
+       private transient Client client;
+
+       /**
+        * Bulk processor that was created using the client
+        */
+       private transient BulkProcessor bulkProcessor;
+
+       /**
+        * This is set from inside the BulkProcessor listener if there where 
failures in processing.
+        */
+       private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+       /**
+        * This is set from inside the BulkProcessor listener if a Throwable 
was thrown during processing.
+        */
+       private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
+
+       /**
+        * Creates a new ElasticsearchSink that connects to the cluster using 
an embedded Node.
+        *
+        * @param userConfig The map of user settings that are passed when 
constructing the Node and BulkProcessor
+        * @param indexRequestBuilder This is used to generate the IndexRequest 
from the incoming element
+        */
+       public ElasticsearchSink(Map<String, String> userConfig, 
IndexRequestBuilder<T> indexRequestBuilder) {
+               this.userConfig = userConfig;
+               this.indexRequestBuilder = indexRequestBuilder;
+               transportNodes = null;
+       }
+
+       /**
+        * Creates a new ElasticsearchSink that connects to the cluster using a 
TransportClient.
+        *
+        * @param userConfig The map of user settings that are passed when 
constructing the TransportClient and BulkProcessor
+        * @param transportNodes The Elasticsearch Nodes to which to connect 
using a {@code TransportClient}
+        * @param indexRequestBuilder This is used to generate the IndexRequest 
from the incoming element
+        *
+        */
+       public ElasticsearchSink(Map<String, String> userConfig, 
List<TransportAddress> transportNodes, IndexRequestBuilder<T> 
indexRequestBuilder) {
+               this.userConfig = userConfig;
+               this.indexRequestBuilder = indexRequestBuilder;
+               this.transportNodes = transportNodes;
+       }
+
+       /**
+        * Initializes the connection to Elasticsearch by either creating an 
embedded
+        * {@link org.elasticsearch.node.Node} and retrieving the
+        * {@link org.elasticsearch.client.Client} from it or by creating a
+        * {@link org.elasticsearch.client.transport.TransportClient}.
+        */
+       @Override
+       public void open(Configuration configuration) {
+               if (transportNodes == null) {
+                       // Make sure that we disable http access to our 
embedded node
+                       Settings settings =
+                                       ImmutableSettings.settingsBuilder()
+                                                       .put(userConfig)
+                                                       .put("http.enabled", 
false)
+                                                       .build();
+
+                       node =
+                                       nodeBuilder()
+                                                       .settings(settings)
+                                                       .client(true)
+                                                       .data(false)
+                                                       .node();
+
+                       client = node.client();
+
+                       if (LOG.isInfoEnabled()) {
+                               LOG.info("Created Elasticsearch Client {} from 
embedded Node", client);
+                       }
+
+               } else {
+                       Settings settings = ImmutableSettings.settingsBuilder()
+                                       .put(userConfig)
+                                       .build();
+
+                       TransportClient transportClient = new 
TransportClient(settings);
+                       for (TransportAddress transport: transportNodes) {
+                               transportClient.addTransportAddress(transport);
+                       }
+
+                       // verify that we actually are connected to a cluster
+                       ImmutableList<DiscoveryNode> nodes = 
transportClient.connectedNodes();
+                       if (nodes.isEmpty()) {
+                               throw new RuntimeException("Client is not 
connected to any Elasticsearch nodes!");
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Connected to nodes: " + 
nodes.toString());
+                               }
+                       }
+
+                       client = transportClient;
+
+                       if (LOG.isInfoEnabled()) {
+                               LOG.info("Created Elasticsearch TransportClient 
{}", client);
+                       }
+               }
+
+               BulkProcessor.Builder bulkProcessorBuilder = 
BulkProcessor.builder(
+                               client,
+                               new BulkProcessor.Listener() {
+                                       @Override
+                                       public void beforeBulk(long executionId,
+                                                       BulkRequest request) {
+
+                                       }
+
+                                       @Override
+                                       public void afterBulk(long executionId,
+                                                       BulkRequest request,
+                                                       BulkResponse response) {
+                                               if (response.hasFailures()) {
+                                                       for (BulkItemResponse 
itemResp : response.getItems()) {
+                                                               if 
(itemResp.isFailed()) {
+                                                                       
LOG.error("Failed to index document in Elasticsearch: " + 
itemResp.getFailureMessage());
+                                                                       
failureThrowable.compareAndSet(null, new 
RuntimeException(itemResp.getFailureMessage()));
+                                                               }
+                                                       }
+                                                       hasFailure.set(true);
+                                               }
+                                       }
+
+                                       @Override
+                                       public void afterBulk(long executionId,
+                                                       BulkRequest request,
+                                                       Throwable failure) {
+                                               LOG.error(failure.getMessage());
+                                               
failureThrowable.compareAndSet(null, failure);
+                                               hasFailure.set(true);
+                                       }
+                               });
+
+               // This makes flush() blocking
+               bulkProcessorBuilder.setConcurrentRequests(0);
+
+               ParameterTool params = ParameterTool.fromMap(userConfig);
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+                       
bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+                       bulkProcessorBuilder.setBulkSize(new 
ByteSizeValue(params.getInt(
+                                       CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), 
ByteSizeUnit.MB));
+               }
+
+               if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+                       
bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+               }
+
+               bulkProcessor = bulkProcessorBuilder.build();
+       }
+
+       @Override
+       public void invoke(T element) {
+               IndexRequest indexRequest = 
indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Emitting IndexRequest: {}", indexRequest);
+               }
+
+               bulkProcessor.add(indexRequest);
+       }
+
+       @Override
+       public void close() {
+               if (bulkProcessor != null) {
+                       bulkProcessor.close();
+                       bulkProcessor = null;
+               }
+
+               if (client != null) {
+                       client.close();
+               }
+
+               if (node != null) {
+                       node.close();
+               }
+
+               if (hasFailure.get()) {
+                       Throwable cause = failureThrowable.get();
+                       if (cause != null) {
+                               throw new RuntimeException("An error occured in 
ElasticsearchSink.", cause);
+                       } else {
+                               throw new RuntimeException("An error occured in 
ElasticsearchSink.");
+
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
new file mode 100644
index 0000000..04ae40a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.index.IndexRequest;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates an {@link IndexRequest} from an element in a Stream.
+ *
+ * <p>
+ * This is used by {@link 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
+ * to prepare elements for sending them to Elasticsearch. See
+ * <a 
href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html";>Index
 API</a>
+ * for information about how to format data for adding it to an Elasticsearch 
index.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     private static class MyIndexRequestBuilder implements 
IndexRequestBuilder<String> {
+ *
+ *         public IndexRequest createIndexRequest(String element, 
RuntimeContext ctx) {
+ *             Map<String, Object> json = new HashMap<>();
+ *             json.put("data", element);
+ *
+ *             return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .source(json);
+ *         }
+ *     }
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code 
IndexRequestBuilder}
+ */
+public interface IndexRequestBuilder<T> extends Function, Serializable {
+
+       /**
+        * Creates an {@link org.elasticsearch.action.index.IndexRequest} from 
an element.
+        *
+        * @param element The element that needs to be turned in to an {@code 
IndexRequest}
+        * @param ctx The Flink {@link RuntimeContext} of the {@link 
ElasticsearchSink}
+        *
+        * @return The constructed {@code IndexRequest}
+        */
+       IndexRequest createIndexRequest(T element, RuntimeContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..33a2e47
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.Node;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase 
{
+
+       private static final int NUM_ELEMENTS = 20;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testNodeClient() throws Exception{
+
+               File dataDir = tempFolder.newFolder();
+
+               Node node = nodeBuilder()
+                               .settings(ImmutableSettings.settingsBuilder()
+                                               .put("http.enabled", false)
+                                               .put("path.data", 
dataDir.getAbsolutePath()))
+                               // set a custom cluster name to verify that 
user config works correctly
+                               .clusterName("my-node-client-cluster")
+                               .local(true)
+                               .node();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
+
+               Map<String, String> config = Maps.newHashMap();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+               config.put("cluster.name", "my-node-client-cluster");
+
+               // connect to our local node
+               config.put("node.local", "true");
+
+               source.addSink(new ElasticsearchSink<>(config, new 
TestIndexRequestBuilder()));
+
+               env.execute("Elasticsearch Node Client Test");
+
+
+               // verify the results
+               Client client = node.client();
+               for (int i = 0; i < NUM_ELEMENTS; i++) {
+                       GetResponse response = client.get(new 
GetRequest("my-index",
+                                       "my-type",
+                                       Integer.toString(i))).actionGet();
+                       Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
+               }
+
+               node.close();
+       }
+
+       @Test
+       public void testTransportClient() throws Exception {
+
+               File dataDir = tempFolder.newFolder();
+
+               Node node = nodeBuilder()
+                               .settings(ImmutableSettings.settingsBuilder()
+                                               .put("http.enabled", false)
+                                               .put("path.data", 
dataDir.getAbsolutePath()))
+                                               // set a custom cluster name to 
verify that user config works correctly
+                               .clusterName("my-node-client-cluster")
+                               .local(true)
+                               .node();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
+
+               Map<String, String> config = Maps.newHashMap();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+               config.put("cluster.name", "my-node-client-cluster");
+
+               // connect to our local node
+               config.put("node.local", "true");
+
+               List<TransportAddress> transports = Lists.newArrayList();
+               transports.add(new LocalTransportAddress("1"));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestIndexRequestBuilder()));
+
+               env.execute("Elasticsearch TransportClient Test");
+
+
+               // verify the results
+               Client client = node.client();
+               for (int i = 0; i < NUM_ELEMENTS; i++) {
+                       GetResponse response = client.get(new 
GetRequest("my-index",
+                                       "my-type",
+                                       Integer.toString(i))).actionGet();
+                       Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
+               }
+
+               node.close();
+       }
+
+       @Test(expected = JobExecutionException.class)
+       public void testTransportClientFails() throws Exception{
+               // this checks whether the TransportClient fails early when 
there is no cluster to
+               // connect to. We don't hava such as test for the Node Client 
version since that
+               // one will block and wait for a cluster to come online
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
+
+               Map<String, String> config = Maps.newHashMap();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+               config.put("cluster.name", "my-node-client-cluster");
+
+               // connect to our local node
+               config.put("node.local", "true");
+
+               List<TransportAddress> transports = Lists.newArrayList();
+               transports.add(new LocalTransportAddress("1"));
+
+               source.addSink(new ElasticsearchSink<>(config, transports, new 
TestIndexRequestBuilder()));
+
+               env.execute("Elasticsearch Node Client Test");
+       }
+
+       private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(Tuple2.of(i, "message #" + i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       private static class TestIndexRequestBuilder implements 
IndexRequestBuilder<Tuple2<Integer, String>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public IndexRequest createIndexRequest(Tuple2<Integer, String> 
element, RuntimeContext ctx) {
+                       Map<String, Object> json = new HashMap<>();
+                       json.put("data", element.f1);
+
+                       return Requests.indexRequest()
+                                       .index("my-index")
+                                       .type("my-type")
+                                       .id(element.f0.toString())
+                                       .source(json);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..136ae77
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you 
must ensure that
+ * you have a cluster named "elasticsearch" running or change the cluster name 
in the config map.
+ */
+public class ElasticsearchExample {
+
+       public static void main(String[] args) throws Exception {
+               
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStreamSource<String> source = env.addSource(new 
SourceFunction<String>() {
+                       private static final long serialVersionUID = 1L;
+
+                       private volatile boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<String> ctx) throws 
Exception {
+                               for (int i = 0; i < 20 && running; i++) {
+                                       ctx.collect("message #" + i);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               Map<String, String> config = new HashMap<>();
+               // This instructs the sink to emit after every element, 
otherwise they would be buffered
+               config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
+
+               source.addSink(new ElasticsearchSink<>(config, new 
IndexRequestBuilder<String>() {
+                       @Override
+                       public IndexRequest createIndexRequest(String element, 
RuntimeContext ctx) {
+                               Map<String, Object> json = new HashMap<>();
+                               json.put("data", element);
+
+                               return Requests.indexRequest()
+                                               .index("my-index")
+                                               .type("my-type")
+                                               .source(json);
+                       }
+               }));
+
+
+               env.execute("Elasticsearch Example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..dc20726
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

Reply via email to