http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java deleted file mode 100644 index a3d002e..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; -import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; -import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a - * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra - * if a checkpoint is completed. - * - * @param <IN> Type of the elements emitted by this sink - */ -public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> { - private static final long serialVersionUID = 1L; - - protected transient Cluster cluster; - protected transient Session session; - - private final String insertQuery; - private transient PreparedStatement preparedStatement; - - private ClusterBuilder builder; - - private transient Object[] fields; - - protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception { - super(committer, serializer, UUID.randomUUID().toString().replace("-", "_")); - this.insertQuery = insertQuery; - this.builder = builder; - ClosureCleaner.clean(builder, true); - } - - public void open() throws Exception { - super.open(); - if (!getRuntimeContext().isCheckpointingEnabled()) { - throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); - } - cluster = builder.getCluster(); - session = cluster.connect(); - preparedStatement = session.prepare(insertQuery); - - fields = new Object[((TupleSerializer<IN>) serializer).getArity()]; - } - - @Override - public void close() throws Exception { - super.close(); - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - try { - if (cluster != null) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } - - @Override - protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception { - final AtomicInteger updatesCount = new AtomicInteger(0); - final AtomicInteger updatesConfirmed = new AtomicInteger(0); - - final AtomicReference<Throwable> exception = new AtomicReference<>(); - - FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() { - @Override - public void onSuccess(ResultSet resultSet) { - updatesConfirmed.incrementAndGet(); - if (updatesCount.get() > 0) { // only set if all updates have been sent - if (updatesCount.get() == updatesConfirmed.get()) { - synchronized (updatesConfirmed) { - updatesConfirmed.notifyAll(); - } - } - } - } - - @Override - public void onFailure(Throwable throwable) { - if (exception.compareAndSet(null, throwable)) { - LOG.error("Error while sending value.", throwable); - synchronized (updatesConfirmed) { - updatesConfirmed.notifyAll(); - } - } - } - }; - - //set values for prepared statement - int updatesSent = 0; - for (IN value : values) { - for (int x = 0; x < value.getArity(); x++) { - fields[x] = value.getField(x); - } - //insert values and send to cassandra - BoundStatement s = preparedStatement.bind(fields); - s.setDefaultTimestamp(timestamp); - ResultSetFuture result = session.executeAsync(s); - updatesSent++; - if (result != null) { - //add callback to detect errors - Futures.addCallback(result, callback); - } - } - updatesCount.set(updatesSent); - - synchronized (updatesConfirmed) { - while (exception.get() == null && updatesSent != updatesConfirmed.get()) { - updatesConfirmed.wait(); - } - } - - if (exception.get() != null) { - LOG.warn("Sending a value failed.", exception.get()); - return false; - } else { - return true; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java deleted file mode 100644 index 9fd3b4e..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; - -import java.io.Serializable; - -/** - * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment. - * The cluster represents the connection that will be established to Cassandra. - */ -public abstract class ClusterBuilder implements Serializable { - - public Cluster getCluster() { - return buildCluster(Cluster.builder()); - } - - /** - * Configures the connection to Cassandra. - * The configuration is done by calling methods on the builder object - * and finalizing the configuration with build(). - * - * @param builder connection builder - * @return configured connection - */ - protected abstract Cluster buildCluster(Cluster.Builder builder); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java deleted file mode 100644 index e66b8b3..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.batch.connectors.cassandra.example; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Cluster.Builder; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; - -import java.util.ArrayList; - -/** - * This is an example showing the to use the Cassandra Input-/OutputFormats in the Batch API. - * - * The example assumes that a table exists in a local cassandra database, according to the following query: - * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings)); - */ -public class BatchExample { - private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; - private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; - - /* - * table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" - */ - public static void main(String[] args) throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20); - for (int i = 0; i < 20; i++) { - collection.add(new Tuple2<>(i, "string " + i)); - } - - DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection); - - dataSet.output(new CassandraOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() { - @Override - protected Cluster buildCluster(Builder builder) { - return builder.addContactPoints("127.0.0.1").build(); - } - })); - - env.execute("Write"); - - DataSet<Tuple2<Integer, String>> inputDS = env - .createInput(new CassandraInputFormat<Tuple2<Integer, String>>(SELECT_QUERY, new ClusterBuilder() { - @Override - protected Cluster buildCluster(Builder builder) { - return builder.addContactPoints("127.0.0.1").build(); - } - }), TupleTypeInfo.of(new TypeHint<Tuple2<Integer, String>>() { - })); - - inputDS.print(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java deleted file mode 100644 index 2bb6fd1..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; - -import org.apache.cassandra.service.CassandraDaemon; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.TestEnvironment; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Scanner; -import java.util.UUID; - -import static org.junit.Assert.*; - -@SuppressWarnings("serial") -public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { - - private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); - private static File tmpDir; - - private static final boolean EMBEDDED = true; - - private static EmbeddedCassandraService cassandra; - - private static ClusterBuilder builder = new ClusterBuilder() { - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - return builder - .addContactPoint("127.0.0.1") - .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) - .withoutJMXReporting() - .withoutMetrics().build(); - } - }; - - private static Cluster cluster; - private static Session session; - - private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; - private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; - private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; - private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; - private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; - - private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); - - static { - for (int i = 0; i < 20; i++) { - collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); - } - } - - private static class EmbeddedCassandraService { - CassandraDaemon cassandraDaemon; - - public void start() throws IOException { - this.cassandraDaemon = new CassandraDaemon(); - this.cassandraDaemon.init(null); - this.cassandraDaemon.start(); - } - - public void stop() { - this.cassandraDaemon.stop(); - } - } - - private static LocalFlinkMiniCluster flinkCluster; - - // ------------------------------------------------------------------------ - // Cluster Setup (Cassandra & Flink) - // ------------------------------------------------------------------------ - - @BeforeClass - public static void startCassandra() throws IOException { - - // check if we should run this test, current Cassandra version requires Java >= 1.8 - org.apache.flink.core.testutils.CommonTestUtils.assumeJava8(); - - // generate temporary files - tmpDir = CommonTestUtils.createTempDirectory(); - ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader(); - File file = new File(classLoader.getResource("cassandra.yaml").getFile()); - File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); - - assertTrue(tmp.createNewFile()); - - try ( - BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); - - //copy cassandra.yaml; inject absolute paths into cassandra.yaml - Scanner scanner = new Scanner(file); - ) { - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); - line = line.replace("$PATH", "'" + tmp.getParentFile()); - b.write(line + "\n"); - b.flush(); - } - } - - - // Tell cassandra where the configuration files are. - // Use the test configuration file. - System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); - - if (EMBEDDED) { - cassandra = new EmbeddedCassandraService(); - cassandra.start(); - } - - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { //give cassandra a few seconds to start up - } - - cluster = builder.getCluster(); - session = cluster.connect(); - - session.execute(CREATE_KEYSPACE_QUERY); - session.execute(CREATE_TABLE_QUERY); - } - - @BeforeClass - public static void startFlink() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - - flinkCluster = new LocalFlinkMiniCluster(config); - flinkCluster.start(); - } - - @AfterClass - public static void stopFlink() { - if (flinkCluster != null) { - flinkCluster.stop(); - flinkCluster = null; - } - } - - @AfterClass - public static void closeCassandra() { - if (session != null) { - session.executeAsync(DROP_KEYSPACE_QUERY); - session.close(); - } - - if (cluster != null) { - cluster.close(); - } - - if (cassandra != null) { - cassandra.stop(); - } - - if (tmpDir != null) { - //noinspection ResultOfMethodCallIgnored - tmpDir.delete(); - } - } - - // ------------------------------------------------------------------------ - // Test preparation & cleanup - // ------------------------------------------------------------------------ - - @Before - public void initializeExecutionEnvironment() { - TestStreamEnvironment.setAsContext(flinkCluster, 4); - new TestEnvironment(flinkCluster, 4, false).setAsContext(); - } - - @After - public void deleteSchema() throws Exception { - session.executeAsync(CLEAR_TABLE_QUERY); - } - - // ------------------------------------------------------------------------ - // Exactly-once Tests - // ------------------------------------------------------------------------ - - @Override - protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception { - return new CassandraTupleWriteAheadSink<>( - INSERT_DATA_QUERY, - TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), - builder, - new CassandraCommitter(builder)); - } - - @Override - protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() { - return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class); - } - - @Override - protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) { - return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); - } - - @Override - protected void verifyResultsIdealCircumstances( - OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Override - protected void verifyResultsDataPersistenceUponMissedNotify( - OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Override - protected void verifyResultsDataDiscardingUponRestore( - OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); - ArrayList<Integer> list = new ArrayList<>(); - for (int x = 1; x <= 20; x++) { - list.add(x); - } - for (int x = 41; x <= 60; x++) { - list.add(x); - } - - for (Row s : result) { - list.remove(new Integer(s.getInt("counter"))); - } - Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty()); - } - - @Test - public void testCassandraCommitter() throws Exception { - CassandraCommitter cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); - cc1.setOperatorId("operator"); - - CassandraCommitter cc2 = new CassandraCommitter(builder); - cc2.setJobId("job"); - cc2.setOperatorId("operator"); - - CassandraCommitter cc3 = new CassandraCommitter(builder); - cc3.setJobId("job"); - cc3.setOperatorId("operator1"); - - cc1.createResource(); - - cc1.open(); - cc2.open(); - cc3.open(); - - Assert.assertFalse(cc1.isCheckpointCommitted(0, 1)); - Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); - Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); - - cc1.commitCheckpoint(0, 1); - Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); - //verify that other sub-tasks aren't affected - Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); - //verify that other tasks aren't affected - Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); - - Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); - - cc1.close(); - cc2.close(); - cc3.close(); - - cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); - cc1.setOperatorId("operator"); - - cc1.open(); - - //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data - Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); - Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); - - cc1.close(); - } - - // ------------------------------------------------------------------------ - // At-least-once Tests - // ------------------------------------------------------------------------ - - @Test - public void testCassandraTupleAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection); - source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); - - env.execute(); - - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); - } - - @Test - public void testCassandraPojoAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStreamSource<Pojo> source = env - .addSource(new SourceFunction<Pojo>() { - - private boolean running = true; - private volatile int cnt = 0; - - @Override - public void run(SourceContext<Pojo> ctx) throws Exception { - while (running) { - ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); - cnt++; - if (cnt == 20) { - cancel(); - } - } - } - - @Override - public void cancel() { - running = false; - } - }); - - source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); - - env.execute(); - - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); - } - - @Test - public void testCassandraBatchFormats() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection); - dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder)); - - env.execute("Write data"); - - DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( - new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), - TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){})); - - - long count = inputDS.count(); - Assert.assertEquals(count, 20L); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java deleted file mode 100644 index 847d1a0..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.Collections; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertFalse; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.powermock.api.mockito.PowerMockito.doAnswer; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; - -public class CassandraTupleWriteAheadSinkTest { - - @Test(timeout=20000) - public void testAckLoopExitOnException() throws Exception { - final AtomicReference<Runnable> runnableFuture = new AtomicReference<>(); - - final ClusterBuilder clusterBuilder = new ClusterBuilder() { - private static final long serialVersionUID = 4624400760492936756L; - - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - try { - BoundStatement boundStatement = mock(BoundStatement.class); - when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement); - - PreparedStatement preparedStatement = mock(PreparedStatement.class); - when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement); - - ResultSetFuture future = mock(ResultSetFuture.class); - when(future.get()).thenThrow(new RuntimeException("Expected exception.")); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - synchronized (runnableFuture) { - runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0]))); - runnableFuture.notifyAll(); - } - return null; - } - }).when(future).addListener(any(Runnable.class), any(Executor.class)); - - Session session = mock(Session.class); - when(session.prepare(anyString())).thenReturn(preparedStatement); - when(session.executeAsync(any(BoundStatement.class))).thenReturn(future); - - Cluster cluster = mock(Cluster.class); - when(cluster.connect()).thenReturn(session); - return cluster; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - // Our asynchronous executor thread - new Thread(new Runnable() { - @Override - public void run() { - synchronized (runnableFuture) { - while (runnableFuture.get() == null) { - try { - runnableFuture.wait(); - } catch (InterruptedException e) { - // ignore interrupts - } - } - } - runnableFuture.get().run(); - } - }).start(); - - CheckpointCommitter cc = mock(CheckpointCommitter.class); - final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>( - "abc", - TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()), - clusterBuilder, - cc - ); - - OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink); - harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true); - - harness.setup(); - sink.open(); - - // we should leave the loop and return false since we've seen an exception - assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L)); - - sink.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java deleted file mode 100644 index 9b331d6..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.mapping.annotations.Column; -import com.datastax.driver.mapping.annotations.Table; - -import java.io.Serializable; - -@Table(keyspace = "flink", name = "test") -public class Pojo implements Serializable { - - private static final long serialVersionUID = 1038054554690916991L; - - @Column(name = "id") - private String id; - @Column(name = "counter") - private int counter; - @Column(name = "batch_id") - private int batch_id; - - public Pojo(String id, int counter, int batch_id) { - this.id = id; - this.counter = counter; - this.batch_id = batch_id; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public int getCounter() { - return counter; - } - - public void setCounter(int counter) { - this.counter = counter; - } - - public int getBatch_id() { - return batch_id; - } - - public void setBatch_id(int batch_id) { - this.batch_id = batch_id; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java deleted file mode 100644 index e1bcea9..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra.example; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Cluster.Builder; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.cassandra.CassandraSink; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; - -import java.util.ArrayList; - -/** - * This is an example showing the to use the Pojo Cassandra Sink in the Streaming API. - * - * Pojo's have to be annotated with datastax annotations to work with this sink. - * - * The example assumes that a table exists in a local cassandra database, according to the following query: - * CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY) - */ -public class CassandraPojoSinkExample { - private static final ArrayList<Message> messages = new ArrayList<>(20); - - static { - for (long i = 0; i < 20; i++) { - messages.add(new Message("cassandra-" + i)); - } - } - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Message> source = env.fromCollection(messages); - - CassandraSink.addSink(source) - .setClusterBuilder(new ClusterBuilder() { - @Override - protected Cluster buildCluster(Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); - } - }) - .build(); - - env.execute("Cassandra Sink example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java deleted file mode 100644 index c6345df..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra.example; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Cluster.Builder; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.cassandra.CassandraSink; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; - -import java.util.ArrayList; - -/** - * This is an example showing the to use the Tuple Cassandra Sink in the Streaming API. - * - * The example assumes that a table exists in a local cassandra database, according to the following query: - * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int) - */ -public class CassandraTupleSinkExample { - private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"; - private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20); - - static { - for (int i = 0; i < 20; i++) { - collection.add(new Tuple2<>("cassandra-" + i, i)); - } - } - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection); - - CassandraSink.addSink(source) - .setQuery(INSERT) - .setClusterBuilder(new ClusterBuilder() { - @Override - protected Cluster buildCluster(Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); - } - }) - .build(); - - env.execute("WriteTupleIntoCassandra"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java deleted file mode 100644 index 811c410..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra.example; - -import com.datastax.driver.core.Cluster; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.cassandra.CassandraSink; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; - -import java.util.UUID; - -/** - * This is an example showing the to use the Cassandra Sink (with write-ahead log) in the Streaming API. - * - * The example assumes that a table exists in a local cassandra database, according to the following query: - * CREATE TABLE example.values (id text, count int, PRIMARY KEY(id)); - * - * Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call - * when creating the CassandraSink. - */ -public class CassandraTupleWriteAheadSinkExample { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.enableCheckpointing(1000); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000)); - env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend")); - - CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource())) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .enableWriteAheadLog() - .setClusterBuilder(new ClusterBuilder() { - @Override - public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); - } - }) - .build(); - - sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello"); - - env.execute(); - } - - public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> { - private int counter = 0; - private boolean stop = false; - - @Override - public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { - while (!stop) { - Thread.sleep(50); - ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1)); - counter++; - if (counter == 100) { - stop = true; - } - } - } - - @Override - public void cancel() { - stop = true; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return counter; - } - - @Override - public void restoreState(Integer state) throws Exception { - this.counter = state; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java deleted file mode 100644 index 7524d95..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra.example; - -import com.datastax.driver.mapping.annotations.Column; -import com.datastax.driver.mapping.annotations.Table; - -import java.io.Serializable; - -@Table(keyspace = "test", name = "message") -public class Message implements Serializable { - - private static final long serialVersionUID = 1123119384361005680L; - - @Column(name = "body") - private String message; - - public Message(String word) { - this.message = word; - } - - public String getMessage() { - return message; - } - - public void setMessage(String word) { - this.message = word; - } - - public boolean equals(Object other) { - if (other instanceof Message) { - Message that = (Message) other; - return this.message.equals(that.message); - } - return false; - } - - @Override - public int hashCode() { - return message.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml deleted file mode 100644 index 0594ea3..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml +++ /dev/null @@ -1,43 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -cluster_name: 'Test Cluster' -commitlog_sync: 'periodic' -commitlog_sync_period_in_ms: 10000 -commitlog_segment_size_in_mb: 16 -partitioner: 'org.apache.cassandra.dht.RandomPartitioner' -endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' -commitlog_directory: $PATH/commit' -data_file_directories: - - $PATH/data' -saved_caches_directory: $PATH/cache' -listen_address: '127.0.0.1' -seed_provider: - - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider' - parameters: - - seeds: '127.0.0.1' -native_transport_port: 9042 - -concurrent_reads: 8 -concurrent_writes: 8 - -auto_bootstrap: false -auto_snapshot: false - -start_rpc: false -start_native_transport: true -native_transport_max_threads: 8 http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties deleted file mode 100644 index a43d556..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target= System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml deleted file mode 100644 index c5ba3d8..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-elasticsearch_2.10</artifactId> - <name>flink-connector-elasticsearch</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <elasticsearch.version>1.7.1</elasticsearch.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.version}</version> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <rerunFailingTestsCount>3</rerunFailingTestsCount> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java deleted file mode 100644 index ac14ade..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.ImmutableList; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.node.Node; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - - -/** - * Sink that emits its input elements to an Elasticsearch cluster. - * - * <p> - * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)} - * the sink will create a local {@link Node} for communicating with the - * Elasticsearch cluster. When using the second constructor - * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will - * be used instead. - * - * <p> - * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster - * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster - * to come online. - * - * <p> - * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating - * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch - * documentation. An important setting is {@code cluster.name}, this should be set to the name - * of the cluster that the sink should emit to. - * - * <p> - * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. - * This will buffer elements before sending a request to the cluster. The behaviour of the - * {@code BulkProcessor} can be configured using these config keys: - * <ul> - * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer - * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer - * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two - * settings in milliseconds - * </ul> - * - * <p> - * You also have to provide an {@link IndexRequestBuilder}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example. - * - * @param <T> Type of the elements emitted by this sink - */ -public class ElasticsearchSink<T> extends RichSinkFunction<T> { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. - */ - private final Map<String, String> userConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List<TransportAddress> transportNodes; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final IndexRequestBuilder<T> indexRequestBuilder; - - /** - * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null - * if we are using a TransportClient. - */ - private transient Node node; - - /** - * The Client that was either retrieved from a Node or is a TransportClient. - */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. - */ - private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - - /** - * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node. - * - * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor - * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element - */ - public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) { - this.userConfig = userConfig; - this.indexRequestBuilder = indexRequestBuilder; - transportNodes = null; - } - - /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. - * - * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor - * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element - * - */ - public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) { - this.userConfig = userConfig; - this.indexRequestBuilder = indexRequestBuilder; - this.transportNodes = transportNodes; - } - - /** - * Initializes the connection to Elasticsearch by either creating an embedded - * {@link org.elasticsearch.node.Node} and retrieving the - * {@link org.elasticsearch.client.Client} from it or by creating a - * {@link org.elasticsearch.client.transport.TransportClient}. - */ - @Override - public void open(Configuration configuration) { - if (transportNodes == null) { - // Make sure that we disable http access to our embedded node - Settings settings = - ImmutableSettings.settingsBuilder() - .put(userConfig) - .put("http.enabled", false) - .build(); - - node = - nodeBuilder() - .settings(settings) - .client(true) - .data(false) - .node(); - - client = node.client(); - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch Client {} from embedded Node", client); - } - - } else { - Settings settings = ImmutableSettings.settingsBuilder() - .put(userConfig) - .build(); - - TransportClient transportClient = new TransportClient(settings); - for (TransportAddress transport: transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes(); - if (nodes.isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Connected to nodes: " + nodes.toString()); - } - } - - client = transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( - client, - new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, - BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, - BulkRequest request, - BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, - BulkRequest request, - Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = ParameterTool.fromMap(userConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - } - - @Override - public void invoke(T element) { - IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Emitting IndexRequest: {}", indexRequest); - } - - bulkProcessor.add(indexRequest); - } - - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (node != null) { - node.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); - } else { - throw new RuntimeException("An error occured in ElasticsearchSink."); - - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java deleted file mode 100644 index 04ae40a..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.elasticsearch.action.index.IndexRequest; - -import java.io.Serializable; - -/** - * Function that creates an {@link IndexRequest} from an element in a Stream. - * - * <p> - * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink} - * to prepare elements for sending them to Elasticsearch. See - * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a> - * for information about how to format data for adding it to an Elasticsearch index. - * - * <p> - * Example: - * - * <pre>{@code - * private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> { - * - * public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { - * Map<String, Object> json = new HashMap<>(); - * json.put("data", element); - * - * return Requests.indexRequest() - * .index("my-index") - * .type("my-type") - * .source(json); - * } - * } - * }</pre> - * - * @param <T> The type of the element handled by this {@code IndexRequestBuilder} - */ -public interface IndexRequestBuilder<T> extends Function, Serializable { - - /** - * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element. - * - * @param element The element that needs to be turned in to an {@code IndexRequest} - * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink} - * - * @return The constructed {@code IndexRequest} - */ - IndexRequest createIndexRequest(T element, RuntimeContext ctx); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java deleted file mode 100644 index 33a2e47..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.node.Node; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testNodeClient() throws Exception{ - - File dataDir = tempFolder.newFolder(); - - Node node = nodeBuilder() - .settings(ImmutableSettings.settingsBuilder() - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-node-client-cluster") - .local(true) - .node(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = Maps.newHashMap(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - // connect to our local node - config.put("node.local", "true"); - - source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch Node Client Test"); - - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", - Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test - public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = nodeBuilder() - .settings(ImmutableSettings.settingsBuilder() - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-node-client-cluster") - .local(true) - .node(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = Maps.newHashMap(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - // connect to our local node - config.put("node.local", "true"); - - List<TransportAddress> transports = Lists.newArrayList(); - transports.add(new LocalTransportAddress("1")); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch TransportClient Test"); - - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", - Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = JobExecutionException.class) - public void testTransportClientFails() throws Exception{ - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. We don't hava such as test for the Node Client version since that - // one will block and wait for a cluster to come online - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = Maps.newHashMap(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - // connect to our local node - config.put("node.local", "true"); - - List<TransportAddress> transports = Lists.newArrayList(); - transports.add(new LocalTransportAddress("1")); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); - - env.execute("Elasticsearch Node Client Test"); - } - - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } - } - - private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - @Override - public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java deleted file mode 100644 index 136ae77..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch.examples; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the cluster name in the config map. - */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - for (int i = 0; i < 20 && running; i++) { - ctx.collect("message #" + i); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { - @Override - public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .source(json); - } - })); - - - env.execute("Elasticsearch Example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties deleted file mode 100644 index dc20726..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file
