Repository: incubator-streams Updated Branches: refs/heads/master cee5a231a -> e40e6287e
[STREAMS-478] streams persist reader/writer for Apache Cassandra Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e40e6287 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e40e6287 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e40e6287 Branch: refs/heads/master Commit: e40e6287e5deeb51af9dcd4af9fb51e91d582373 Parents: cee5a23 Author: Subhobrata Dey <sbc...@gmail.com> Authored: Fri Dec 23 22:10:46 2016 -0500 Committer: Subhobrata Dey <sbc...@gmail.com> Committed: Wed Dec 28 19:58:33 2016 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 1 + .../streams-persist-cassandra/README.md | 8 + .../streams-persist-cassandra/pom.xml | 231 +++++++++++++ .../cassandra/CassandraPersistReader.java | 316 ++++++++++++++++++ .../cassandra/CassandraPersistWriter.java | 321 +++++++++++++++++++ .../cassandra/CassandraConfiguration.json | 47 +++ .../src/main/resources/cassandra.conf | 25 ++ .../src/main/resources/components.dot | 50 +++ .../src/site/markdown/cassandra.md | 36 +++ .../src/site/markdown/index.md | 23 ++ .../streams-persist-cassandra/src/site/site.xml | 25 ++ .../cassandra/test/CassandraPersistIT.java | 106 ++++++ .../src/test/resources/CassandraPersistIT.conf | 27 ++ 13 files changed, 1216 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 71b9a39..8408cef 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -38,6 +38,7 @@ <modules> <module>streams-persist-console</module> + <module>streams-persist-cassandra</module> <module>streams-persist-elasticsearch</module> <module>streams-persist-filebuffer</module> 
<module>streams-persist-hbase</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/README.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/README.md b/streams-contrib/streams-persist-cassandra/README.md new file mode 100644 index 0000000..cd5ca39 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/README.md @@ -0,0 +1,8 @@ +Apache Streams (incubating) +Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 +-------------------------------------------------------------------------------- + +org.apache.streams:streams-persist-cassandra +=========================================== + +[README.md](src/site/markdown/index.md "README") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml new file mode 100644 index 0000000..f29b9e5 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/pom.xml @@ -0,0 +1,231 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. 
See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-contrib</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.5-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-persist-cassandra</artifactId> + <name>${project.artifactId}</name> + + <description>Cassandra Module</description> + + <properties> + <cassandra-driver.version>3.1.2</cassandra-driver.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra-driver.version}</version> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-mapping</artifactId> + <version>${cassandra-driver.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-schema-activitystreams</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-testing</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.streams.plugins</groupId> + <artifactId>streams-plugin-pojo</artifactId> + <version>${project.version}</version> + <configuration> + <sourcePaths> + <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath> + </sourcePaths> + <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory> + <targetPackage>org.apache.streams.cassandra.pojo</targetPackage> + </configuration> + <executions> + <execution> + <goals> + <goal>generate-sources</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>resource-dependencies</id> + <phase>process-test-resources</phase> + <goals> + <goal>unpack-dependencies</goal> + </goals> + <configuration> + <includeArtifactIds>streams-schema-activitystreams</includeArtifactIds> + <includes>**/*.json</includes> + <outputDirectory>${project.build.directory}/test-classes</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${failsafe.plugin.version}</version> + <configuration> + <!-- Run integration test suite 
rather than individual tests. --> + <excludes> + <exclude>**/*Test.java</exclude> + <exclude>**/*Tests.java</exclude> + </excludes> + <includes> + <include>**/*IT.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <includes> + <include>**/*.conf</include> + <include>**/*.json</include> + <include>**/*.class</include> + </includes> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>dockerITs</id> + <activation> + <activeByDefault>false</activeByDefault> + <property> + <name>skipITs</name> + <value>false</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + <configuration combine.self="override"> + <watchInterval>500</watchInterval> + <logDate>default</logDate> + <verbose>true</verbose> + <autoPull>on</autoPull> + <images> + <image> + <name>cassandra:3.9</name> + <alias>cassandra</alias> + <run> + <namingStrategy>none</namingStrategy> + <ports> + <port>${cassandra.tcp.host}:${cassandra.tcp.port}:9042</port> + </ports> + <portPropertyFile>cassandra.properties</portPropertyFile> + <log> + <enabled>true</enabled> + <date>default</date> + <color>cyan</color> + </log> + </run> + <watch> + <mode>none</mode> + </watch> + </image> + </images> + </configuration> + + </plugin> + + </plugins> + </build> + + </profile> + </profiles> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java new file mode 100644 index 0000000..aaa40fe --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.cassandra; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Queues; +import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * CassandraPersistReader reads documents from cassandra. 
+ */ +public class CassandraPersistReader implements StreamsPersistReader { /* Reads every row of the configured table (getSelectStatement() selects all columns with no WHERE clause), parses the configured column's bytes as a JSON ObjectNode in prepareDatum(), and buffers the resulting StreamsDatum objects in persistQueue, which constructQueue() bounds at 10000 entries. */ + + public static final String STREAMS_ID = "CassandraPersistReader"; + + public static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistReader.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private ExecutorService executor; + /* Completed with true by CassandraPersistReaderTask when it finishes; startStream() blocks on it. NOTE(review): created once per instance, and startStream() shuts the executor down after the first run, so the reader appears to be single-use. */ private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>(); + + private CassandraConfiguration config; + + protected Cluster cluster; + protected Session session; + + protected String keyspace; + protected String table; + protected Iterator<Row> rowIterator; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * CassandraPersistReader constructor - resolves CassandraConfiguration from JVM 'cassandra'. + */ + public CassandraPersistReader() { + this.config = new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")); + } + + /** + * CassandraPersistReader constructor - uses supplied CassandraConfiguration. + * @param config config + */ + public CassandraPersistReader(CassandraConfiguration config) { + this.config = config; + } + + /** + * CassandraPersistReader constructor - uses supplied persistQueue.
+ * @param persistQueue persistQueue + */ + public CassandraPersistReader(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")); + this.persistQueue = persistQueue; + } + + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } + + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } + + public void stop() { /* NOTE(review): no-op; the session, cluster and executor are not closed here, and cleanUp() only delegates to this method. */ + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { /* Connects (creating the keyspace/table if missing), opens an iterator over all rows, and allocates a fresh queue and single-thread executor; configurationObject is not read. NOTE(review): the persistQueue assignment below replaces any queue passed to the Queue constructor or setPersistQueue(). */ + connectToCassandra(); + + String selectStatement = getSelectStatement(); + ResultSet rs = session.execute(selectStatement); + rowIterator = rs.iterator(); + + if (!rowIterator.hasNext()) { + /* NOTE(review): a table that connectToCassandra() has just created is empty, so prepare() always throws in that case; the message also has no spaces around the table name (e.g. "Tabletest_tableis empty!"). */ throw new RuntimeException("Table" + table + "is empty!"); + } + + persistQueue = constructQueue(); + + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void cleanUp() { + stop(); + } + + protected StreamsDatum prepareDatum(Row row) { /* Converts one row into a StreamsDatum wrapping an ObjectNode parsed from the configured column's bytes; returns null (after a warning) when the bytes are not valid JSON. NOTE(review): callers pass that null straight to write(), and the LinkedBlockingQueue behind 
constructQueue() rejects null elements with a NullPointerException. ByteBuffer.array() also returns the whole backing array regardless of position/limit; confirm the driver never hands back a slice. */ + ObjectNode objectNode; + + try { + byte[] value = row.getBytes(config.getColumn()).array(); + objectNode = mapper.readValue(value, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn("document isn't valid JSON."); + return null; + } + + return new StreamsDatum(objectNode); + } + + private synchronized void connectToCassandra() { /* Builds the Cluster from config (with credentials only when both user and password are non-empty), creates the keyspace (SimpleStrategy, replication_factor 1) and a (varchar partition key, blob column) table when missing, and opens session on the keyspace. NOTE(review): duplicated in CassandraPersistWriter.connectToCassandra(); candidate for a shared helper. */ + Cluster.Builder clusterBuilder = Cluster.builder() + .addContactPoints(config.getHost().toArray(new String[config.getHost().size()])) + .withPort(config.getPort().intValue()); + + keyspace = config.getKeyspace(); + table = config.getTable(); + + if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) { + cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build(); + } else { + cluster = clusterBuilder.build(); + } + + Metadata metadata = cluster.getMetadata(); + if
(Objects.isNull(metadata.getKeyspace(keyspace))) { + LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace); + Map<String, Object> replication = new HashMap<>(); + replication.put("class", "SimpleStrategy"); + replication.put("replication_factor", 1); + + String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with() + .replication(replication).getQueryString(); + /* NOTE(review): the Session opened by this connect() is never closed. */ cluster.connect().execute(createKeyspaceStmt); + } + + session = cluster.connect(keyspace); + + KeyspaceMetadata ks = metadata.getKeyspace(keyspace); + TableMetadata tableMetadata = ks.getTable(table); + + if (Objects.isNull(tableMetadata)) { + LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace); + String createTableStmt = SchemaBuilder.createTable(table) + .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar()) + .addColumn(config.getColumn(), DataType.blob()).getQueryString(); + + session.execute(createTableStmt); + } + } + + @Override + public StreamsResultSet readAll() { /* Runs a fresh SELECT on the calling thread, queues every decoded row, then returns the queued data via readCurrent(). NOTE(review): write() retries until offer() succeeds, so with the 10000-entry queue a larger table spins here unless another thread drains the queue; confirm intended use. */ + ResultSet rs = session.execute(getSelectStatement()); + Iterator<Row> rowsIterator = rs.iterator(); + + while (rowsIterator.hasNext()) { + Row row = rowsIterator.next(); + StreamsDatum datum = prepareDatum(row); + write(datum); + } + + return readCurrent(); + } + + @Override + public void startStream() { /* Submits a CassandraPersistReaderTask to the executor and blocks until readerTaskFuture completes, then shuts the executor down. NOTE(review): InterruptedException is only logged at trace level; the thread's interrupt flag is not restored. 
*/ + LOGGER.debug("startStream"); + CassandraPersistReaderTask readerTask = new CassandraPersistReaderTask(this); + + CompletableFuture.runAsync(readerTask, executor); + + try { + if (readerTaskFuture.get()) { + executor.shutdown(); + } + } catch (InterruptedException ex) { + LOGGER.trace("Interrupt", ex); + } catch (ExecutionException ex) { + LOGGER.trace("Execution exception", ex); + } + } + + @Override + public StreamsResultSet readCurrent() { /* Hands the current queue to the caller as a StreamsResultSet and swaps in a fresh empty queue; holding the write lock excludes concurrent write() calls during the swap. */ + + StreamsResultSet current; + + try { + lock.writeLock().lock(); + current = new StreamsResultSet(persistQueue); + current.setCounter(new DatumStatusCounter()); + persistQueue = constructQueue(); + } finally {
lock.writeLock().unlock(); + } + + return current; + } + + protected void write(StreamsDatum entry) { /* Offers entry to persistQueue under the read lock, yielding and retrying until the queue accepts it (i.e. while the bounded queue is full). */ + boolean success; + do { + try { + lock.readLock().lock(); + success = persistQueue.offer(entry); + Thread.yield(); + } finally { + lock.readLock().unlock(); + } + } + while (!success); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { /* NOTE(review): not implemented; always returns null. */ + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { /* NOTE(review): not implemented; always returns null. */ + return null; + } + + @Override + public boolean isRunning() { /* NOTE(review): a terminated executor is always shut down, so this reduces to !executor.isTerminated(); executor is only assigned in prepare(), so calling this earlier throws NullPointerException. */ + return !executor.isTerminated() || !executor.isShutdown(); + } + + private Queue<StreamsDatum> constructQueue() { + return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + } + + private String getSelectStatement() { + return QueryBuilder.select().all() + .from(table).getQueryString(); + } + + public class CassandraPersistReaderTask implements Runnable { /* Drains reader.rowIterator into reader's queue and, even if an exception escapes, completes readerTaskFuture of the enclosing CassandraPersistReader instance. NOTE(review): non-static inner class, so the future belongs to the enclosing instance while rows come from the reader field; they only match when both are the same object. 
*/ + + private CassandraPersistReader reader; + + public CassandraPersistReaderTask(CassandraPersistReader reader) { + this.reader = reader; + } + + @Override + public void run() { + try { + while (reader.rowIterator.hasNext()) { + Row row = reader.rowIterator.next(); + StreamsDatum datum = reader.prepareDatum(row); + reader.write(datum); + } + } finally { + readerTaskFuture.complete(true); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java new file mode 100644 index 0000000..81c0e9e --- /dev/null +++
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.cassandra; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.GuidUtils; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang3.StringUtils; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** Persists StreamsDatum documents to a Cassandra table as (partition key, JSON bytes) rows. Each datum becomes a BoundStatement in insertBatch; flushIfNecessary() executes the batch as one BatchStatement when the pending count is a positive multiple of 100 or more than MAX_WRITE_LATENCY ms have passed since the last flush, and a background task re-checks every 2 * MAX_WRITE_LATENCY ms. */ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable { + + public static final String STREAMS_ID = "CassandraPersistWriter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistWriter.class); + + private static final long MAX_WRITE_LATENCY = 1000; + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); + private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + + private CassandraConfiguration config; + + protected Cluster cluster; + protected Session session; + + protected String keyspace; + protected String table; + protected PreparedStatement insertStatement; + + /* NOTE(review): write() appends to this 
ArrayList without taking lock, while flush() (also run from backgroundFlushTask) reads and replaces it under the write lock; nothing in this class takes the read lock, so appends are not excluded during a flush. */ protected List<BoundStatement> insertBatch = new ArrayList<>(); + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public CassandraPersistWriter() { + this(new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"))); + } + + public CassandraPersistWriter(CassandraConfiguration config) { + this.config = config; + } + + public void
setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } + + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void write(StreamsDatum streamsDatum) { /* Serializes the document to JSON text (parsing it first when it is a String), keys it by metadata "id" when present or else by GuidUtils.generateGuid() of that text, and appends the bound insert to insertBatch; failures are logged and the datum is skipped. NOTE(review): both branches duplicate the key/bind logic, and getBytes() uses the platform default charset. */ + + ObjectNode node; + + if (streamsDatum.getDocument() instanceof String) { + try { + node = mapper.readValue((String) streamsDatum.getDocument(), ObjectNode.class); + + byte[] value = node.toString().getBytes(); + + String key = GuidUtils.generateGuid(node.toString()); + if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) { + key = streamsDatum.getMetadata().get("id").toString(); + } + + BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value)); + insertBatch.add(statement); + } catch (IOException ex) { + LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString()); + return; + } + } else { + try { + node = mapper.valueToTree(streamsDatum.getDocument()); + + byte[] value = node.toString().getBytes(); + + String key = GuidUtils.generateGuid(node.toString()); + if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) { + key = streamsDatum.getMetadata().get("id").toString(); + } + + BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value)); + insertBatch.add(statement); + } catch (Exception ex) { + LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString()); + return; + } + } + + flushIfNecessary(); + } + + @Override + public void flush() throws IOException { /* Executes all pending inserts as a single BatchStatement, records 
the flush time, and starts a new batch; if execute() throws, the batch is kept for the next attempt. NOTE(review): the debug log reads insertBatch.size() before the lock is taken, and stop() calls this even when the batch is empty. */ + try { + LOGGER.debug("Attempting to flush {} items to cassandra", insertBatch.size()); + lock.writeLock().lock(); + + BatchStatement batchStatement = new BatchStatement(); + batchStatement.addAll(insertBatch); + session.execute(batchStatement); + + lastWrite.set(System.currentTimeMillis()); + insertBatch = new ArrayList<>(); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public synchronized void close() throws
IOException { /* NOTE(review): the session and cluster are closed before backgroundFlushTask is shut down, so an already-scheduled flushIfNecessary() could run against the closed session. */ + session.close(); + cluster.close(); + backgroundFlushTask.shutdownNow(); + } + + /** + * Connects to Cassandra and schedules flushIfNecessary() every 2 * MAX_WRITE_LATENCY ms; it does not start the run() loop. + */ + public void start() { + connectToCassandra(); + backgroundFlushTask.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + flushIfNecessary(); + } + }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS); + } + + /** + * Flushes pending inserts, closes the session and cluster, then waits for backgroundFlushTask to terminate (close() has already called shutdownNow() on it). + */ + public void stop() { + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error flushing", ex); + } + + try { + close(); + } catch (IOException ex) { + LOGGER.error("Error closing", ex); + } + + try { + backgroundFlushTask.shutdown(); + // Wait a while for existing tasks to terminate + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + LOGGER.error("Stream did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + backgroundFlushTask.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { /* Polls persistQueue forever and writes each entry. NOTE(review): new Random().nextInt(1) is always 0, so this is a busy loop, and InterruptedException is swallowed, so the loop never exits. 
*/ + while (true) { + if (persistQueue.peek() != null) { + try { + StreamsDatum entry = persistQueue.remove(); + write(entry); + } catch (Exception ex) { + LOGGER.warn("Failure writing entry from Queue: {}", ex.getMessage()); + } + } + try { + Thread.sleep(new Random().nextInt(1)); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } + } + } + + @Override + public void prepare(Object configurationObject) { /* Replaces persistQueue (including any queue set via setPersistQueue()) with a new ConcurrentLinkedQueue and calls start(); configurationObject is not read. */ + this.persistQueue = new ConcurrentLinkedQueue<>(); + start(); + } + + @Override + public void cleanUp() { + stop(); + } + + protected void flushIfNecessary() { /* Called from write() and from backgroundFlushTask. NOTE(review): insertBatch is read here without the lock, and because the size test is a multiple-of-100 check, a batch that grows past 100 (e.g. after a failed flush) is next flushed by size only at 200, otherwise by latency. */ + long lastLatency = System.currentTimeMillis() - lastWrite.get(); + //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and
the last flush is greater + //than the maximum desired latency + if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) { + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error writing to Cassandra", ex); + } + } + } + + private synchronized void connectToCassandra() { /* Same cluster/keyspace/table setup as CassandraPersistReader.connectToCassandra() (duplicated; candidate for a shared helper), followed by preparing the insert statement. */ + Cluster.Builder clusterBuilder = Cluster.builder() + .addContactPoints(config.getHost().toArray(new String[config.getHost().size()])) + .withPort(config.getPort().intValue()); + + keyspace = config.getKeyspace(); + table = config.getTable(); + + if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) { + cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build(); + } else { + cluster = clusterBuilder.build(); + } + + Metadata metadata = cluster.getMetadata(); + if (Objects.isNull(metadata.getKeyspace(keyspace))) { + LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace); + Map<String, Object> replication = new HashMap<>(); + replication.put("class", "SimpleStrategy"); + replication.put("replication_factor", 1); + + String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with() + .replication(replication).getQueryString(); + /* NOTE(review): the Session opened by this connect() is never closed. */ cluster.connect().execute(createKeyspaceStmt); + } + + session = cluster.connect(keyspace); + + KeyspaceMetadata ks = metadata.getKeyspace(keyspace); + TableMetadata tableMetadata = ks.getTable(table); + + if (Objects.isNull(tableMetadata)) { + LOGGER.info("Table {} does not exist in Keyspace {}.
Creating Table", table, keyspace); + String createTableStmt = SchemaBuilder.createTable(table) + .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar()) + .addColumn(config.getColumn(), DataType.blob()).getQueryString(); + + session.execute(createTableStmt); + } + + createInsertStatement(); + } + + private void createInsertStatement() { /* Prepares an INSERT into table for the partition-key column and the value column; write() binds (key, bytes) to it in that order. NOTE(review): new Object() is used as the placeholder value and appears to rely on QueryBuilder rendering it as a bind marker; QueryBuilder.bindMarker() would state that intent explicitly. Confirm. */ + Insert insertBuilder = QueryBuilder.insertInto(table); + insertBuilder.value(config.getPartitionKeyColumn(), new Object()); + insertBuilder.value(config.getColumn(), new Object()); + insertStatement = session.prepare(insertBuilder.getQueryString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json new file mode 100644 index 0000000..4b4cf1f --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json @@ -0,0 +1,47 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType": "org.apache.streams.cassandra.CassandraConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "host": { + "type": "array", + "items": { + "type": 
"string" + }, + "description": "Cassandra host" + }, + "port": { + "type": "integer", + "description": "Cassandra port" + }, + "user": { + "type": "string", + "description": "User" + }, + "password": { + "type": "string", + "description": "Password" + }, + "keyspace": { + "type": "string", +
"description": "Keyspace" + }, + "table": { + "type": "string", + "description": "Table" + }, + "partitionKeyColumn": { + "type": "string", + "description": "Partition Key column name" + }, + "column": { + "type": "string", + "description": "Column name" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf new file mode 100644 index 0000000..8576e9a --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +cassandra { + "host": ["127.0.0.1"], + "port": 9042, + "keyspace": "test_keyspace", + "table": "test_table", + "partitionKeyColumn": "key", + "column": "value" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot new file mode 100644 index 0000000..916d8c0 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +digraph g { + + graph [compound = true]; + + //presentation + splines = true; + overlap = false; + rankdir = TB; + + generators [label="generators", shape="circle"]; + providers [label="providers", shape="circle"]; + processors [label="processors", shape="circle"]; + + subgraph cluster_persisters { + label="persisters"; + persisters_cassandra_reader [label="CassandraPersistReader"] + persisters_cassandra_writer [label="CassandraPersistWriter"] + } + + subgraph cluster_dbs { + label="dbs"; + cassandra [label="cassandra", shape="cylinder"] + } + + generators -> providers + providers -> processors + processors -> persisters_cassandra_writer [label="StreamsDatum"] + persisters_cassandra_reader -> processors [label="StreamsDatum[String]"] + cassandra -> persisters_cassandra_reader + persisters_cassandra_writer -> cassandra + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md new file mode 100644 index 0000000..9d01112 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md @@ -0,0 +1,36 @@ +## Cassandra + +Start Cassandra in Docker using the docker maven plugin: + + mvn -PdockerITs docker:start + +Confirm that Cassandra is running: + + docker ps + +Confirm that the host and port(s) are in the properties file: + + cat cassandra.properties + +Create a local file `cassandra.conf` with cluster details (`host` is a list of contact points): + + cassandra { + host = [${cassandra.tcp.host}] + port = ${cassandra.tcp.port} + } + +When configuring a stream, include these files: + + include "cassandra.properties" + include "cassandra.conf" + +Supply application-specific configuration as well: + + cassandra { + keyspace = test_keyspace1
+ table = test_table1 + partitionKeyColumn = key + column = value + } + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md new file mode 100644 index 0000000..353f1c4 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md @@ -0,0 +1,23 @@ +streams-persist-cassandra +===================== + +Read/write to/from Cassandra + +## Configuration + +| Schema | +|--------| +| [CassandraConfiguration.json](../../../org/apache/streams/cassandra/CassandraConfiguration.json "CassandraConfiguration.json") [CassandraConfiguration.html](apidocs/org/apache/streams/cassandra/CassandraConfiguration.html "javadoc") | + +## Components + +![components](components.dot.svg "Components") + +| Class | +|-------| +| CassandraPersistReader [CassandraPersistReader.html](apidocs/org/apache/streams/cassandra/CassandraPersistReader.html "javadoc") +| CassandraPersistWriter [CassandraPersistWriter.html](apidocs/org/apache/streams/cassandra/CassandraPersistWriter.html "javadoc") + +[JavaDocs](apidocs/index.html "JavaDocs") + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/site.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/site/site.xml b/streams-contrib/streams-persist-cassandra/src/site/site.xml new file mode 100644 index 0000000..f82300a --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/site/site.xml @@ -0,0 
+1,25 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project> + <body> + <links name="Help"> + <item name="Cassandra" href="cassandra.html"/> + </links> + </body> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java new file mode 100644 index 0000000..ca675b9 --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.cassandra.test; + +import org.apache.streams.cassandra.CassandraConfiguration; +import org.apache.streams.cassandra.CassandraPersistReader; +import org.apache.streams.cassandra.CassandraPersistWriter; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Test writing documents + */ +public class CassandraPersistIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistIT.class); + + private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + private CassandraConfiguration testConfiguration; + + private int count = 0; + + @BeforeClass + public void setup() throws Exception { + Config reference = ConfigFactory.load(); + File conf_file = new 
File("target/test-classes/CassandraPersistIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(CassandraConfiguration.class).detectConfiguration(typesafe, "cassandra"); + } + + @Test + public void testCassandraPersist() throws Exception { + CassandraPersistWriter writer = new CassandraPersistWriter(testConfiguration); + + writer.prepare(null); + + InputStream testActivityFolderStream = CassandraPersistIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8); + + for (String file: files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = CassandraPersistIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.getAdditionalProperties().remove("$license"); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + writer.write(datum); + + LOGGER.info("Wrote: " + activity.getVerb() ); + count++; + } + + LOGGER.info("Total Written: {}", count ); + Assert.assertEquals(89, count); + + writer.cleanUp(); + + CassandraPersistReader reader = new CassandraPersistReader(testConfiguration); + + reader.prepare(null); + + StreamsResultSet resultSet = reader.readAll(); + + LOGGER.info("Total Read: {}", resultSet.size() ); + Assert.assertEquals(89, resultSet.size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf new file mode 100644 index 0000000..62fec7d --- /dev/null +++ b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cassandra { + host = [${cassandra.tcp.host}] + port = ${cassandra.tcp.port} + user = cassandra + password = cassandra + keyspace = test_keyspace + table = test_table + partitionKeyColumn = key + column = value +} \ No newline at end of file