http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
new file mode 100644
index 0000000..37a83d1
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the avro input format.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroSplittableInputFormatTest {
+       
+       /** Temporary Avro file that the tests read; assigned in {@code createFiles()}. */
+       private File testFile;
+
+       static final String TEST_NAME = "Alyssa";
+
+       static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+       static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+       static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+       static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+       static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+       static final String TEST_MAP_KEY1 = "KEY 1";
+       static final long TEST_MAP_VALUE1 = 8546456L;
+       static final String TEST_MAP_KEY2 = "KEY 2";
+       static final long TEST_MAP_VALUE2 = 17554L;
+
+       // Integer.valueOf replaces the deprecated Integer(int) constructor.
+       static final Integer TEST_NUM = Integer.valueOf(239);
+       static final String TEST_STREET = "Baker Street";
+       static final String TEST_CITY = "London";
+       static final String TEST_STATE = "London";
+       static final String TEST_ZIP = "NW1 6XE";
+
+       /** Total number of records written to the test file. */
+       static final int NUM_RECORDS = 5000;
+
+       /**
+        * Writes {@link #NUM_RECORDS} {@code User} records into a fresh temporary Avro file.
+        *
+        * <p>The first two records are fixed (one populated through setters, one through the
+        * builder); the remaining ones are filled from a seeded {@link Random}, so the generated
+        * record values are the same on every run.
+        *
+        * @throws IOException if the temporary file cannot be created or written
+        */
+       @Before
+       public void createFiles() throws IOException {
+               testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
+
+               ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+               stringArray.add(TEST_ARRAY_STRING_1);
+               stringArray.add(TEST_ARRAY_STRING_2);
+
+               ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+               HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+               User user1 = new User();
+               user1.setName(TEST_NAME);
+               user1.setFavoriteNumber(256);
+               user1.setTypeDoubleTest(123.45d);
+               user1.setTypeBoolTest(true);
+               user1.setTypeArrayString(stringArray);
+               user1.setTypeArrayBoolean(booleanArray);
+               user1.setTypeEnum(TEST_ENUM_COLOR);
+               user1.setTypeMap(longMap);
+               user1.setTypeNested(newTestAddress());
+
+               // Construct via builder
+               User user2 = User.newBuilder()
+                       .setName(TEST_NAME)
+                       .setFavoriteColor("blue")
+                       .setFavoriteNumber(null)
+                       .setTypeBoolTest(false)
+                       .setTypeDoubleTest(1.337d)
+                       .setTypeNullTest(null)
+                       .setTypeLongTest(1337L)
+                       .setTypeArrayString(new ArrayList<CharSequence>())
+                       .setTypeArrayBoolean(new ArrayList<Boolean>())
+                       .setTypeNullableArray(null)
+                       .setTypeEnum(Colors.RED)
+                       .setTypeMap(new HashMap<CharSequence, Long>())
+                       .setTypeFixed(new Fixed16())
+                       .setTypeUnion(123L)
+                       .setTypeNested(
+                               Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
+                                       .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
+                                       .build())
+                       .build();
+
+               DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+               // try-with-resources closes the writer even when create() or an append() throws
+               try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter)) {
+                       dataFileWriter.create(user1.getSchema(), testFile);
+                       dataFileWriter.append(user1);
+                       dataFileWriter.append(user2);
+
+                       // two records are already written above, hence NUM_RECORDS - 2 random ones
+                       Random rnd = new Random(1337);
+                       for (int i = 0; i < NUM_RECORDS - 2; i++) {
+                               User user = new User();
+                               // keep this draw order (int, int, double): it defines the generated data
+                               user.setName(TEST_NAME + rnd.nextInt());
+                               user.setFavoriteNumber(rnd.nextInt());
+                               user.setTypeDoubleTest(rnd.nextDouble());
+                               user.setTypeBoolTest(true);
+                               user.setTypeArrayString(stringArray);
+                               user.setTypeArrayBoolean(booleanArray);
+                               user.setTypeEnum(TEST_ENUM_COLOR);
+                               user.setTypeMap(longMap);
+                               user.setTypeNested(newTestAddress());
+
+                               dataFileWriter.append(user);
+                       }
+               }
+       }
+
+       /** Creates a new {@code Address} populated with the fixed test address constants. */
+       private static Address newTestAddress() {
+               Address address = new Address();
+               address.setNum(TEST_NUM);
+               address.setStreet(TEST_STREET);
+               address.setCity(TEST_CITY);
+               address.setState(TEST_STATE);
+               address.setZip(TEST_ZIP);
+               return address;
+       }
+       
+       /**
+        * Reads the test file through four input splits and checks the number of records
+        * returned by each split as well as the overall total.
+        *
+        * @throws IOException if a split cannot be opened or read
+        */
+       @Test
+       public void testSplittedIF() throws IOException {
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+               format.configure(parameters);
+
+               FileInputSplit[] splits = format.createInputSplits(4);
+               // JUnit's signature is assertEquals(expected, actual)
+               assertEquals(4, splits.length);
+
+               int elements = 0;
+               int[] elementsPerSplit = new int[4];
+               for (int i = 0; i < splits.length; i++) {
+                       format.open(splits[i]);
+                       try {
+                               while (!format.reachedEnd()) {
+                                       User u = format.nextRecord(null);
+                                       Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                                       elements++;
+                                       elementsPerSplit[i]++;
+                               }
+                       } finally {
+                               // close the format even when an assertion above fails
+                               format.close();
+                       }
+               }
+
+               Assert.assertEquals(1539, elementsPerSplit[0]);
+               Assert.assertEquals(1026, elementsPerSplit[1]);
+               Assert.assertEquals(1539, elementsPerSplit[2]);
+               Assert.assertEquals(896, elementsPerSplit[3]);
+               Assert.assertEquals(NUM_RECORDS, elements);
+       }
+
+       /**
+        * Reads all four splits while simulating checkpoint/restore, entering every split through
+        * {@code reopen} with the format's current state instead of {@code open}.
+        *
+        * <p>Whenever 132 records have been read from the current block, the reader state is
+        * captured and reading continues on a brand-new format restored from that state.
+        */
+       @Test
+       public void testAvroRecoveryWithFailureAtStart() throws Exception {
+               final int recordsUntilCheckpoint = 132;
+
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+               format.configure(parameters);
+
+               FileInputSplit[] splits = format.createInputSplits(4);
+               // JUnit's signature is assertEquals(expected, actual)
+               assertEquals(4, splits.length);
+
+               int elements = 0;
+               int[] elementsPerSplit = new int[4];
+               for (int i = 0; i < splits.length; i++) {
+                       format.reopen(splits[i], format.getCurrentState());
+                       try {
+                               while (!format.reachedEnd()) {
+                                       User u = format.nextRecord(null);
+                                       Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                                       elements++;
+
+                                       if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+                                               // do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+                                               Tuple2<Long, Long> state = format.getCurrentState();
+
+                                               // the state is already captured, so close the old format
+                                               // instead of abandoning it while it is still open
+                                               format.close();
+
+                                               // this is to make sure that nothing stays from the previous format
+                                               // (as it is going to be in the normal case)
+                                               format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+                                               format.reopen(splits[i], state);
+                                               assertEquals(recordsUntilCheckpoint, format.getRecordsReadFromBlock());
+                                       }
+                                       elementsPerSplit[i]++;
+                               }
+                       } finally {
+                               // close whichever format is current, even when an assertion above fails
+                               format.close();
+                       }
+               }
+
+               Assert.assertEquals(1539, elementsPerSplit[0]);
+               Assert.assertEquals(1026, elementsPerSplit[1]);
+               Assert.assertEquals(1539, elementsPerSplit[2]);
+               Assert.assertEquals(896, elementsPerSplit[3]);
+               Assert.assertEquals(NUM_RECORDS, elements);
+       }
+
+       /**
+        * Reads all four splits while simulating checkpoint/restore in the middle of reading.
+        *
+        * <p>Whenever 132 records have been read from the current block, the reader state is
+        * captured and reading continues on a brand-new format restored from that state. The
+        * per-split and total record counts must match those of an uninterrupted read.
+        */
+       @Test
+       public void testAvroRecovery() throws Exception {
+               final int recordsUntilCheckpoint = 132;
+
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+               format.configure(parameters);
+
+               FileInputSplit[] splits = format.createInputSplits(4);
+               // JUnit's signature is assertEquals(expected, actual)
+               assertEquals(4, splits.length);
+
+               int elements = 0;
+               int[] elementsPerSplit = new int[4];
+               for (int i = 0; i < splits.length; i++) {
+                       format.open(splits[i]);
+                       try {
+                               while (!format.reachedEnd()) {
+                                       User u = format.nextRecord(null);
+                                       Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                                       elements++;
+
+                                       if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+                                               // do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+                                               Tuple2<Long, Long> state = format.getCurrentState();
+
+                                               // the state is already captured, so close the old format
+                                               // instead of abandoning it while it is still open
+                                               format.close();
+
+                                               // this is to make sure that nothing stays from the previous format
+                                               // (as it is going to be in the normal case)
+                                               format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+                                               format.reopen(splits[i], state);
+                                               assertEquals(recordsUntilCheckpoint, format.getRecordsReadFromBlock());
+                                       }
+                                       elementsPerSplit[i]++;
+                               }
+                       } finally {
+                               // close whichever format is current, even when an assertion above fails
+                               format.close();
+                       }
+               }
+
+               Assert.assertEquals(1539, elementsPerSplit[0]);
+               Assert.assertEquals(1026, elementsPerSplit[1]);
+               Assert.assertEquals(1539, elementsPerSplit[2]);
+               Assert.assertEquals(896, elementsPerSplit[3]);
+               Assert.assertEquals(NUM_RECORDS, elements);
+       }
+
+       /*
+       This test gave the reference values for the test of Flink's IF.
+
+       This dependency needs to be added
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+            <version>1.7.6</version>
+        </dependency>
+
+       @Test
+       public void testHadoop() throws Exception {
+               JobConf jf = new JobConf();
+               FileInputFormat.addInputPath(jf, new 
org.apache.hadoop.fs.Path(testFile.toURI()));
+               
jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY,
 false);
+               org.apache.avro.mapred.AvroInputFormat<User> format = new 
org.apache.avro.mapred.AvroInputFormat<User>();
+               InputSplit[] sp = format.getSplits(jf, 4);
+               int elementsPerSplit[] = new int[4];
+               int cnt = 0;
+               int i = 0;
+               for(InputSplit s:sp) {
+                       RecordReader<AvroWrapper<User>, NullWritable> r = 
format.getRecordReader(s, jf, new HadoopDummyReporter());
+                       AvroWrapper<User> k = r.createKey();
+                       NullWritable v = r.createValue();
+
+                       while(r.next(k,v)) {
+                               cnt++;
+                               elementsPerSplit[i]++;
+                       }
+                       i++;
+               }
+               System.out.println("Status "+Arrays.toString(elementsPerSplit));
+       } **/
+       
+       /** Deletes the temporary Avro file, if one was created. */
+       @After
+       public void deleteFiles() {
+               // testFile is still null when createFiles() failed before creating it,
+               // and JUnit runs @After methods even after a failing @Before
+               if (testFile != null) {
+                       testFile.delete();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
new file mode 100644
index 0000000..5a21691
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro.example;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class AvroTypeExample {
+       
+       
+       /**
+        * Builds and runs the example job: generated users are mapped to (user, favorite number)
+        * pairs, grouped on the number (tuple position 1), reduced per group and printed.
+        */
+       public static void main(String[] args) throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
+
+               final DataSet<Tuple2<User, Integer>> usersWithNumber = users.map(new NumberExtractingMapper());
+               final DataSet<Tuple2<Integer, String>> colorsPerNumber =
+                       usersWithNumber.groupBy(1).reduceGroup(new ConcatenatingReducer());
+
+               colorsPerNumber.print();
+       }
+       
+       
+       /** Pairs every user with the favorite number stored on that user. */
+       public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+
+               @Override
+               public Tuple2<User, Integer> map(User user) {
+                       final Integer favoriteNumber = user.getFavoriteNumber();
+                       return new Tuple2<User, Integer>(user, favoriteNumber);
+               }
+       }
+       
+       
+       /**
+        * Collapses a group of (user, number) pairs into a single (number, colors) pair, where
+        * {@code colors} is the users' favorite colors joined with {@code " - "}.
+        */
+       public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+               /**
+                * Emits one record holding the number of the last pair seen and the joined colors.
+                * Nothing is emitted when {@code values} yields no elements.
+                *
+                * @param values the pairs of one group
+                * @param out collector receiving the combined record
+                */
+               @Override
+               public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
+                       int number = 0;
+                       boolean empty = true;
+                       StringBuilder colors = new StringBuilder();
+
+                       for (Tuple2<User, Integer> u : values) {
+                               number = u.f1;
+                               // put the separator between elements rather than trimming a trailing one,
+                               // which would fail with a negative length for an empty group
+                               if (!empty) {
+                                       colors.append(" - ");
+                               }
+                               colors.append(u.f0.getFavoriteColor());
+                               empty = false;
+                       }
+
+                       if (empty) {
+                               return;
+                       }
+                       out.collect(new Tuple2<Integer, String>(number, colors.toString()));
+               }
+       }
+       
+       
+       /**
+        * Input format that produces {@link #NUM} users whose name, favorite color and favorite
+        * number are drawn from a {@link Random} created with a fixed seed.
+        */
+       public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+               private static final long serialVersionUID = 1L;
+
+               private static final int NUM = 100;
+
+               private final Random rnd = new Random(32498562304986L);
+
+               private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+
+               private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+
+               /** Number of users handed out so far. */
+               private int count;
+
+
+               @Override
+               public boolean reachedEnd() throws IOException {
+                       return !(count < NUM);
+               }
+
+               @Override
+               public User nextRecord(User reuse) throws IOException {
+                       // the reuse object is ignored; a new User is created on every call
+                       // draw order (name, color, number) determines the generated sequence
+                       final String name = NAMES[rnd.nextInt(NAMES.length)];
+                       final String favoriteColor = COLORS[rnd.nextInt(COLORS.length)];
+                       final Integer favoriteNumber = rnd.nextInt(87);
+
+                       count++;
+                       return new User(name, favoriteNumber, favoriteColor);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
new file mode 100644
index 0000000..4608f96
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.example;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link #newBuilder()}. 
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, 
java.lang.CharSequence favorite_color) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() 
{
+    return new org.apache.flink.api.io.avro.example.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends 
org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.io.avro.example.User other) {
+            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder 
setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : 
(java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : 
(java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : 
(java.lang.CharSequence) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..e245026
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AvroInputFormatTypeExtractionTest {
+
+       @Test
+       public void testTypeExtraction() {
+               try {
+                       InputFormat<MyAvroType, ?> format = new 
AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), 
MyAvroType.class);
+
+                       TypeInformation<?> typeInfoDirect = 
TypeExtractor.getInputFormatTypes(format);
+
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       DataSet<MyAvroType> input = env.createInput(format);
+                       TypeInformation<?> typeInfoDataSet = input.getType();
+
+
+                       Assert.assertTrue(typeInfoDirect instanceof 
PojoTypeInfo);
+                       Assert.assertTrue(typeInfoDataSet instanceof 
PojoTypeInfo);
+
+                       Assert.assertEquals(MyAvroType.class, 
typeInfoDirect.getTypeClass());
+                       Assert.assertEquals(MyAvroType.class, 
typeInfoDataSet.getTypeClass());
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       public static final class MyAvroType {
+
+               public String theString;
+
+               public MyAvroType recursive;
+
+               private double aDouble;
+
+               public double getaDouble() {
+                       return aDouble;
+               }
+
+               public void setaDouble(double aDouble) {
+                       this.aDouble = aDouble;
+               }
+
+               public void setTheString(String theString) {
+                       this.theString = theString;
+               }
+
+               public String getTheString() {
+                       return theString;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
new file mode 100644
index 0000000..4d6c6b7
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
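+ * Tests for {@link AvroOutputFormat}: codec selection, Java serialization
+ * round trip of codec and schema, and Snappy-compressed output size.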
+ */
+public class AvroOutputFormatTest {
+
+       @Test
+       public void testSetCodec() throws Exception {
+               // given
+               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
+
+               // when
+               try {
+                       outputFormat.setCodec(Codec.SNAPPY);
+               } catch (Exception ex) {
+                       // then
+                       fail("unexpected exception");
+               }
+       }
+
+       @Test
+       public void testSetCodecError() throws Exception {
+               // given
+               boolean error = false;
+               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
+
+               // when
+               try {
+                       outputFormat.setCodec(null);
+               } catch (Exception ex) {
+                       error = true;
+               }
+
+               // then
+               assertTrue(error);
+       }
+
+       @Test
+       public void testSerialization() throws Exception {
+
+               serializeAndDeserialize(null, null);
+               serializeAndDeserialize(null, User.SCHEMA$);
+               for (final Codec codec : Codec.values()) {
+                       serializeAndDeserialize(codec, null);
+                       serializeAndDeserialize(codec, User.SCHEMA$);
+               }
+       }
+
+       private void serializeAndDeserialize(final Codec codec, final Schema 
schema) throws IOException, ClassNotFoundException {
+               // given
+               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
+               if (codec != null) {
+                       outputFormat.setCodec(codec);
+               }
+               if (schema != null) {
+                       outputFormat.setSchema(schema);
+               }
+
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+               // when
+               try (final ObjectOutputStream oos = new 
ObjectOutputStream(bos)) {
+                       oos.writeObject(outputFormat);
+               }
+               try (final ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
+                       // then
+                       Object o = ois.readObject();
+                       assertTrue(o instanceof AvroOutputFormat);
+                       final AvroOutputFormat<User> restored = 
(AvroOutputFormat<User>) o;
+                       final Codec restoredCodec = (Codec) 
Whitebox.getInternalState(restored, "codec");
+                       final Schema restoredSchema = (Schema) 
Whitebox.getInternalState(restored, "userDefinedSchema");
+
+                       assertTrue(codec != null ? restoredCodec == codec : 
restoredCodec == null);
+                       assertTrue(schema != null ? 
restoredSchema.equals(schema) : restoredSchema == null);
+               }
+       }
+
+       @Test
+       public void testCompression() throws Exception {
+               // given
+               final Path outputPath = new 
Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
+               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(outputPath,User.class);
+               outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+               final Path compressedOutputPath = new 
Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
+               final AvroOutputFormat<User> compressedOutputFormat = new 
AvroOutputFormat<>(compressedOutputPath,User.class);
+               
compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+               compressedOutputFormat.setCodec(Codec.SNAPPY);
+
+               // when
+               output(outputFormat);
+               output(compressedOutputFormat);
+
+               // then
+               assertTrue(fileSize(outputPath) > 
fileSize(compressedOutputPath));
+
+               // cleanup
+               Files.delete(Paths.get(outputPath.getPath()));
+               Files.delete(Paths.get(compressedOutputPath.getPath()));
+       }
+
+       private long fileSize(Path path) throws IOException {
+               return Files.size(Paths.get(path.getPath()));
+       }
+
+       private void output(final AvroOutputFormat<User> outputFormat) throws 
IOException {
+               outputFormat.configure(new Configuration());
+               outputFormat.open(1,1);
+               for (int i = 0; i < 100; i++) {
+                       outputFormat.writeRecord(new User("testUser",1,"blue"));
+               }
+               outputFormat.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc 
b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..02c11af
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
@@ -0,0 +1,35 @@
+[
+{"namespace": "org.apache.flink.api.io.avro.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+     {"name": "num", "type": "int"},
+     {"name": "street", "type": "string"},
+     {"name": "city", "type": "string"},
+     {"name": "state", "type": "string"},
+     {"name": "zip", "type": "string"}
+  ]
+},
+{"namespace": "org.apache.flink.api.io.avro.generated",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": "double"},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : 
"string"}},  
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : 
"boolean"}}, 
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", 
"items":"string"}], "default":null},
+     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", 
"symbols" : ["RED", "GREEN", "BLUE"]}},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}},
+     {"name": "type_fixed",
+                 "size": 16,
+                 "type": ["null", {"name": "Fixed16", "size": 16, "type": 
"fixed"}] },
+     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
+     {"name": "type_nested", "type": ["null", "Address"]}
+ ]
+}]

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties 
b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/logback-test.xml 
b/flink-connectors/flink-avro/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/testdata.avro 
b/flink-connectors/flink-avro/src/test/resources/testdata.avro
new file mode 100644
index 0000000..45308b9
Binary files /dev/null and 
b/flink-connectors/flink-avro/src/test/resources/testdata.avro differ

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml 
b/flink-connectors/flink-connector-cassandra/pom.xml
new file mode 100644
index 0000000..4ea790a
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-cassandra_2.10</artifactId>
+       <name>flink-connector-cassandra</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <cassandra.version>2.2.5</cassandra.version>
+               <driver.version>3.0.0</driver.version>
+       </properties>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <reuseForks>true</reuseForks>
+                                       <forkCount>1</forkCount>
+                                       <argLine>-Xms256m -Xmx2800m 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>2.4.1</version>
+                               <executions>
+                                       <!-- Run shade goal on package phase -->
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration 
combine.self="override">
+                                                       
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>com.datastax.cassandra:cassandra-driver-core</include>
+                                                                       
<include>com.datastax.cassandra:cassandra-driver-mapping</include>
+                                                                       
<include>com.google.guava:guava</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>com.google</pattern>
+                                                                       
<shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern>
+                                                                       
<excludes>
+                                                                               
<exclude>com.google.protobuf.**</exclude>
+                                                                               
<exclude>com.google.inject.**</exclude>
+                                                                       
</excludes>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.datastax.cassandra</groupId>
+                       <artifactId>cassandra-driver-core</artifactId>
+                       <version>${driver.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       
<artifactId>log4j-over-slf4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ch.qos.logback</groupId>
+                                       <artifactId>logback-classic</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>com.datastax.cassandra</groupId>
+                       <artifactId>cassandra-driver-mapping</artifactId>
+                       <version>${driver.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       
<artifactId>log4j-over-slf4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ch.qos.logback</groupId>
+                                       <artifactId>logback-classic</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.cassandra</groupId>
+                       <artifactId>cassandra-all</artifactId>
+                       <version>${cassandra.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       
<artifactId>log4j-over-slf4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ch.qos.logback</groupId>
+                                       <artifactId>logback-classic</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
new file mode 100644
index 0000000..849e023
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate {@link Tuple}.
+ *
+ * @param <OUT> type of Tuple
+ */
+public class CassandraInputFormat<OUT extends Tuple> extends 
RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+       private static final Logger LOG = 
LoggerFactory.getLogger(CassandraInputFormat.class);
+
+       private final String query;
+       private final ClusterBuilder builder;
+
+       private transient Cluster cluster;
+       private transient Session session;
+       private transient ResultSet resultSet;
+
+       public CassandraInputFormat(String query, ClusterBuilder builder) {
+               Preconditions.checkArgument(!Strings.isNullOrEmpty(query), 
"Query cannot be null or empty");
+               Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
+
+               this.query = query;
+               this.builder = builder;
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+               this.cluster = builder.getCluster();
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
+               return cachedStatistics;
+       }
+
+       /**
+        * Opens a Session on the cluster obtained in {@link #configure(Configuration)}
+        * and executes the query, keeping the returned {@link ResultSet} for reading.
+        *
+        * @param ignored the input split; this method does not read it
+        * @throws IOException declared by the method signature; nothing in this body throws it directly
+        */
+       @Override
+       public void open(InputSplit ignored) throws IOException {
+               this.session = cluster.connect();
+               this.resultSet = session.execute(query);
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return resultSet.isExhausted();
+       }
+
+       @Override
+       public OUT nextRecord(OUT reuse) throws IOException {
+               final Row item = resultSet.one();
+               for (int i = 0; i < reuse.getArity(); i++) {
+                       reuse.setField(item.getObject(i), i);
+               }
+               return reuse;
+       }
+
+       @Override
+       public InputSplit[] createInputSplits(int minNumSplits) throws 
IOException {
+               GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
+               return split;
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(InputSplit[] 
inputSplits) {
+               return new DefaultInputSplitAssigner(inputSplits);
+       }
+
+       /**
+        * Closes the session and then the cluster, skipping whichever is null.
+        *
+        * <p>Each close is attempted independently: an exception raised by one is
+        * logged at error level and not rethrown, so the other close still runs.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       if (session != null) {
+                               session.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+
+               try {
+                       if (cluster != null ) {
+                               cluster.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
new file mode 100644
index 0000000..15d8fb3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into 
Apache Cassandra.
+ *
+ * @param <OUT> type of Tuple
+ */
+public class CassandraOutputFormat<OUT extends Tuple> extends 
RichOutputFormat<OUT> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormat.class);
+
+       private final String insertQuery;
+       private final ClusterBuilder builder;
+
+       private transient Cluster cluster;
+       private transient Session session;
+       private transient PreparedStatement prepared;
+       private transient FutureCallback<ResultSet> callback;
+       private transient Throwable exception = null;
+
+       public CassandraOutputFormat(String insertQuery, ClusterBuilder 
builder) {
+               
Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot 
be null or empty");
+               Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
+
+               this.insertQuery = insertQuery;
+               this.builder = builder;
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+               this.cluster = builder.getCluster();
+       }
+
+       /**
+        * Opens a Session to Cassandra and initializes the prepared statement.
+        *
+        * @param taskNumber The number of the parallel instance.
+        * @throws IOException Thrown, if the output could not be opened due to 
an
+        *                     I/O problem.
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               this.session = cluster.connect();
+               this.prepared = session.prepare(insertQuery);
+               this.callback = new FutureCallback<ResultSet>() {
+                       @Override
+                       public void onSuccess(ResultSet ignored) {
+                       }
+
+                       @Override
+                       public void onFailure(Throwable t) {
+                               exception = t;
+                       }
+               };
+       }
+
+       @Override
+       public void writeRecord(OUT record) throws IOException {
+               if (exception != null) {
+                       throw new IOException("write record failed", exception);
+               }
+
+               Object[] fields = new Object[record.getArity()];
+               for (int i = 0; i < record.getArity(); i++) {
+                       fields[i] = record.getField(i);
+               }
+               ResultSetFuture result = 
session.executeAsync(prepared.bind(fields));
+               Futures.addCallback(result, callback);
+       }
+
+       /**
+        * Closes all resources used.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       if (session != null) {
+                               session.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+
+               try {
+                       if (cluster != null ) {
+                               cluster.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
new file mode 100644
index 0000000..63b76da
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * <p>Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+
+       private static final long serialVersionUID = 1L;
+       
+       private final ClusterBuilder builder;
+       private transient Cluster cluster;
+       private transient Session session;
+
+       private String keySpace = "flink_auxiliary";
+       private String table = "checkpoints_";
+
+       /**
+        * A cache of the last committed checkpoint ids per subtask index. This 
is used to
+        * avoid redundant round-trips to Cassandra (see {@link 
#isCheckpointCommitted(int, long)}.
+        */
+       private final Map<Integer, Long> lastCommittedCheckpoints = new 
HashMap<>();
+
+       public CassandraCommitter(ClusterBuilder builder) {
+               this.builder = builder;
+               ClosureCleaner.clean(builder, true);
+       }
+
+       public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+               this(builder);
+               this.keySpace = keySpace;
+       }
+
+       /**
+        * Internally used to set the job ID after instantiation.
+        */
+       public void setJobId(String id) throws Exception {
+               super.setJobId(id);
+               table += id;
+       }
+
+       /**
+        * Generates the necessary tables to store information.
+        *
+        * @throws Exception
+        */
+       @Override
+       public void createResource() throws Exception {
+               cluster = builder.getCluster();
+               session = cluster.connect();
+
+               session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':1};", 
keySpace));
+               session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
+
+               try {
+                       session.close();
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+               try {
+                       cluster.close();
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+
+       @Override
+       public void open() throws Exception {
+               if (builder == null) {
+                       throw new RuntimeException("No ClusterBuilder was 
set.");
+               }
+               cluster = builder.getCluster();
+               session = cluster.connect();
+       }
+
+       @Override
+       public void close() throws Exception {
+               this.lastCommittedCheckpoints.clear();
+               try {
+                       session.close();
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+               try {
+                       cluster.close();
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+
+       @Override
+       public void commitCheckpoint(int subtaskIdx, long checkpointId) {
+               String statement = String.format(
+                       "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' 
and sub_id=%d;",
+                       keySpace, table, checkpointId, operatorId, subtaskIdx);
+
+               session.execute(statement);
+               lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
+       }
+
+       @Override
+       public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) 
{
+               // Pending checkpointed buffers are committed in ascending 
order of their
+               // checkpoint id. This way we can tell if a checkpointed buffer 
was committed
+               // just by asking the third-party storage system for the last 
checkpoint id
+               // committed by the specified subtask.
+
+               Long lastCommittedCheckpoint = 
lastCommittedCheckpoints.get(subtaskIdx);
+               if (lastCommittedCheckpoint == null) {
+                       String statement = String.format(
+                               "SELECT checkpoint_id FROM %s.%s where 
sink_id='%s' and sub_id=%d;",
+                               keySpace, table, operatorId, subtaskIdx);
+
+                       Iterator<Row> resultIt = 
session.execute(statement).iterator();
+                       if (resultIt.hasNext()) {
+                               lastCommittedCheckpoint = 
resultIt.next().getLong("checkpoint_id");
+                               lastCommittedCheckpoints.put(subtaskIdx, 
lastCommittedCheckpoint);
+                       }
+               }
+               return lastCommittedCheckpoint != null && checkpointId <= 
lastCommittedCheckpoint;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
new file mode 100644
index 0000000..650c481
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink Sink that saves data into a Cassandra cluster through a
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
+ * which relies on the annotations from
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
+ * com.datastax.driver.mapping.annotations</a>.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+
+       private static final long serialVersionUID = 1L;
+
+       protected final Class<IN> clazz;
+       protected transient Mapper<IN> mapper;
+       protected transient MappingManager mappingManager;
+
+       /**
+        * The main constructor for creating CassandraPojoSink.
+        *
+        * @param clazz   {@code Class<IN>} instance of the element type
+        * @param builder cluster builder handed to the base sink
+        */
+       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+               super(builder);
+               this.clazz = clazz;
+       }
+
+       /**
+        * Opens the base sink, then creates the mapping manager on its session and
+        * the mapper for the element class.
+        *
+        * @throws RuntimeException if the mapping manager or the mapper cannot be created
+        */
+       @Override
+       public void open(Configuration configuration) {
+               super.open(configuration);
+               try {
+                       final MappingManager manager = new MappingManager(session);
+                       this.mappingManager = manager;
+                       this.mapper = manager.mapper(clazz);
+               } catch (Exception cause) {
+                       final String message = "Cannot create CassandraPojoSink with input: " + clazz.getSimpleName();
+                       throw new RuntimeException(message, cause);
+               }
+       }
+
+       /**
+        * Saves the value asynchronously through the mapper.
+        *
+        * @param value element to save
+        * @return the future returned by the mapper
+        */
+       @Override
+       public ListenableFuture<Void> send(IN value) {
+               final ListenableFuture<Void> pending = mapper.saveAsync(value);
+               return pending;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
new file mode 100644
index 0000000..180b638
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * Common facade over the different Cassandra sink implementations.
+ *
+ * <p>An instance wraps either a {@link DataStreamSink} or a
+ * {@link SingleOutputStreamOperator} and forwards the configuration calls to the
+ * transformation of whichever one it holds.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+       private final boolean useDataStreamSink;
+       private DataStreamSink<IN> sink1;
+       private SingleOutputStreamOperator<IN> sink2;
+
+       private CassandraSink(DataStreamSink<IN> sink) {
+               sink1 = sink;
+               useDataStreamSink = true;
+       }
+
+       private CassandraSink(SingleOutputStreamOperator<IN> sink) {
+               sink2 = sink;
+               useDataStreamSink = false;
+       }
+
+       private SinkTransformation<IN> getSinkTransformation() {
+               return sink1.getTransformation();
+       }
+
+       private StreamTransformation<IN> getStreamTransformation() {
+               return sink2.getTransformation();
+       }
+
+       /**
+        * Sets the name of this sink. This name is
+        * used by the visualization and logging during runtime.
+        *
+        * @param name The name to assign.
+        * @return The named sink.
+        */
+       public CassandraSink<IN> name(String name) {
+               if (!useDataStreamSink) {
+                       getStreamTransformation().setName(name);
+                       return this;
+               }
+               getSinkTransformation().setName(name);
+               return this;
+       }
+
+       /**
+        * Sets an ID for this operator.
+        *
+        * <p>The specified ID is used to assign the same operator ID across job
+        * submissions (for example when starting a job from a savepoint).
+        *
+        * <p><strong>Important</strong>: this ID needs to be unique per
+        * transformation and job. Otherwise, job submission will fail.
+        *
+        * @param uid The unique user-specified ID of this transformation.
+        * @return The operator with the specified ID.
+        */
+       public CassandraSink<IN> uid(String uid) {
+               if (!useDataStreamSink) {
+                       getStreamTransformation().setUid(uid);
+                       return this;
+               }
+               getSinkTransformation().setUid(uid);
+               return this;
+       }
+
+       /**
+        * Sets the parallelism for this sink. The degree must be higher than zero.
+        *
+        * @param parallelism The parallelism for this sink.
+        * @return The sink with set parallelism.
+        */
+       public CassandraSink<IN> setParallelism(int parallelism) {
+               if (!useDataStreamSink) {
+                       getStreamTransformation().setParallelism(parallelism);
+                       return this;
+               }
+               getSinkTransformation().setParallelism(parallelism);
+               return this;
+       }
+
+       /**
+        * Turns off chaining for this operator so thread co-location will not be
+        * used as an optimization.
+        *
+        * <p>Chaining can be turned off for the whole
+        * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
+        * however it is not advised for performance considerations.
+        *
+        * @return The sink with chaining disabled
+        */
+       public CassandraSink<IN> disableChaining() {
+               if (!useDataStreamSink) {
+                       getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+                       return this;
+               }
+               getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
+               return this;
+       }
+
+       /**
+        * Sets the slot sharing group of this operation. Parallel instances of
+        * operations that are in the same slot sharing group will be co-located in the same
+        * TaskManager slot, if possible.
+        *
+        * <p>Operations inherit the slot sharing group of input operations if all input operations
+        * are in the same slot sharing group and no slot sharing group was explicitly specified.
+        *
+        * <p>Initially an operation is in the default slot sharing group. An operation can be put into
+        * the default group explicitly by setting the slot sharing group to {@code "default"}.
+        *
+        * @param slotSharingGroup The slot sharing group name.
+        * @return This sink.
+        */
+       public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
+               if (!useDataStreamSink) {
+                       getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
+                       return this;
+               }
+               getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
+               return this;
+       }
+
+       /**
+        * Writes a DataStream into a Cassandra database.
+        *
+        * <p>A stream whose type information is a {@link TupleTypeInfo} gets a
+        * {@link CassandraTupleSinkBuilder}; every other stream gets a
+        * {@link CassandraPojoSinkBuilder}.
+        *
+        * @param input input DataStream
+        * @param <IN>  input type
+        * @return CassandraSinkBuilder, to further configure the sink
+        */
+       public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+               if (!(input.getType() instanceof TupleTypeInfo)) {
+                       return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
+               }
+               DataStream<T> tupleInput = (DataStream<T>) input;
+               return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
+       }
+
+       /**
+        * Base builder collecting the settings shared by all Cassandra sinks.
+        *
+        * @param <IN> input type
+        */
+       public abstract static class CassandraSinkBuilder<IN> {
+               protected final DataStream<IN> input;
+               protected final TypeSerializer<IN> serializer;
+               protected final TypeInformation<IN> typeInfo;
+               protected ClusterBuilder builder;
+               protected String query;
+               protected CheckpointCommitter committer;
+               protected boolean isWriteAheadLogEnabled;
+
+               public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+                       this.input = input;
+                       this.typeInfo = typeInfo;
+                       this.serializer = serializer;
+               }
+
+               /**
+                * Sets the query that is to be executed for every record.
+                *
+                * @param query query to use
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> setQuery(String query) {
+                       this.query = query;
+                       return this;
+               }
+
+               /**
+                * Sets the cassandra host to connect to; port 9042 is used.
+                *
+                * @param host host to connect to
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> setHost(String host) {
+                       return setHost(host, 9042);
+               }
+
+               /**
+                * Sets the cassandra host/port to connect to.
+                *
+                * @param host host to connect to
+                * @param port port to connect to
+                * @return this builder
+                * @throws IllegalArgumentException if a cluster builder was already set
+                */
+               public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
+                       if (this.builder != null) {
+                               throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+                       }
+                       this.builder = new ClusterBuilder() {
+                               @Override
+                               protected Cluster buildCluster(Cluster.Builder clusterBuilder) {
+                                       return clusterBuilder.addContactPoint(host).withPort(port).build();
+                               }
+                       };
+                       return this;
+               }
+
+               /**
+                * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra.
+                *
+                * @param builder ClusterBuilder to configure the connection to cassandra
+                * @return this builder
+                * @throws IllegalArgumentException if a cluster builder was already set
+                */
+               public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
+                       if (this.builder != null) {
+                               throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
+                       }
+                       this.builder = builder;
+                       return this;
+               }
+
+               /**
+                * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+                * idempotent updates.
+                *
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> enableWriteAheadLog() {
+                       this.isWriteAheadLogEnabled = true;
+                       return this;
+               }
+
+               /**
+                * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
+                * idempotent updates.
+                *
+                * @param committer CheckpointCommitter, that stores information about completed checkpoints in an external
+                *                  resource. By default this information is stored within a separate table within Cassandra.
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
+                       this.isWriteAheadLogEnabled = true;
+                       this.committer = committer;
+                       return this;
+               }
+
+               /**
+                * Finalizes the configuration of this sink.
+                *
+                * @return finalized sink
+                * @throws Exception if the sink cannot be built
+                */
+               public abstract CassandraSink<IN> build() throws Exception;
+
+               /**
+                * Verifies that a cluster builder was supplied.
+                *
+                * @throws IllegalArgumentException if neither setHost() nor setClusterBuilder() was called
+                */
+               protected void sanityCheck() {
+                       if (builder == null) {
+                               throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
+                       }
+               }
+       }
+
+       /**
+        * Builder for sinks consuming tuple streams; requires a query.
+        *
+        * @param <IN> tuple type of the input
+        */
+       public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
+               public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+                       super(input, typeInfo, serializer);
+               }
+
+               @Override
+               protected void sanityCheck() {
+                       super.sanityCheck();
+                       if (query == null || query.isEmpty()) {
+                               throw new IllegalArgumentException("Query must not be null or empty.");
+                       }
+               }
+
+               /**
+                * Builds a write-ahead sink when the write-ahead log is enabled, using a
+                * {@link CassandraCommitter} if no committer was supplied; otherwise builds
+                * a plain tuple sink.
+                */
+               @Override
+               public CassandraSink<IN> build() throws Exception {
+                       sanityCheck();
+                       if (!isWriteAheadLogEnabled) {
+                               return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
+                       }
+                       CheckpointCommitter effectiveCommitter = committer == null
+                               ? new CassandraCommitter(builder)
+                               : committer;
+                       return new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, effectiveCommitter)));
+               }
+       }
+
+       /**
+        * Builder for sinks consuming non-tuple streams; a query must not be set.
+        *
+        * @param <IN> type of the input
+        */
+       public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
+               public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+                       super(input, typeInfo, serializer);
+               }
+
+               @Override
+               protected void sanityCheck() {
+                       super.sanityCheck();
+                       if (query != null) {
+                               throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
+                       }
+               }
+
+               /**
+                * Builds a pojo sink; rejects the write-ahead log, which this builder does not support.
+                */
+               @Override
+               public CassandraSink<IN> build() throws Exception {
+                       sanityCheck();
+                       if (isWriteAheadLogEnabled) {
+                               throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+                       }
+                       return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
new file mode 100644
index 0000000..49b1efa
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * <p>Every element passed to {@code invoke} is handed to {@code send}, and the returned future is
+ * observed through a shared callback. A failure reported by that callback is remembered and
+ * rethrown, wrapped in an {@link IOException}, by the next call to {@code invoke}.
+ *
+ * @param <IN> Type of the elements consumed by this sink
+ * @param <V> Type of the result carried by the future returned from {@code send}
+ */
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+       protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+       protected transient Cluster cluster;
+       protected transient Session session;
+
+       /**
+        * Most recent failure reported by an asynchronous write, or {@code null} if none was reported.
+        *
+        * <p>Declared {@code volatile} because it is written from the future callback and read in
+        * {@code invoke}. The callback is registered without an executor, so it may run on the
+        * thread that completes the future rather than on the thread calling {@code invoke}.
+        */
+       protected transient volatile Throwable exception = null;
+       protected transient FutureCallback<V> callback;
+
+       private final ClusterBuilder builder;
+
+       protected CassandraSinkBase(ClusterBuilder builder) {
+               this.builder = builder;
+               ClosureCleaner.clean(builder, true);
+       }
+
+       /**
+        * Creates the shared callback, obtains the cluster from the builder and connects a session.
+        *
+        * @param configuration unused by this implementation
+        */
+       @Override
+       public void open(Configuration configuration) {
+               this.callback = new FutureCallback<V>() {
+                       @Override
+                       public void onSuccess(V ignored) {
+                       }
+
+                       @Override
+                       public void onFailure(Throwable t) {
+                               // Remember the failure so that the next invoke() can surface it.
+                               exception = t;
+                               LOG.error("Error while sending value.", t);
+                       }
+               };
+               this.cluster = builder.getCluster();
+               this.session = cluster.connect();
+       }
+
+       /**
+        * Sends the value asynchronously and registers the shared callback on the returned future.
+        *
+        * @param value element to write
+        * @throws IOException if an earlier asynchronous write reported a failure
+        */
+       @Override
+       public void invoke(IN value) throws Exception {
+               // Read the volatile field once so the check and the reported cause are the same object.
+               final Throwable failure = exception;
+               if (failure != null) {
+                       throw new IOException("invoke() failed", failure);
+               }
+               ListenableFuture<V> result = send(value);
+               Futures.addCallback(result, callback);
+       }
+
+       /**
+        * Submits the given value for writing.
+        *
+        * @param value element to write
+        * @return future representing the pending write
+        */
+       public abstract ListenableFuture<V> send(IN value);
+
+       /**
+        * Closes the session and then the cluster. Errors from either step are logged and not
+        * rethrown, so a failure to close the session does not prevent closing the cluster.
+        */
+       @Override
+       public void close() {
+               try {
+                       if (session != null) {
+                               session.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing session.", e);
+               }
+               try {
+                       if (cluster != null) {
+                               cluster.close();
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error while closing cluster.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
new file mode 100644
index 0000000..0a9ef06
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Flink sink that writes {@link Tuple} records to a Cassandra cluster.
+ *
+ * <p>The insert query is prepared once in {@code open}; for every record the tuple's fields are
+ * bound, in field order, to that prepared statement and executed asynchronously.
+ *
+ * @param <IN> Type of the elements consumed by this sink, it must extend {@link Tuple}
+ */
+public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
+       private final String insertQuery;
+       private transient PreparedStatement ps;
+
+       /**
+        * @param insertQuery query that is prepared when the sink is opened
+        * @param builder builder passed on to the base class
+        */
+       public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
+               super(builder);
+               this.insertQuery = insertQuery;
+       }
+
+       /** Opens the base sink, then prepares the insert query on the session. */
+       @Override
+       public void open(Configuration configuration) {
+               super.open(configuration);
+               ps = session.prepare(insertQuery);
+       }
+
+       /**
+        * Binds the fields of the tuple to the prepared statement and executes it asynchronously.
+        *
+        * @param value tuple to write
+        * @return future of the asynchronous execution
+        */
+       @Override
+       public ListenableFuture<ResultSet> send(IN value) {
+               return session.executeAsync(ps.bind(extract(value)));
+       }
+
+       /** Copies the fields of the tuple, in positional order, into a new array. */
+       private Object[] extract(IN record) {
+               final int arity = record.getArity();
+               final Object[] fields = new Object[arity];
+               for (int pos = 0; pos < arity; pos++) {
+                       fields[pos] = record.getField(pos);
+               }
+               return fields;
+       }
+}

Reply via email to