http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java new file mode 100644 index 0000000..37a83d1 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.io.avro; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.Fixed16; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Test the avro input format. + * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + */ +public class AvroSplittableInputFormatTest { + + private File testFile; + + final static String TEST_NAME = "Alyssa"; + + final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; + final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; + + final static boolean TEST_ARRAY_BOOLEAN_1 = true; + final static boolean TEST_ARRAY_BOOLEAN_2 = false; + + final static Colors TEST_ENUM_COLOR = Colors.GREEN; + + final static String TEST_MAP_KEY1 = "KEY 1"; + final static long TEST_MAP_VALUE1 = 8546456L; + final static String TEST_MAP_KEY2 = "KEY 2"; + final static long TEST_MAP_VALUE2 = 17554L; + + final static Integer TEST_NUM = new Integer(239); + final static String TEST_STREET = "Baker Street"; + final static String TEST_CITY = "London"; + final static String TEST_STATE = "London"; + final static String TEST_ZIP = "NW1 6XE"; + + final static int NUM_RECORDS = 5000; + + @Before + public 
void createFiles() throws IOException { + testFile = File.createTempFile("AvroSplittableInputFormatTest", null); + + ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); + stringArray.add(TEST_ARRAY_STRING_1); + stringArray.add(TEST_ARRAY_STRING_2); + + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); + booleanArray.add(TEST_ARRAY_BOOLEAN_1); + booleanArray.add(TEST_ARRAY_BOOLEAN_2); + + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); + longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); + longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); + + Address addr = new Address(); + addr.setNum(new Integer(TEST_NUM)); + addr.setStreet(TEST_STREET); + addr.setCity(TEST_CITY); + addr.setState(TEST_STATE); + addr.setZip(TEST_ZIP); + + + User user1 = new User(); + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + user1.setTypeArrayString(stringArray); + user1.setTypeArrayBoolean(booleanArray); + user1.setTypeEnum(TEST_ENUM_COLOR); + user1.setTypeMap(longMap); + user1.setTypeNested(addr); + + // Construct via builder + User user2 = User.newBuilder() + .setName(TEST_NAME) + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(new Fixed16()) + .setTypeUnion(123L) + .setTypeNested( + Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) + .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) + .build()) + + .build(); + DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); + DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + 
dataFileWriter.append(user1); + dataFileWriter.append(user2); + + Random rnd = new Random(1337); + for(int i = 0; i < NUM_RECORDS -2 ; i++) { + User user = new User(); + user.setName(TEST_NAME + rnd.nextInt()); + user.setFavoriteNumber(rnd.nextInt()); + user.setTypeDoubleTest(rnd.nextDouble()); + user.setTypeBoolTest(true); + user.setTypeArrayString(stringArray); + user.setTypeArrayBoolean(booleanArray); + user.setTypeEnum(TEST_ENUM_COLOR); + user.setTypeMap(longMap); + Address address = new Address(); + address.setNum(new Integer(TEST_NUM)); + address.setStreet(TEST_STREET); + address.setCity(TEST_CITY); + address.setState(TEST_STATE); + address.setZip(TEST_ZIP); + user.setTypeNested(address); + + dataFileWriter.append(user); + } + dataFileWriter.close(); + } + + @Test + public void testSplittedIF() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + int elements = 0; + int elementsPerSplit[] = new int[4]; + for(int i = 0; i < splits.length; i++) { + format.open(splits[i]); + while(!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + @Test + public void testAvroRecoveryWithFailureAtStart() throws Exception { + final int recordsUntilCheckpoint = 132; + + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), 
User.class); + format.configure(parameters); + + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + + int elements = 0; + int elementsPerSplit[] = new int[4]; + for(int i = 0; i < splits.length; i++) { + format.reopen(splits[i], format.getCurrentState()); + while(!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + + if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { + + // do the whole checkpoint-restore procedure and see if we pick up from where we left off. + Tuple2<Long, Long> state = format.getCurrentState(); + + // this is to make sure that nothing stays from the previous format + // (as it is going to be in the normal case) + format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); + + format.reopen(splits[i], state); + assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); + } + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + @Test + public void testAvroRecovery() throws Exception { + final int recordsUntilCheckpoint = 132; + + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + format.configure(parameters); + + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + + int elements = 0; + int elementsPerSplit[] = new int[4]; + for(int i = 0; i < splits.length; i++) { + format.open(splits[i]); + while(!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + + 
if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { + + // do the whole checkpoint-restore procedure and see if we pick up from where we left off. + Tuple2<Long, Long> state = format.getCurrentState(); + + // this is to make sure that nothing stays from the previous format + // (as it is going to be in the normal case) + format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); + + format.reopen(splits[i], state); + assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); + } + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + /* + This test is gave the reference values for the test of Flink's IF. + + This dependency needs to be added + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>1.7.6</version> + </dependency> + + @Test + public void testHadoop() throws Exception { + JobConf jf = new JobConf(); + FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); + jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); + org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); + InputSplit[] sp = format.getSplits(jf, 4); + int elementsPerSplit[] = new int[4]; + int cnt = 0; + int i = 0; + for(InputSplit s:sp) { + RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); + AvroWrapper<User> k = r.createKey(); + NullWritable v = r.createValue(); + + while(r.next(k,v)) { + cnt++; + elementsPerSplit[i]++; + } + i++; + } + System.out.println("Status "+Arrays.toString(elementsPerSplit)); + } **/ + + @After + public void deleteFiles() { + 
testFile.delete(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java new file mode 100644 index 0000000..5a21691 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.io.avro.example; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +@SuppressWarnings("serial") +public class AvroTypeExample { + + + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<User> users = env.createInput(new UserGeneratingInputFormat()); + + users + .map(new NumberExtractingMapper()) + .groupBy(1) + .reduceGroup(new ConcatenatingReducer()) + .print(); + } + + + public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> { + + @Override + public Tuple2<User, Integer> map(User user) { + return new Tuple2<User, Integer>(user, user.getFavoriteNumber()); + } + } + + + public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> { + + @Override + public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception { + int number = 0; + StringBuilder colors = new StringBuilder(); + + for (Tuple2<User, Integer> u : values) { + number = u.f1; + colors.append(u.f0.getFavoriteColor()).append(" - "); + } + + colors.setLength(colors.length() - 3); + out.collect(new Tuple2<Integer, String>(number, colors.toString())); + } + } + + + public static final class UserGeneratingInputFormat extends GenericInputFormat<User> { + + private static final long serialVersionUID = 1L; + + private static final int NUM = 100; + + private final Random rnd = new Random(32498562304986L); + + private static final String[] NAMES = 
{ "Peter", "Bob", "Liddy", "Alexander", "Stan" }; + + private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" }; + + private int count; + + + @Override + public boolean reachedEnd() throws IOException { + return count >= NUM; + } + + @Override + public User nextRecord(User reuse) throws IOException { + count++; + + User u = new User(); + u.setName(NAMES[rnd.nextInt(NAMES.length)]); + u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]); + u.setFavoriteNumber(rnd.nextInt(87)); + return u; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java new file mode 100644 index 0000000..4608f96 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.flink.api.io.avro.example; +@SuppressWarnings("all") [email protected] +public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.lang.CharSequence name; + @Deprecated public java.lang.Integer favorite_number; + @Deprecated public java.lang.CharSequence favorite_color; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use {@link #newBuilder()}. + */ + public User() {} + + /** + * All-args constructor. + */ + public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) { + this.name = name; + this.favorite_number = favorite_number; + this.favorite_color = favorite_color; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return name; + case 1: return favorite_number; + case 2: return favorite_color; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: name = (java.lang.CharSequence)value$; break; + case 1: favorite_number = (java.lang.Integer)value$; break; + case 2: favorite_color = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'name' field. + */ + public java.lang.CharSequence getName() { + return name; + } + + /** + * Sets the value of the 'name' field. + * @param value the value to set. + */ + public void setName(java.lang.CharSequence value) { + this.name = value; + } + + /** + * Gets the value of the 'favorite_number' field. + */ + public java.lang.Integer getFavoriteNumber() { + return favorite_number; + } + + /** + * Sets the value of the 'favorite_number' field. + * @param value the value to set. + */ + public void setFavoriteNumber(java.lang.Integer value) { + this.favorite_number = value; + } + + /** + * Gets the value of the 'favorite_color' field. + */ + public java.lang.CharSequence getFavoriteColor() { + return favorite_color; + } + + /** + * Sets the value of the 'favorite_color' field. + * @param value the value to set. 
+ */ + public void setFavoriteColor(java.lang.CharSequence value) { + this.favorite_color = value; + } + + /** Creates a new User RecordBuilder */ + public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() { + return new org.apache.flink.api.io.avro.example.User.Builder(); + } + + /** Creates a new User RecordBuilder by copying an existing Builder */ + public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) { + return new org.apache.flink.api.io.avro.example.User.Builder(other); + } + + /** Creates a new User RecordBuilder by copying an existing User instance */ + public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) { + return new org.apache.flink.api.io.avro.example.User.Builder(other); + } + + /** + * RecordBuilder for User instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> + implements org.apache.avro.data.RecordBuilder<User> { + + private java.lang.CharSequence name; + private java.lang.Integer favorite_number; + private java.lang.CharSequence favorite_color; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.flink.api.io.avro.example.User.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.flink.api.io.avro.example.User.Builder other) { + super(other); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.favorite_number)) { + this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.favorite_color)) { + this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); + fieldSetFlags()[2] = true; + } + } + + /** Creates a 
Builder by copying an existing User instance */ + private Builder(org.apache.flink.api.io.avro.example.User other) { + super(org.apache.flink.api.io.avro.example.User.SCHEMA$); + if (isValidValue(fields()[0], other.name)) { + this.name = data().deepCopy(fields()[0].schema(), other.name); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.favorite_number)) { + this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.favorite_color)) { + this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); + fieldSetFlags()[2] = true; + } + } + + /** Gets the value of the 'name' field */ + public java.lang.CharSequence getName() { + return name; + } + + /** Sets the value of the 'name' field */ + public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) { + validate(fields()[0], value); + this.name = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'name' field has been set */ + public boolean hasName() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'name' field */ + public org.apache.flink.api.io.avro.example.User.Builder clearName() { + name = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'favorite_number' field */ + public java.lang.Integer getFavoriteNumber() { + return favorite_number; + } + + /** Sets the value of the 'favorite_number' field */ + public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) { + validate(fields()[1], value); + this.favorite_number = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'favorite_number' field has been set */ + public boolean hasFavoriteNumber() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'favorite_number' field */ + public 
org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() { + favorite_number = null; + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'favorite_color' field */ + public java.lang.CharSequence getFavoriteColor() { + return favorite_color; + } + + /** Sets the value of the 'favorite_color' field */ + public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) { + validate(fields()[2], value); + this.favorite_color = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'favorite_color' field has been set */ + public boolean hasFavoriteColor() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'favorite_color' field */ + public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() { + favorite_color = null; + fieldSetFlags()[2] = false; + return this; + } + + @Override + public User build() { + try { + User record = new User(); + record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); + record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); + record.favorite_color = fieldSetFlags()[2] ? 
this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java new file mode 100644 index 0000000..e245026 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.io; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +public class AvroInputFormatTypeExtractionTest { + + @Test + public void testTypeExtraction() { + try { + InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); + + TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<MyAvroType> input = env.createInput(format); + TypeInformation<?> typeInfoDataSet = input.getType(); + + + Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo); + Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo); + + Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass()); + Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + public static final class MyAvroType { + + public String theString; + + public MyAvroType recursive; + + private double aDouble; + + public double getaDouble() { + return aDouble; + } + + public void setaDouble(double aDouble) { + this.aDouble = aDouble; + } + + public void setTheString(String theString) { + this.theString = theString; + } + + public String getTheString() { + return theString; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java new file mode 100644 index 0000000..4d6c6b7 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io; + +import static org.apache.flink.api.java.io.AvroOutputFormat.Codec; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.avro.Schema; +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +/** + * Tests for {@link AvroOutputFormat} + */ +public class AvroOutputFormatTest { + + @Test + public void testSetCodec() throws Exception { + // given + final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); + + // when + try { + outputFormat.setCodec(Codec.SNAPPY); + } catch (Exception ex) { + // then + fail("unexpected exception"); + } + } + + @Test + public void testSetCodecError() throws Exception { + // given + boolean error = false; + final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); + + // when + try { + outputFormat.setCodec(null); + } catch (Exception ex) { + error = true; + } + + // then + assertTrue(error); + } + + @Test + public void testSerialization() throws Exception { + + serializeAndDeserialize(null, null); + serializeAndDeserialize(null, User.SCHEMA$); + for (final Codec codec : Codec.values()) { + serializeAndDeserialize(codec, null); + serializeAndDeserialize(codec, User.SCHEMA$); + } + } + + private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException { + // given + final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class); + if (codec != null) { + 
outputFormat.setCodec(codec); + } + if (schema != null) { + outputFormat.setSchema(schema); + } + + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + // when + try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(outputFormat); + } + try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) { + // then + Object o = ois.readObject(); + assertTrue(o instanceof AvroOutputFormat); + final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o; + final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec"); + final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema"); + + assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null); + assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null); + } + } + + @Test + public void testCompression() throws Exception { + // given + final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath()); + final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class); + outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + + final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath()); + final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class); + compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + compressedOutputFormat.setCodec(Codec.SNAPPY); + + // when + output(outputFormat); + output(compressedOutputFormat); + + // then + assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath)); + + // cleanup + Files.delete(Paths.get(outputPath.getPath())); + Files.delete(Paths.get(compressedOutputPath.getPath())); + } + + private long fileSize(Path path) throws IOException { + return Files.size(Paths.get(path.getPath())); + } + + private void 
output(final AvroOutputFormat<User> outputFormat) throws IOException { + outputFormat.configure(new Configuration()); + outputFormat.open(1,1); + for (int i = 0; i < 100; i++) { + outputFormat.writeRecord(new User("testUser",1,"blue")); + } + outputFormat.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc new file mode 100644 index 0000000..02c11af --- /dev/null +++ b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc @@ -0,0 +1,35 @@ +[ +{"namespace": "org.apache.flink.api.io.avro.generated", + "type": "record", + "name": "Address", + "fields": [ + {"name": "num", "type": "int"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"} + ] +}, +{"namespace": "org.apache.flink.api.io.avro.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "type_long_test", "type": ["long", "null"]}, + {"name": "type_double_test", "type": "double"}, + {"name": "type_null_test", "type": ["null"]}, + {"name": "type_bool_test", "type": ["boolean"]}, + {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, + {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, + {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, + {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", + 
"size": 16, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, + {"name": "type_nested", "type": ["null", "Address"]} + ] +}] http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF (logging disabled during tests) and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-connectors/flink-avro/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/resources/testdata.avro b/flink-connectors/flink-avro/src/test/resources/testdata.avro new file mode 100644 index 0000000..45308b9 Binary files /dev/null and b/flink-connectors/flink-avro/src/test/resources/testdata.avro differ http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml new file mode 100644 index 0000000..4ea790a --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/pom.xml @@ -0,0 +1,179 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-cassandra_2.10</artifactId> + <name>flink-connector-cassandra</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <cassandra.version>2.2.5</cassandra.version> + <driver.version>3.0.0</driver.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <reuseForks>true</reuseForks> + <forkCount>1</forkCount> + <argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration combine.self="override"> + <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> + <artifactSet> + <includes> + <include>com.datastax.cassandra:cassandra-driver-core</include> + <include>com.datastax.cassandra:cassandra-driver-mapping</include> + <include>com.google.guava:guava</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google</pattern> + 
<shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern> + <excludes> + <exclude>com.google.protobuf.**</exclude> + <exclude>com.google.inject.**</exclude> + </excludes> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${driver.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-mapping</artifactId> + <version>${driver.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-all</artifactId> + <version>${cassandra.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java new file mode 100644 index 0000000..849e023 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.common.base.Strings; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}. 
+ * + * @param <OUT> type of Tuple + */ +public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); + + private final String query; + private final ClusterBuilder builder; + + private transient Cluster cluster; + private transient Session session; + private transient ResultSet resultSet; + + public CassandraInputFormat(String query, ClusterBuilder builder) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.query = query; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + /** + * Opens a Session and executes the query. + * + * @param ignored + * @throws IOException + */ + @Override + public void open(InputSplit ignored) throws IOException { + this.session = cluster.connect(); + this.resultSet = session.execute(query); + } + + @Override + public boolean reachedEnd() throws IOException { + return resultSet.isExhausted(); + } + + @Override + public OUT nextRecord(OUT reuse) throws IOException { + final Row item = resultSet.one(); + for (int i = 0; i < reuse.getArity(); i++) { + reuse.setField(item.getObject(i), i); + } + return reuse; + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws IOException { + GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; + return split; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** + * Closes all resources used. 
+ */ + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + + try { + if (cluster != null ) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java new file mode 100644 index 0000000..15d8fb3 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra. + * + * @param <OUT> type of Tuple + */ +public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class); + + private final String insertQuery; + private final ClusterBuilder builder; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement prepared; + private transient FutureCallback<ResultSet> callback; + private transient Throwable exception = null; + + public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.insertQuery = insertQuery; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + /** + * Opens a Session to Cassandra and initializes the prepared statement. 
+ * + * @param taskNumber The number of the parallel instance. + * @throws IOException Thrown, if the output could not be opened due to an + * I/O problem. + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + this.session = cluster.connect(); + this.prepared = session.prepare(insertQuery); + this.callback = new FutureCallback<ResultSet>() { + @Override + public void onSuccess(ResultSet ignored) { + } + + @Override + public void onFailure(Throwable t) { + exception = t; + } + }; + } + + @Override + public void writeRecord(OUT record) throws IOException { + if (exception != null) { + throw new IOException("write record failed", exception); + } + + Object[] fields = new Object[record.getArity()]; + for (int i = 0; i < record.getArity(); i++) { + fields[i] = record.getField(i); + } + ResultSetFuture result = session.executeAsync(prepared.bind(fields)); + Futures.addCallback(result, callback); + } + + /** + * Closes all resources used. + */ + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + + try { + if (cluster != null ) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java new file mode 100644 index 0000000..63b76da --- /dev/null +++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. + * + * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + + private static final long serialVersionUID = 1L; + + private final ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private String keySpace = "flink_auxiliary"; + private String table = "checkpoints_"; + + /** + * A cache of the last committed checkpoint ids per subtask index. 
This is used to + * avoid redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}. + */ + private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>(); + + public CassandraCommitter(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public CassandraCommitter(ClusterBuilder builder, String keySpace) { + this(builder); + this.keySpace = keySpace; + } + + /** + * Internally used to set the job ID after instantiation. + */ + public void setJobId(String id) throws Exception { + super.setJobId(id); + table += id; + } + + /** + * Generates the necessary tables to store information. + * + * @throws Exception + */ + @Override + public void createResource() throws Exception { + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table)); + + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + cluster.close(); + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } + + @Override + public void open() throws Exception { + if (builder == null) { + throw new RuntimeException("No ClusterBuilder was set."); + } + cluster = builder.getCluster(); + session = cluster.connect(); + } + + @Override + public void close() throws Exception { + this.lastCommittedCheckpoints.clear(); + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + cluster.close(); + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } + + @Override + public void commitCheckpoint(int subtaskIdx, long checkpointId) { + String statement = 
String.format( + "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;", + keySpace, table, checkpointId, operatorId, subtaskIdx); + + session.execute(statement); + lastCommittedCheckpoints.put(subtaskIdx, checkpointId); + } + + @Override + public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) { + // Pending checkpointed buffers are committed in ascending order of their + // checkpoint id. This way we can tell if a checkpointed buffer was committed + // just by asking the third-party storage system for the last checkpoint id + // committed by the specified subtask. + + Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx); + if (lastCommittedCheckpoint == null) { + String statement = String.format( + "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", + keySpace, table, operatorId, subtaskIdx); + + Iterator<Row> resultIt = session.execute(statement).iterator(); + if (resultIt.hasNext()) { + lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id"); + lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint); + } + } + return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java new file mode 100644 index 0000000..650c481 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.configuration.Configuration; + +/** + * Flink Sink to save data into a Cassandra cluster using + * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>, + * which it uses annotations from + * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html"> + * com.datastax.driver.mapping.annotations</a>. 
+ * + * @param <IN> Type of the elements emitted by this sink + */ +public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> { + + private static final long serialVersionUID = 1L; + + protected final Class<IN> clazz; + protected transient Mapper<IN> mapper; + protected transient MappingManager mappingManager; + + /** + * The main constructor for creating CassandraPojoSink + * + * @param clazz Class<IN> instance + */ + public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) { + super(builder); + this.clazz = clazz; + } + + @Override + public void open(Configuration configuration) { + super.open(configuration); + try { + this.mappingManager = new MappingManager(session); + this.mapper = mappingManager.mapper(clazz); + } catch (Exception e) { + throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e); + } + } + + @Override + public ListenableFuture<Void> send(IN value) { + return mapper.saveAsync(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java new file mode 100644 index 0000000..180b638 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * This class wraps different Cassandra sink implementations to provide a common interface for all of them. 
+ * + * @param <IN> type of the records written to Cassandra + */ +public class CassandraSink<IN> { + private final boolean useDataStreamSink; /* true: sink1 (a DataStreamSink) is set; false: sink2 (a SingleOutputStreamOperator) is set */ + private DataStreamSink<IN> sink1; + private SingleOutputStreamOperator<IN> sink2; + /** Wraps the sink returned by {@code DataStream#addSink} (used when no write-ahead log is enabled). */ + private CassandraSink(DataStreamSink<IN> sink) { + sink1 = sink; + useDataStreamSink = true; + } + /** Wraps the operator returned by {@code DataStream#transform} (used for the write-ahead-log sink). */ + private CassandraSink(SingleOutputStreamOperator<IN> sink) { + sink2 = sink; + useDataStreamSink = false; + } + /** Only valid when {@code useDataStreamSink} is true; {@code sink1} is null otherwise. */ + private SinkTransformation<IN> getSinkTransformation() { + return sink1.getTransformation(); + } + /** Only valid when {@code useDataStreamSink} is false; {@code sink2} is null otherwise. */ + private StreamTransformation<IN> getStreamTransformation() { + return sink2.getTransformation(); + } + + /** + * Sets the name of this sink. This name is + * used by the visualization and logging during runtime. + * + * @param name The name of this sink. + * @return The named sink. + */ + public CassandraSink<IN> name(String name) { + if (useDataStreamSink) { + getSinkTransformation().setName(name); + } else { + getStreamTransformation().setName(name); + } + return this; + } + + /** + * Sets an ID for this operator. + * + * <p>The specified ID is used to assign the same operator ID across job + * submissions (for example when starting a job from a savepoint). + * + * <p><strong>Important</strong>: this ID needs to be unique per + * transformation and job. Otherwise, job submission will fail. + * + * @param uid The unique user-specified ID of this transformation. + * @return The operator with the specified ID. + */ + public CassandraSink<IN> uid(String uid) { + if (useDataStreamSink) { + getSinkTransformation().setUid(uid); + } else { + getStreamTransformation().setUid(uid); + } + return this; + } + + /** + * Sets the parallelism for this sink. The degree must be higher than zero. + * + * @param parallelism The parallelism for this sink. + * @return The sink with set parallelism.
+ */ + public CassandraSink<IN> setParallelism(int parallelism) { + if (useDataStreamSink) { + getSinkTransformation().setParallelism(parallelism); + } else { + getStreamTransformation().setParallelism(parallelism); + } + return this; + } + + /** + * Turns off chaining for this operator so thread co-location will not be + * used as an optimization. + * + * <p>Chaining can be turned off for the whole + * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}; + * however, doing so is not advised for performance reasons. + * + * @return The sink with chaining disabled + */ + public CassandraSink<IN> disableChaining() { + if (useDataStreamSink) { + getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER); + } else { + getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER); + } + return this; + } + + /** + * Sets the slot sharing group of this operation. Parallel instances of + * operations that are in the same slot sharing group will be co-located in the same + * TaskManager slot, if possible. + * + * <p>Operations inherit the slot sharing group of input operations if all input operations + * are in the same slot sharing group and no slot sharing group was explicitly specified. + * + * <p>Initially an operation is in the default slot sharing group. An operation can be put into + * the default group explicitly by setting the slot sharing group to {@code "default"}. + * + * @param slotSharingGroup The slot sharing group name. + * @return The sink with the slot sharing group set. + */ + public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) { + if (useDataStreamSink) { + getSinkTransformation().setSlotSharingGroup(slotSharingGroup); + } else { + getStreamTransformation().setSlotSharingGroup(slotSharingGroup); + } + return this; + } + + /** + * Writes a DataStream into a Cassandra database.
+ * + * <p>Streams whose type information is a {@link TupleTypeInfo} get a {@link CassandraTupleSinkBuilder}; + * all other streams get a {@link CassandraPojoSinkBuilder}. + * + * @param input input DataStream + * @param <IN> input type + * @param <T> tuple type the input is viewed as when it is a tuple stream + * @return CassandraSinkBuilder, to further configure the sink + */ + public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) { + if (input.getType() instanceof TupleTypeInfo) { + DataStream<T> tupleInput = (DataStream<T>) input; /* NOTE(review): unchecked cast, relies on TupleTypeInfo implying a Tuple element type */ + return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig())); + } else { + return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig())); + } + } + /** + * Base builder holding the configuration shared by the tuple and POJO builders. A host or + * {@link ClusterBuilder} must be supplied before {@link #build()}; this is checked by {@link #sanityCheck()}. + */ + public abstract static class CassandraSinkBuilder<IN> { + protected final DataStream<IN> input; + protected final TypeSerializer<IN> serializer; + protected final TypeInformation<IN> typeInfo; + protected ClusterBuilder builder; + protected String query; + protected CheckpointCommitter committer; + protected boolean isWriteAheadLogEnabled; + + public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { + this.input = input; + this.typeInfo = typeInfo; + this.serializer = serializer; + } + + /** + * Sets the query that is to be executed for every record. + * + * <p>Required for tuple streams and rejected for POJO streams (checked when building). + * + * @param query query to use + * @return this builder + */ + public CassandraSinkBuilder<IN> setQuery(String query) { + this.query = query; + return this; + } + + /** + * Sets the cassandra host to connect to, using port 9042. + * + * @param host host to connect to + * @return this builder + * @throws IllegalArgumentException if a host or ClusterBuilder was already set + */ + public CassandraSinkBuilder<IN> setHost(String host) { + return setHost(host, 9042); + } + + /** + * Sets the cassandra host/port to connect to.
+ * + * @param host host to connect to + * @param port port to connect to + * @return this builder + * @throws IllegalArgumentException if a host or ClusterBuilder was already set + */ + public CassandraSinkBuilder<IN> setHost(final String host, final int port) { + if (this.builder != null) { + throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder()."); + } + this.builder = new ClusterBuilder() { /* connects to the single contact point host:port */ + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPoint(host).withPort(port).build(); + } + }; + return this; + } + + /** + * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra. + * + * @param builder ClusterBuilder to configure the connection to cassandra + * @return this builder + * @throws IllegalArgumentException if a host or ClusterBuilder was already set + */ + public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) { + if (this.builder != null) { + throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder()."); + } + this.builder = builder; + return this; + } + + /** + * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use + * idempotent updates. + * + * <p>Only tuple streams support this; {@link CassandraPojoSinkBuilder#build()} rejects it. + * + * @return this builder + */ + public CassandraSinkBuilder<IN> enableWriteAheadLog() { + this.isWriteAheadLogEnabled = true; + return this; + } + + /** + * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use + * idempotent updates. + * + * @param committer CheckpointCommitter that stores information about completed checkpoints in an external + * resource. By default this information is stored within a separate table within Cassandra. + * @return this builder + */ + public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) { + this.isWriteAheadLogEnabled = true; + this.committer = committer; + return this; + } + + /** + * Finalizes the configuration of this sink.
+ * + * @return finalized sink + * @throws Exception if the sink cannot be created; an invalid configuration is reported as an IllegalArgumentException + */ + public abstract CassandraSink<IN> build() throws Exception; + /** Validates the configuration; throws an IllegalArgumentException if neither a host nor a ClusterBuilder was set. */ + protected void sanityCheck() { + if (builder == null) { + throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder()."); + } + } + } + /** + * Builder for tuple streams: requires a non-empty query and supports the write-ahead log. + */ + public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> { + public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { + super(input, typeInfo, serializer); + } + + @Override + protected void sanityCheck() { + super.sanityCheck(); + if (query == null || query.length() == 0) { + throw new IllegalArgumentException("Query must not be null or empty."); + } + } + + @Override + public CassandraSink<IN> build() throws Exception { + sanityCheck(); + if (isWriteAheadLogEnabled) { /* without an explicit committer, a CassandraCommitter built from the same ClusterBuilder is used */ + return committer == null + ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder)))) + : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer))); + } else { + return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink")); + } + } + } + /** + * Builder for POJO streams: a query must not be set and the write-ahead log is not supported. + */ + public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> { + public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { + super(input, typeInfo, serializer); + } + + @Override + protected void sanityCheck() { + super.sanityCheck(); + if (query != null) { + throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input."); + } + } + + @Override + public CassandraSink<IN> build() throws Exception { + sanityCheck(); + if (isWriteAheadLogEnabled) { + throw new IllegalArgumentException("Exactly-once
guarantees can only be provided for tuple types."); + } else { + return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink")); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java new file mode 100644 index 0000000..49b1efa --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}. + * + * @param <IN> Type of the elements emitted by this sink + */ +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { + protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class); + protected transient Cluster cluster; + protected transient Session session; + + protected transient Throwable exception = null; + protected transient FutureCallback<V> callback; + + private final ClusterBuilder builder; + + protected CassandraSinkBase(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + @Override + public void open(Configuration configuration) { + this.callback = new FutureCallback<V>() { + @Override + public void onSuccess(V ignored) { + } + + @Override + public void onFailure(Throwable t) { + exception = t; + LOG.error("Error while sending value.", t); + } + }; + this.cluster = builder.getCluster(); + this.session = cluster.connect(); + } + + @Override + public void invoke(IN value) throws Exception { + if (exception != null) { + throw new IOException("invoke() failed", exception); + } + ListenableFuture<V> result = send(value); + Futures.addCallback(result, callback); + } + + public abstract ListenableFuture<V> send(IN value); + + @Override + public void close() { + try { + if 
(session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java new file mode 100644 index 0000000..0a9ef06 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; + +/** + * Flink Sink to save data into a Cassandra cluster. + * + * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple} + */ +public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> { + private final String insertQuery; + private transient PreparedStatement ps; + + public CassandraTupleSink(String insertQuery, ClusterBuilder builder) { + super(builder); + this.insertQuery = insertQuery; + } + + @Override + public void open(Configuration configuration) { + super.open(configuration); + this.ps = session.prepare(insertQuery); + } + + @Override + public ListenableFuture<ResultSet> send(IN value) { + Object[] fields = extract(value); + return session.executeAsync(ps.bind(fields)); + } + + private Object[] extract(IN record) { + Object[] al = new Object[record.getArity()]; + for (int i = 0; i < record.getArity(); i++) { + al[i] = record.getField(i); + } + return al; + } +}
