http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java new file mode 100644 index 0000000..a3d002e --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra + * if a checkpoint is completed. 
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
+	private static final long serialVersionUID = 1L;
+
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private ClusterBuilder builder;
+
+	/** Scratch buffer that is refilled for every record before it is bound to the prepared statement. */
+	private transient Object[] fields;
+
+	/**
+	 * Creates the sink. No connection is established here; that happens in {@link #open()}.
+	 *
+	 * @param insertQuery CQL statement that is prepared in {@link #open()} and bound once per record
+	 * @param serializer serializer handed to the superclass; {@link #open()} casts it to a TupleSerializer
+	 * @param builder creates the Cluster in {@link #open()}; it is run through the ClosureCleaner
+	 * @param committer checkpoint committer handed to the superclass
+	 */
+	protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+		// the third argument is a random id with '-' replaced by '_'
+		// NOTE(review): presumably so that the id can be used inside Cassandra identifiers - confirm
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	/**
+	 * Connects to Cassandra, prepares the insert statement and allocates the field buffer.
+	 *
+	 * @throws IllegalStateException if checkpointing is not enabled
+	 */
+	@Override
+	public void open() throws Exception {
+		super.open();
+		if (!getRuntimeContext().isCheckpointingEnabled()) {
+			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+		}
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+
+		fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
+	}
+
+	/**
+	 * Closes the superclass resources, then the session and the cluster. The Cassandra resources
+	 * are released even if closing the superclass fails; errors while closing them are only logged.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			super.close();
+		} finally {
+			try {
+				if (session != null) {
+					session.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing session.", e);
+			}
+			try {
+				if (cluster != null) {
+					cluster.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing cluster.", e);
+			}
+		}
+	}
+
+	/**
+	 * Sends all given records to Cassandra asynchronously and blocks until either every submitted
+	 * write has been confirmed or the first failure has been observed.
+	 *
+	 * @param values records to write; one bound statement is executed per record
+	 * @param timestamp default timestamp that is set on every bound statement
+	 * @return true if every write was confirmed, false if a write failed or could not be tracked
+	 */
+	@Override
+	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
+		final AtomicInteger updatesCount = new AtomicInteger(0);
+		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+		final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+				// updatesCount stays 0 until all updates have been sent, so nobody is waiting before that
+				final int expected = updatesCount.get();
+				if (expected > 0 && expected == updatesConfirmed.get()) {
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				// only the first failure is recorded and reported
+				if (exception.compareAndSet(null, throwable)) {
+					LOG.error("Error while sending value.", throwable);
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+		};
+
+		//set values for prepared statement
+		int updatesSent = 0;
+		for (IN value : values) {
+			if (exception.get() != null) {
+				// a write already failed and the result will be false, so stop submitting more writes
+				break;
+			}
+			for (int x = 0; x < value.getArity(); x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			if (result == null) {
+				// without a future no callback can confirm this write; counting it as sent
+				// would make the loop below wait forever, so it is treated as a failure
+				exception.compareAndSet(null, new IllegalStateException(
+					"Session.executeAsync returned no future; the write cannot be confirmed."));
+				break;
+			}
+			updatesSent++;
+			//add callback to detect errors
+			Futures.addCallback(result, callback);
+		}
+		updatesCount.set(updatesSent);
+
+		// wait until all submitted writes are confirmed or a failure has been recorded
+		synchronized (updatesConfirmed) {
+			while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+				updatesConfirmed.wait();
+			}
+		}
+
+		if (exception.get() != null) {
+			LOG.warn("Sending a value failed.", exception.get());
+			return false;
+		} else {
+			return true;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java new file mode 100644 index 0000000..9fd3b4e --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; + +import java.io.Serializable; + +/** + * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment. + * The cluster represents the connection that will be established to Cassandra. 
+ */ +public abstract class ClusterBuilder implements Serializable { + + public Cluster getCluster() { + return buildCluster(Cluster.builder()); + } + + /** + * Configures the connection to Cassandra. + * The configuration is done by calling methods on the builder object + * and finalizing the configuration with build(). + * + * @param builder connection builder + * @return configured connection + */ + protected abstract Cluster buildCluster(Cluster.Builder builder); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java new file mode 100644 index 0000000..e66b8b3 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.batch.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API. + * + * The example assumes that a table exists in a local cassandra database, according to the following query: + * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* + * table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" + */ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20); + for (int i = 0; i < 20; i++) { + collection.add(new Tuple2<>(i, "string " + i)); + } + + DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() { + @Override + protected Cluster buildCluster(Builder 
builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + })); + + env.execute("Write"); + + DataSet<Tuple2<Integer, String>> inputDS = env + .createInput(new CassandraInputFormat<Tuple2<Integer, String>>(SELECT_QUERY, new ClusterBuilder() { + @Override + protected Cluster buildCluster(Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + }), TupleTypeInfo.of(new TypeHint<Tuple2<Integer, String>>() { + })); + + inputDS.print(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java new file mode 100644 index 0000000..2bb6fd1 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; + +import org.apache.cassandra.service.CassandraDaemon; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.TestEnvironment; + +import org.junit.After; +import 
org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Scanner; +import java.util.UUID; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); + private static File tmpDir; + + private static final boolean EMBEDDED = true; + + private static EmbeddedCassandraService cassandra; + + private static ClusterBuilder builder = new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder + .addContactPoint("127.0.0.1") + .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) + .withoutJMXReporting() + .withoutMetrics().build(); + } + }; + + private static Cluster cluster; + private static Session session; + + private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; + private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; + private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; + private static final String 
INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; + private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; + + private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); + + static { + for (int i = 0; i < 20; i++) { + collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); + } + } + + private static class EmbeddedCassandraService { + CassandraDaemon cassandraDaemon; + + public void start() throws IOException { + this.cassandraDaemon = new CassandraDaemon(); + this.cassandraDaemon.init(null); + this.cassandraDaemon.start(); + } + + public void stop() { + this.cassandraDaemon.stop(); + } + } + + private static LocalFlinkMiniCluster flinkCluster; + + // ------------------------------------------------------------------------ + // Cluster Setup (Cassandra & Flink) + // ------------------------------------------------------------------------ + + @BeforeClass + public static void startCassandra() throws IOException { + + // check if we should run this test, current Cassandra version requires Java >= 1.8 + org.apache.flink.core.testutils.CommonTestUtils.assumeJava8(); + + // generate temporary files + tmpDir = CommonTestUtils.createTempDirectory(); + ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader(); + File file = new File(classLoader.getResource("cassandra.yaml").getFile()); + File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); + + assertTrue(tmp.createNewFile()); + + try ( + BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); + + //copy cassandra.yaml; inject absolute paths into cassandra.yaml + Scanner scanner = new Scanner(file); + ) { + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + line = line.replace("$PATH", "'" + tmp.getParentFile()); + b.write(line + "\n"); + b.flush(); + } + } + + + // Tell cassandra where the configuration files are. 
+ // Use the test configuration file. + System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); + + if (EMBEDDED) { + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + } + + try { + Thread.sleep(1000 * 10); + } catch (InterruptedException e) { //give cassandra a few seconds to start up + } + + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(CREATE_KEYSPACE_QUERY); + session.execute(CREATE_TABLE_QUERY); + } + + @BeforeClass + public static void startFlink() throws Exception { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); + + flinkCluster = new LocalFlinkMiniCluster(config); + flinkCluster.start(); + } + + @AfterClass + public static void stopFlink() { + if (flinkCluster != null) { + flinkCluster.stop(); + flinkCluster = null; + } + } + + @AfterClass + public static void closeCassandra() { + if (session != null) { + session.executeAsync(DROP_KEYSPACE_QUERY); + session.close(); + } + + if (cluster != null) { + cluster.close(); + } + + if (cassandra != null) { + cassandra.stop(); + } + + if (tmpDir != null) { + //noinspection ResultOfMethodCallIgnored + tmpDir.delete(); + } + } + + // ------------------------------------------------------------------------ + // Test preparation & cleanup + // ------------------------------------------------------------------------ + + @Before + public void initializeExecutionEnvironment() { + TestStreamEnvironment.setAsContext(flinkCluster, 4); + new TestEnvironment(flinkCluster, 4, false).setAsContext(); + } + + @After + public void deleteSchema() throws Exception { + session.executeAsync(CLEAR_TABLE_QUERY); + } + + // ------------------------------------------------------------------------ + // Exactly-once Tests + // ------------------------------------------------------------------------ + + @Override + protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() 
throws Exception { + return new CassandraTupleWriteAheadSink<>( + INSERT_DATA_QUERY, + TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), + builder, + new CassandraCommitter(builder)); + } + + @Override + protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() { + return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class); + } + + @Override + protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) { + return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); + } + + @Override + protected void verifyResultsIdealCircumstances( + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Override + protected void verifyResultsDataPersistenceUponMissedNotify( + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Override + protected void verifyResultsDataDiscardingUponRestore( + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> 
harness, + CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { + + ResultSet result = session.execute(SELECT_DATA_QUERY); + ArrayList<Integer> list = new ArrayList<>(); + for (int x = 1; x <= 20; x++) { + list.add(x); + } + for (int x = 41; x <= 60; x++) { + list.add(x); + } + + for (Row s : result) { + list.remove(new Integer(s.getInt("counter"))); + } + Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); + } + + @Test + public void testCassandraCommitter() throws Exception { + CassandraCommitter cc1 = new CassandraCommitter(builder); + cc1.setJobId("job"); + cc1.setOperatorId("operator"); + + CassandraCommitter cc2 = new CassandraCommitter(builder); + cc2.setJobId("job"); + cc2.setOperatorId("operator"); + + CassandraCommitter cc3 = new CassandraCommitter(builder); + cc3.setJobId("job"); + cc3.setOperatorId("operator1"); + + cc1.createResource(); + + cc1.open(); + cc2.open(); + cc3.open(); + + Assert.assertFalse(cc1.isCheckpointCommitted(0, 1)); + Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); + Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); + + cc1.commitCheckpoint(0, 1); + Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); + //verify that other sub-tasks aren't affected + Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); + //verify that other tasks aren't affected + Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); + + Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); + + cc1.close(); + cc2.close(); + cc3.close(); + + cc1 = new CassandraCommitter(builder); + cc1.setJobId("job"); + cc1.setOperatorId("operator"); + + cc1.open(); + + //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data + Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); + Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); + + cc1.close(); + } + + // ------------------------------------------------------------------------ + // At-least-once 
Tests + // ------------------------------------------------------------------------ + + @Test + public void testCassandraTupleAtLeastOnceSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection); + source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + + env.execute(); + + ResultSet rs = session.execute(SELECT_DATA_QUERY); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraPojoAtLeastOnceSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource<Pojo> source = env + .addSource(new SourceFunction<Pojo>() { + + private boolean running = true; + private volatile int cnt = 0; + + @Override + public void run(SourceContext<Pojo> ctx) throws Exception { + while (running) { + ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); + cnt++; + if (cnt == 20) { + cancel(); + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); + + env.execute(); + + ResultSet rs = session.execute(SELECT_DATA_QUERY); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraBatchFormats() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection); + dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); + + env.execute("Write data"); + + DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( + new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), + TypeInformation.of(new 
TypeHint<Tuple3<String, Integer, Integer>>(){})); + + + long count = inputDS.count(); + Assert.assertEquals(count, 20L); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java new file mode 100644 index 0000000..847d1a0 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +public class CassandraTupleWriteAheadSinkTest { + + @Test(timeout=20000) + public void testAckLoopExitOnException() throws Exception { + final AtomicReference<Runnable> runnableFuture = new AtomicReference<>(); + + final ClusterBuilder clusterBuilder = new ClusterBuilder() { + private static final long serialVersionUID = 4624400760492936756L; + + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + try { + BoundStatement boundStatement = mock(BoundStatement.class); + when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement); + + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement); + + ResultSetFuture future = mock(ResultSetFuture.class); + 
when(future.get()).thenThrow(new RuntimeException("Expected exception.")); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + synchronized (runnableFuture) { + runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0]))); + runnableFuture.notifyAll(); + } + return null; + } + }).when(future).addListener(any(Runnable.class), any(Executor.class)); + + Session session = mock(Session.class); + when(session.prepare(anyString())).thenReturn(preparedStatement); + when(session.executeAsync(any(BoundStatement.class))).thenReturn(future); + + Cluster cluster = mock(Cluster.class); + when(cluster.connect()).thenReturn(session); + return cluster; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + // Our asynchronous executor thread + new Thread(new Runnable() { + @Override + public void run() { + synchronized (runnableFuture) { + while (runnableFuture.get() == null) { + try { + runnableFuture.wait(); + } catch (InterruptedException e) { + // ignore interrupts + } + } + } + runnableFuture.get().run(); + } + }).start(); + + CheckpointCommitter cc = mock(CheckpointCommitter.class); + final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>( + "abc", + TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()), + clusterBuilder, + cc + ); + + OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink); + harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true); + + harness.setup(); + sink.open(); + + // we should leave the loop and return false since we've seen an exception + assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L)); + + sink.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java new file mode 100644 index 0000000..9b331d6 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +import java.io.Serializable; + +@Table(keyspace = "flink", name = "test") +public class Pojo implements Serializable { + + private static final long serialVersionUID = 1038054554690916991L; + + @Column(name = "id") + private String id; + @Column(name = "counter") + private int counter; + @Column(name = "batch_id") + private int batch_id; + + public Pojo(String id, int counter, int batch_id) { + this.id = id; + this.counter = counter; + this.batch_id = batch_id; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } + + public int getBatch_id() { + return batch_id; + } + + public void setBatch_id(int batch_id) { + this.batch_id = batch_id; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java new file mode 100644 index 0000000..e1bcea9 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.cassandra.CassandraSink; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the Pojo Cassandra Sink in the Streaming API. + * + * Pojos have to be annotated with datastax annotations to work with this sink. 
+ * + * The example assumes that a table exists in a local cassandra database, according to the following query: + * CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY) + */ +public class CassandraPojoSinkExample { + private static final ArrayList<Message> messages = new ArrayList<>(20); + + static { + for (long i = 0; i < 20; i++) { + messages.add(new Message("cassandra-" + i)); + } + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Message> source = env.fromCollection(messages); + + CassandraSink.addSink(source) + .setClusterBuilder(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Builder builder) { + return builder.addContactPoint("127.0.0.1").build(); + } + }) + .build(); + + env.execute("Cassandra Sink example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java new file mode 100644 index 0000000..c6345df --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.cassandra.CassandraSink; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the Tuple Cassandra Sink in the Streaming API. 
+ * + * The example assumes that a table exists in a local cassandra database, according to the following query: + * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int) + */ +public class CassandraTupleSinkExample { + private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"; + private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20); + + static { + for (int i = 0; i < 20; i++) { + collection.add(new Tuple2<>("cassandra-" + i, i)); + } + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection); + + CassandraSink.addSink(source) + .setQuery(INSERT) + .setClusterBuilder(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Builder builder) { + return builder.addContactPoint("127.0.0.1").build(); + } + }) + .build(); + + env.execute("WriteTupleIntoCassandra"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java new file mode 100644 index 0000000..811c410 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.cassandra.CassandraSink; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.UUID; + +/** + * This is an example showing how to use the Cassandra Sink (with write-ahead log) in the Streaming API. + * + * The example assumes that a table exists in a local cassandra database, according to the following query: + * CREATE TABLE example.values (id text, counter int, PRIMARY KEY(id)); + * + * Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call + * when creating the CassandraSink. 
+ */ +public class CassandraTupleWriteAheadSinkExample { + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000)); + env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend")); + + CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource())) + .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") + .enableWriteAheadLog() + .setClusterBuilder(new ClusterBuilder() { + @Override + public Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPoint("127.0.0.1").build(); + } + }) + .build(); + + sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello"); + + env.execute(); + } + + public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> { + private int counter = 0; + private boolean stop = false; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + while (!stop) { + Thread.sleep(50); + ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1)); + counter++; + if (counter == 100) { + stop = true; + } + } + } + + @Override + public void cancel() { + stop = true; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return counter; + } + + @Override + public void restoreState(Integer state) throws Exception { + this.counter = state; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java new file mode 100644 index 0000000..7524d95 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +import java.io.Serializable; + +@Table(keyspace = "test", name = "message") +public class Message implements Serializable { + + private static final long serialVersionUID = 1123119384361005680L; + + @Column(name = "body") + private String message; + + public Message(String word) { + this.message = word; + } + + public String getMessage() { + return message; + } + + public void setMessage(String word) { + this.message = word; + } + + public boolean equals(Object other) { + if (other instanceof Message) { + Message that = (Message) other; + return this.message.equals(that.message); + } + return false; + } + + @Override + public int hashCode() { + return message.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml new file mode 100644 index 0000000..0594ea3 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml @@ -0,0 +1,43 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +cluster_name: 'Test Cluster' +commitlog_sync: 'periodic' +commitlog_sync_period_in_ms: 10000 +commitlog_segment_size_in_mb: 16 +partitioner: 'org.apache.cassandra.dht.RandomPartitioner' +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' +commitlog_directory: $PATH/commit' +data_file_directories: + - $PATH/data' +saved_caches_directory: $PATH/cache' +listen_address: '127.0.0.1' +seed_provider: + - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider' + parameters: + - seeds: '127.0.0.1' +native_transport_port: 9042 + +concurrent_reads: 8 +concurrent_writes: 8 + +auto_bootstrap: false +auto_snapshot: false + +start_rpc: false +start_native_transport: true +native_transport_max_threads: 8 http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..a43d556 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target= System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/pom.xml b/flink-connectors/flink-connector-elasticsearch/pom.xml new file mode 100644 index 0000000..0b78484 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-elasticsearch_2.10</artifactId> + <name>flink-connector-elasticsearch</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <elasticsearch.version>1.7.1</elasticsearch.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <rerunFailingTestsCount>3</rerunFailingTestsCount> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java new file mode 100644 index 0000000..ac14ade --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + + +/** + * Sink that emits its input elements to an Elasticsearch cluster. + * + * <p> + * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)} + * the sink will create a local {@link Node} for communicating with the + * Elasticsearch cluster. When using the second constructor + * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will + * be used instead. + * + * <p> + * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster + * can be connected to. 
With the {@code Node Client} the sink will block and wait for a cluster + * to come online. + * + * <p> + * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating + * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch + * documentation. An important setting is {@code cluster.name}, this should be set to the name + * of the cluster that the sink should emit to. + * + * <p> + * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * <ul> + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * </ul> + * + * <p> + * You also have to provide an {@link IndexRequestBuilder}. This is used to create an + * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See + * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example. + * + * @param <T> Type of the elements emitted by this sink + */ +public class ElasticsearchSink<T> extends RichSinkFunction<T> { + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); + + /** + * The user specified config map that we forward to Elasticsearch when we create the Client. 
+ */ + private final Map<String, String> userConfig; + + /** + * The list of nodes that the TransportClient should connect to. This is null if we are using + * an embedded Node to get a Client. + */ + private final List<TransportAddress> transportNodes; + + /** + * The builder that is used to construct an {@link IndexRequest} from the incoming element. + */ + private final IndexRequestBuilder<T> indexRequestBuilder; + + /** + * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null + * if we are using a TransportClient. + */ + private transient Node node; + + /** + * The Client that was either retrieved from a Node or is a TransportClient. + */ + private transient Client client; + + /** + * Bulk processor that was created using the client + */ + private transient BulkProcessor bulkProcessor; + + /** + * This is set from inside the BulkProcessor listener if there were failures in processing. + */ + private final AtomicBoolean hasFailure = new AtomicBoolean(false); + + /** + * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. + */ + private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); + + /** + * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node. + * + * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor + * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + */ + public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) { + this.userConfig = userConfig; + this.indexRequestBuilder = indexRequestBuilder; + transportNodes = null; + } + + /** + * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. 
+ * + * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor + * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient} + * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + * + */ + public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) { + this.userConfig = userConfig; + this.indexRequestBuilder = indexRequestBuilder; + this.transportNodes = transportNodes; + } + + /** + * Initializes the connection to Elasticsearch by either creating an embedded + * {@link org.elasticsearch.node.Node} and retrieving the + * {@link org.elasticsearch.client.Client} from it or by creating a + * {@link org.elasticsearch.client.transport.TransportClient}. + */ + @Override + public void open(Configuration configuration) { + if (transportNodes == null) { + // Make sure that we disable http access to our embedded node + Settings settings = + ImmutableSettings.settingsBuilder() + .put(userConfig) + .put("http.enabled", false) + .build(); + + node = + nodeBuilder() + .settings(settings) + .client(true) + .data(false) + .node(); + + client = node.client(); + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch Client {} from embedded Node", client); + } + + } else { + Settings settings = ImmutableSettings.settingsBuilder() + .put(userConfig) + .build(); + + TransportClient transportClient = new TransportClient(settings); + for (TransportAddress transport: transportNodes) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes(); + if (nodes.isEmpty()) { + throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Connected to nodes: " + 
nodes.toString()); + } + } + + client = transportClient; + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient {}", client); + } + } + + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( + client, + new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, + BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, + BulkRequest request, + BulkResponse response) { + if (response.hasFailures()) { + for (BulkItemResponse itemResp : response.getItems()) { + if (itemResp.isFailed()) { + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); + failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + } + } + hasFailure.set(true); + } + } + + @Override + public void afterBulk(long executionId, + BulkRequest request, + Throwable failure) { + LOG.error(failure.getMessage()); + failureThrowable.compareAndSet(null, failure); + hasFailure.set(true); + } + }); + + // This makes flush() blocking + bulkProcessorBuilder.setConcurrentRequests(0); + + ParameterTool params = ParameterTool.fromMap(userConfig); + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { + bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { + bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( + CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { + bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); + } + + bulkProcessor = bulkProcessorBuilder.build(); + } + + @Override + public void invoke(T element) { + IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Emitting IndexRequest: {}", indexRequest); + } + + 
bulkProcessor.add(indexRequest); + } + + @Override + public void close() { + if (bulkProcessor != null) { + bulkProcessor.close(); + bulkProcessor = null; + } + + if (client != null) { + client.close(); + } + + if (node != null) { + node.close(); + } + + if (hasFailure.get()) { + Throwable cause = failureThrowable.get(); + if (cause != null) { + throw new RuntimeException("An error occured in ElasticsearchSink.", cause); + } else { + throw new RuntimeException("An error occured in ElasticsearchSink."); + + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java new file mode 100644 index 0000000..04ae40a --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.elasticsearch.action.index.IndexRequest; + +import java.io.Serializable; + +/** + * Function that creates an {@link IndexRequest} from an element in a Stream. + * + * <p> + * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink} + * to prepare elements for sending them to Elasticsearch. See + * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a> + * for information about how to format data for adding it to an Elasticsearch index. + * + * <p> + * Example: + * + * <pre>{@code + * private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> { + * + * public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { + * Map<String, Object> json = new HashMap<>(); + * json.put("data", element); + * + * return Requests.indexRequest() + * .index("my-index") + * .type("my-type") + * .source(json); + * } + * } + * }</pre> + * + * @param <T> The type of the element handled by this {@code IndexRequestBuilder} + */ +public interface IndexRequestBuilder<T> extends Function, Serializable { + + /** + * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element. 
+ * + * @param element The element that needs to be turned into an {@code IndexRequest} + * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink} + * + * @return The constructed {@code IndexRequest} + */ + IndexRequest createIndexRequest(T element, RuntimeContext ctx); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java new file mode 100644 index 0000000..33a2e47 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.node.Node; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { + + private static final int NUM_ELEMENTS = 20; + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testNodeClient() throws Exception{ + + File dataDir = tempFolder.newFolder(); + + Node node = nodeBuilder() + .settings(ImmutableSettings.settingsBuilder() + .put("http.enabled", false) + .put("path.data", dataDir.getAbsolutePath())) + // set a custom cluster name to verify that user config works correctly + .clusterName("my-node-client-cluster") + .local(true) + .node(); + + 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> config = Maps.newHashMap(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // connect to our local node + config.put("node.local", "true"); + + source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch Node Client Test"); + + + // verify the results + Client client = node.client(); + for (int i = 0; i < NUM_ELEMENTS; i++) { + GetResponse response = client.get(new GetRequest("my-index", + "my-type", + Integer.toString(i))).actionGet(); + Assert.assertEquals("message #" + i, response.getSource().get("data")); + } + + node.close(); + } + + @Test + public void testTransportClient() throws Exception { + + File dataDir = tempFolder.newFolder(); + + Node node = nodeBuilder() + .settings(ImmutableSettings.settingsBuilder() + .put("http.enabled", false) + .put("path.data", dataDir.getAbsolutePath())) + // set a custom cluster name to verify that user config works correctly + .clusterName("my-node-client-cluster") + .local(true) + .node(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> config = Maps.newHashMap(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // connect to our local node + config.put("node.local", "true"); + + List<TransportAddress> transports = Lists.newArrayList(); + transports.add(new 
LocalTransportAddress("1")); + + source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch TransportClient Test"); + + + // verify the results + Client client = node.client(); + for (int i = 0; i < NUM_ELEMENTS; i++) { + GetResponse response = client.get(new GetRequest("my-index", + "my-type", + Integer.toString(i))).actionGet(); + Assert.assertEquals("message #" + i, response.getSource().get("data")); + } + + node.close(); + } + + @Test(expected = JobExecutionException.class) + public void testTransportClientFails() throws Exception{ + // this checks whether the TransportClient fails early when there is no cluster to + // connect to. We don't have such a test for the Node Client version since that + // one will block and wait for a cluster to come online + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); + + Map<String, String> config = Maps.newHashMap(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // connect to our local node + config.put("node.local", "true"); + + List<TransportAddress> transports = Lists.newArrayList(); + transports.add(new LocalTransportAddress("1")); + + source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch Node Client Test"); + } + + private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(Tuple2.of(i, "message 
#" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + @Override + public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element.f1); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element.f0.toString()) + .source(json); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java new file mode 100644 index 0000000..136ae77 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.examples; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.HashMap; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the cluster name in the config map. 
+ */ +public class ElasticsearchExample { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<String> ctx) throws Exception { + for (int i = 0; i < 20 && running; i++) { + ctx.collect("message #" + i); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + Map<String, String> config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { + @Override + public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .source(json); + } + })); + + + env.execute("Elasticsearch Example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..dc20726 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file
