Repository: incubator-streams
Updated Branches:
  refs/heads/master cee5a231a -> e40e6287e


[STREAMS-478] streams persist reader/writer for Apache Cassandra


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e40e6287
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e40e6287
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e40e6287

Branch: refs/heads/master
Commit: e40e6287e5deeb51af9dcd4af9fb51e91d582373
Parents: cee5a23
Author: Subhobrata Dey <sbc...@gmail.com>
Authored: Fri Dec 23 22:10:46 2016 -0500
Committer: Subhobrata Dey <sbc...@gmail.com>
Committed: Wed Dec 28 19:58:33 2016 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../streams-persist-cassandra/README.md         |   8 +
 .../streams-persist-cassandra/pom.xml           | 231 +++++++++++++
 .../cassandra/CassandraPersistReader.java       | 316 ++++++++++++++++++
 .../cassandra/CassandraPersistWriter.java       | 321 +++++++++++++++++++
 .../cassandra/CassandraConfiguration.json       |  47 +++
 .../src/main/resources/cassandra.conf           |  25 ++
 .../src/main/resources/components.dot           |  50 +++
 .../src/site/markdown/cassandra.md              |  36 +++
 .../src/site/markdown/index.md                  |  23 ++
 .../streams-persist-cassandra/src/site/site.xml |  25 ++
 .../cassandra/test/CassandraPersistIT.java      | 106 ++++++
 .../src/test/resources/CassandraPersistIT.conf  |  27 ++
 13 files changed, 1216 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 71b9a39..8408cef 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -38,6 +38,7 @@
 
     <modules>
         <module>streams-persist-console</module>
+        <module>streams-persist-cassandra</module>
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-filebuffer</module>
         <module>streams-persist-hbase</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/README.md 
b/streams-contrib/streams-persist-cassandra/README.md
new file mode 100644
index 0000000..cd5ca39
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:streams-persist-cassandra
+===========================================
+
+[README.md](src/site/markdown/index.md "README")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml 
b/streams-contrib/streams-persist-cassandra/pom.xml
new file mode 100644
index 0000000..f29b9e5
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.5-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-cassandra</artifactId>
+    <name>${project.artifactId}</name>
+
+    <description>Cassandra Module</description>
+
+    <properties>
+        <cassandra-driver.version>3.1.2</cassandra-driver.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>${cassandra-driver.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-mapping</artifactId>
+            <version>${cassandra-driver.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-schema-activitystreams</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.streams.plugins</groupId>
+                <artifactId>streams-plugin-pojo</artifactId>
+                <version>${project.version}</version>
+                <configuration>
+                    <sourcePaths>
+                        
<sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    
<targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+                    
<targetPackage>org.apache.streams.cassandra.pojo</targetPackage>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            
<includeArtifactIds>streams-schema-activitystreams</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>${failsafe.plugin.version}</version>
+                <configuration>
+                    <!-- Run integration test suite rather than individual 
tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*IT.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <includes>
+                        <include>**/*.conf</include>
+                        <include>**/*.json</include>
+                        <include>**/*.class</include>
+                    </includes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>cassandra:3.9</name>
+                                    <alias>cassandra</alias>
+                                    <run>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            
<port>${cassandra.tcp.host}:${cassandra.tcp.port}:9042</port>
+                                        </ports>
+                                        
<portPropertyFile>cassandra.properties</portPropertyFile>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
new file mode 100644
index 0000000..aaa40fe
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * CassandraPersistReader reads documents from cassandra.
+ */
+public class CassandraPersistReader implements StreamsPersistReader {
+
+  public static final String STREAMS_ID = "CassandraPersistReader";
+
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraPersistReader.class);
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private ExecutorService executor;
+  private CompletableFuture<Boolean> readerTaskFuture = new 
CompletableFuture<>();
+
+  private CassandraConfiguration config;
+
+  protected Cluster cluster;
+  protected Session session;
+
+  protected String keyspace;
+  protected String table;
+  protected Iterator<Row> rowIterator;
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * CassandraPersistReader constructor - resolves CassandraConfiguration from 
JVM 'cassandra'.
+   */
+  public CassandraPersistReader() {
+    this.config = new ComponentConfigurator<>(CassandraConfiguration.class)
+      
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"));
+  }
+
+  /**
+   * CassandraPersistReader constructor - uses supplied CassandraConfiguration.
+   * @param config config
+   */
+  public CassandraPersistReader(CassandraConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * CassandraPersistReader constructor - uses supplied persistQueue.
+   * @param persistQueue persistQueue
+   */
+  public CassandraPersistReader(Queue<StreamsDatum> persistQueue) {
+    this.config = new ComponentConfigurator<>(CassandraConfiguration.class)
+      
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"));
+    this.persistQueue = persistQueue;
+  }
+
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
+
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
+
+  public void stop() {
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    connectToCassandra();
+
+    String selectStatement = getSelectStatement();
+    ResultSet rs = session.execute(selectStatement);
+    rowIterator = rs.iterator();
+
+    if (!rowIterator.hasNext()) {
+      throw new RuntimeException("Table" + table + "is empty!");
+    }
+
+    persistQueue = constructQueue();
+
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected StreamsDatum prepareDatum(Row row) {
+    ObjectNode objectNode;
+
+    try {
+      byte[] value = row.getBytes(config.getColumn()).array();
+      objectNode = mapper.readValue(value, ObjectNode.class);
+    } catch (IOException ex) {
+      LOGGER.warn("document isn't valid JSON.");
+      return null;
+    }
+
+    return new StreamsDatum(objectNode);
+  }
+
+  private synchronized void connectToCassandra() {
+    Cluster.Builder clusterBuilder = Cluster.builder()
+        .addContactPoints(config.getHost().toArray(new 
String[config.getHost().size()]))
+        .withPort(config.getPort().intValue());
+
+    keyspace = config.getKeyspace();
+    table = config.getTable();
+
+    if (StringUtils.isNotEmpty(config.getUser()) && 
StringUtils.isNotEmpty(config.getPassword())) {
+      cluster = clusterBuilder.withCredentials(config.getUser(), 
config.getPassword()).build();
+    } else {
+      cluster = clusterBuilder.build();
+    }
+
+    Metadata metadata = cluster.getMetadata();
+    if (Objects.isNull(metadata.getKeyspace(keyspace))) {
+      LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+      Map<String, Object> replication = new HashMap<>();
+      replication.put("class", "SimpleStrategy");
+      replication.put("replication_factor", 1);
+
+      String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+          .replication(replication).getQueryString();
+      cluster.connect().execute(createKeyspaceStmt);
+    }
+
+    session = cluster.connect(keyspace);
+
+    KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
+    TableMetadata tableMetadata = ks.getTable(table);
+
+    if (Objects.isNull(tableMetadata)) {
+      LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", 
table, keyspace);
+      String createTableStmt = SchemaBuilder.createTable(table)
+                                
.addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
+                                .addColumn(config.getColumn(), 
DataType.blob()).getQueryString();
+
+      session.execute(createTableStmt);
+    }
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    ResultSet rs = session.execute(getSelectStatement());
+    Iterator<Row> rowsIterator = rs.iterator();
+
+    while (rowsIterator.hasNext()) {
+      Row row = rowsIterator.next();
+      StreamsDatum datum = prepareDatum(row);
+      write(datum);
+    }
+
+    return readCurrent();
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.debug("startStream");
+    CassandraPersistReaderTask readerTask = new 
CassandraPersistReaderTask(this);
+
+    CompletableFuture.runAsync(readerTask, executor);
+
+    try {
+      if (readerTaskFuture.get()) {
+        executor.shutdown();
+      }
+    } catch (InterruptedException ex) {
+      LOGGER.trace("Interrupt", ex);
+    } catch (ExecutionException ex) {
+      LOGGER.trace("Execution exception", ex);
+    }
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    try {
+      lock.writeLock().lock();
+      current = new StreamsResultSet(persistQueue);
+      current.setCounter(new DatumStatusCounter());
+      persistQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    return current;
+  }
+
+  protected void write(StreamsDatum entry) {
+    boolean success;
+    do {
+      try {
+        lock.readLock().lock();
+        success = persistQueue.offer(entry);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+    while (!success);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isTerminated() || !executor.isShutdown();
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new 
LinkedBlockingQueue<StreamsDatum>(10000));
+  }
+
+  private String getSelectStatement() {
+    return QueryBuilder.select().all()
+      .from(table).getQueryString();
+  }
+
+  public class CassandraPersistReaderTask implements Runnable {
+
+    private CassandraPersistReader reader;
+
+    public CassandraPersistReaderTask(CassandraPersistReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (reader.rowIterator.hasNext()) {
+          Row row = reader.rowIterator.next();
+          StreamsDatum datum = reader.prepareDatum(row);
+          reader.write(datum);
+        }
+      } finally {
+        readerTaskFuture.complete(true);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
new file mode 100644
index 0000000..81c0e9e
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.GuidUtils;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, 
Flushable, Closeable {
+
+  public static final String STREAMS_ID = "CassandraPersistWriter";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraPersistWriter.class);
+
+  private static final long MAX_WRITE_LATENCY = 1000;
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private volatile AtomicLong lastWrite = new 
AtomicLong(System.currentTimeMillis());
+  private ScheduledExecutorService backgroundFlushTask = 
Executors.newSingleThreadScheduledExecutor();
+
+  private CassandraConfiguration config;
+
+  protected Cluster cluster;
+  protected Session session;
+
+  protected String keyspace;
+  protected String table;
+  protected PreparedStatement insertStatement;
+
+  protected List<BoundStatement> insertBatch = new ArrayList<>();
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public CassandraPersistWriter() {
+    this(new ComponentConfigurator<>(CassandraConfiguration.class)
+    
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")));
+  }
+
+  public CassandraPersistWriter(CassandraConfiguration config) {
+    this.config = config;
+  }
+
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
+
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void write(StreamsDatum streamsDatum) {
+
+    ObjectNode node;
+
+    if (streamsDatum.getDocument() instanceof String) {
+      try {
+        node = mapper.readValue((String) streamsDatum.getDocument(), 
ObjectNode.class);
+
+        byte[] value = node.toString().getBytes();
+
+        String key = GuidUtils.generateGuid(node.toString());
+        if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
+          key = streamsDatum.getMetadata().get("id").toString();
+        }
+
+        BoundStatement statement = insertStatement.bind(key, 
ByteBuffer.wrap(value));
+        insertBatch.add(statement);
+      } catch (IOException ex) {
+        LOGGER.warn("Failure adding object: {}", 
streamsDatum.getDocument().toString());
+        return;
+      }
+    } else {
+      try {
+        node = mapper.valueToTree(streamsDatum.getDocument());
+
+        byte[] value = node.toString().getBytes();
+
+        String key = GuidUtils.generateGuid(node.toString());
+        if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
+          key = streamsDatum.getMetadata().get("id").toString();
+        }
+
+        BoundStatement statement = insertStatement.bind(key, 
ByteBuffer.wrap(value));
+        insertBatch.add(statement);
+      } catch (Exception ex) {
+        LOGGER.warn("Failure adding object: {}", 
streamsDatum.getDocument().toString());
+        return;
+      }
+    }
+
+    flushIfNecessary();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    try {
+      LOGGER.debug("Attempting to flush {} items to cassandra", 
insertBatch.size());
+      lock.writeLock().lock();
+
+      BatchStatement batchStatement = new BatchStatement();
+      batchStatement.addAll(insertBatch);
+      session.execute(batchStatement);
+
+      lastWrite.set(System.currentTimeMillis());
+      insertBatch = new ArrayList<>();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    session.close();
+    cluster.close();
+    backgroundFlushTask.shutdownNow();
+  }
+
+  /**
+   * start write thread.
+   */
+  public void start() {
+    connectToCassandra();
+    backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        flushIfNecessary();
+      }
+    }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * stop.
+   */
+  public void stop() {
+    try {
+      flush();
+    } catch (IOException ex) {
+      LOGGER.error("Error flushing", ex);
+    }
+
+    try {
+      close();
+    } catch (IOException ex) {
+      LOGGER.error("Error closing", ex);
+    }
+
+    try {
+      backgroundFlushTask.shutdown();
+      // Wait a while for existing tasks to terminate
+      if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+        backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+          LOGGER.error("Stream did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      backgroundFlushTask.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      if (persistQueue.peek() != null) {
+        try {
+          StreamsDatum entry = persistQueue.remove();
+          write(entry);
+        } catch (Exception ex) {
+          LOGGER.warn("Failure writing entry from Queue: {}", ex.getMessage());
+        }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(1));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
+    }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.persistQueue = new ConcurrentLinkedQueue<>();
+    start();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected void flushIfNecessary() {
+    long lastLatency = System.currentTimeMillis() - lastWrite.get();
+    //Flush iff the size > 0 AND the size is divisible by 100 or the time 
between now and the last flush is greater
+    //than the maximum desired latency
+    if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || 
lastLatency > MAX_WRITE_LATENCY)) {
+      try {
+        flush();
+      } catch (IOException ex) {
+        LOGGER.error("Error writing to Cassandra", ex);
+      }
+    }
+  }
+
+  private synchronized void connectToCassandra() {
+    Cluster.Builder clusterBuilder = Cluster.builder()
+        .addContactPoints(config.getHost().toArray(new 
String[config.getHost().size()]))
+        .withPort(config.getPort().intValue());
+
+    keyspace = config.getKeyspace();
+    table = config.getTable();
+
+    if (StringUtils.isNotEmpty(config.getUser()) && 
StringUtils.isNotEmpty(config.getPassword())) {
+      cluster = clusterBuilder.withCredentials(config.getUser(), 
config.getPassword()).build();
+    } else {
+      cluster = clusterBuilder.build();
+    }
+
+    Metadata metadata = cluster.getMetadata();
+    if (Objects.isNull(metadata.getKeyspace(keyspace))) {
+      LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+      Map<String, Object> replication = new HashMap<>();
+      replication.put("class", "SimpleStrategy");
+      replication.put("replication_factor", 1);
+
+      String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+          .replication(replication).getQueryString();
+      cluster.connect().execute(createKeyspaceStmt);
+    }
+
+    session = cluster.connect(keyspace);
+
+    KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
+    TableMetadata tableMetadata = ks.getTable(table);
+
+    if (Objects.isNull(tableMetadata)) {
+      LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", 
table, keyspace);
+      String createTableStmt = SchemaBuilder.createTable(table)
+                                
.addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
+                                .addColumn(config.getColumn(), 
DataType.blob()).getQueryString();
+
+      session.execute(createTableStmt);
+    }
+
+    createInsertStatement();
+  }
+
+  private void createInsertStatement() {
+    Insert insertBuilder = QueryBuilder.insertInto(table);
+    insertBuilder.value(config.getPartitionKeyColumn(), new Object());
+    insertBuilder.value(config.getColumn(), new Object());
+    insertStatement = session.prepare(insertBuilder.getQueryString());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
 
b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
new file mode 100644
index 0000000..4b4cf1f
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
@@ -0,0 +1,47 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema";,
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0";
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType": "org.apache.streams.cassandra.CassandraConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "host": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      },
+      "description": "Cassandra host"
+    },
+    "port": {
+      "type": "integer",
+      "description": "Cassandra port"
+    },
+    "user": {
+      "type": "string",
+      "description": "User"
+    },
+    "password": {
+      "type": "string",
+      "description": "Password"
+    },
+    "keyspace": {
+      "type": "string",
+      "description": "Keyspace"
+    },
+    "table": {
+      "type": "string",
+      "description": "Table"
+    },
+    "partitionKeyColumn": {
+      "type": "string",
+      "description": "Partition Key column name"
+    },
+    "column": {
+      "type": "string",
+      "description": "Column name"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf 
b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
new file mode 100644
index 0000000..8576e9a
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cassandra {
+  "host": ["127.0.0.1"],
+  "port": 9042,
+  "keyspace": "test_keyspace",
+  "table": "test_table",
+  "partitionKeyColumn": "key",
+  "column": "value"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot 
b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
new file mode 100644
index 0000000..916d8c0
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+digraph g {
+
+  graph [compound = true];
+
+  //presentation
+  splines = true;
+  overlap = false;
+  rankdir = TB;
+
+  generators [label="generators", shape="circle"];
+  providers [label="providers", shape="circle"];
+  processors [label="processors", shape="circle"];
+
+  subgraph cluster_persisters {
+    label="persisters";
+    persisters_cassandra_reader [label="CassandraPersistReader"]
+    persisters_cassandra_writer [label="CassandraPersistWriter"]
+  }
+
+  subgraph cluster_dbs {
+    label="dbs";
+    cassandra [label="cassandra", shape="cylinder"]
+  }
+
+  generators -> providers
+  providers -> processors
+  processors -> persisters_cassandra_writer [label="StreamsDatum"]
+  persisters_cassandra_reader -> processors [label="StreamsDatum[String]"]
+  cassandra -> persisters_cassandra_reader
+  persisters_cassandra_writer -> cassandra
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md 
b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
new file mode 100644
index 0000000..9d01112
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
@@ -0,0 +1,36 @@
+## Cassandra
+
+Start cassandra via docker with the docker maven plugin:
+
+    docker -PdockerITs docker:start
+
+Confirm that cassandra is running:
+
+    docker ps
+
+Confirm that host and post(s) are in property file:
+
+    cat cassandra.properties
+
+Create a local file `cassandra.conf` with cluster details:
+
+    cassandra {
+      host = ${cassandra.tcp.host}
+      port = ${cassandra.tcp.port}
+    }
+
+When configuring a stream, include these files:
+
+    include "cassandra.properties"
+    include "cassandra.conf"
+
+Supply application-specific configuration as well:
+
+    cassandra {
+      keyspace = test_keyspace1
+      table = test_table1
+      partitionKeyColumn = key
+      column = value
+    }
+
+###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md 
b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
new file mode 100644
index 0000000..353f1c4
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
@@ -0,0 +1,23 @@
+streams-persist-cassandra
+=====================
+
+Read/write to/from Cassandra
+
+## Configuration
+
+| Schema |
+|--------|
+| 
[CassandraConfiguration.json](../../../org/apache/streams/cassandra/CassandraConfiguration.json
 "CassandraConfiguration.json") 
[CassandraConfiguration.html](apidocs/org/apache/streams/cassandra/CassandraConfiguration.html
 "javadoc") |
+
+## Components
+
+![components](components.dot.svg "Components")
+
+| Class | 
+|-------|
+| CassandraPersistReader 
[CassandraPersistReader.html](apidocs/org/apache/streams/cassandra/CassandraPersistReader.html
 "javadoc") 
+| CassandraPersistWriter 
[CassandraPersistWriter.html](apidocs/org/apache/streams/cassandra/CassandraPersistWriter.html
 "javadoc") 
+    
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/site.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/site/site.xml 
b/streams-contrib/streams-persist-cassandra/src/site/site.xml
new file mode 100644
index 0000000..f82300a
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project>
+    <body>
+        <links name="Help">
+            <item name="Cassandra" href="cassandra.html"/>
+        </links>
+    </body>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
 
b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
new file mode 100644
index 0000000..ca675b9
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.test;
+
+import org.apache.streams.cassandra.CassandraConfiguration;
+import org.apache.streams.cassandra.CassandraPersistReader;
+import org.apache.streams.cassandra.CassandraPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Test writing documents
+ */
+public class CassandraPersistIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraPersistIT.class);
+
+  private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private CassandraConfiguration testConfiguration;
+
+  private int count = 0;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    Config reference = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/CassandraPersistIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new 
ComponentConfigurator<>(CassandraConfiguration.class).detectConfiguration(typesafe,
 "cassandra");
+  }
+
+  @Test
+  public void testCassandraPersist() throws Exception {
+    CassandraPersistWriter writer = new 
CassandraPersistWriter(testConfiguration);
+
+    writer.prepare(null);
+
+    InputStream testActivityFolderStream = 
CassandraPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
StandardCharsets.UTF_8);
+
+    for (String file: files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
CassandraPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      activity.getAdditionalProperties().remove("$license");
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      writer.write(datum);
+
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(89, count);
+
+    writer.cleanUp();
+
+    CassandraPersistReader reader = new 
CassandraPersistReader(testConfiguration);
+
+    reader.prepare(null);
+
+    StreamsResultSet resultSet = reader.readAll();
+
+    LOGGER.info("Total Read: {}", resultSet.size() );
+    Assert.assertEquals(89, resultSet.size());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
 
b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
new file mode 100644
index 0000000..62fec7d
--- /dev/null
+++ 
b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cassandra {
+  host = [${cassandra.tcp.host}]
+  port = ${cassandra.tcp.port}
+  user = cassandra
+  password = cassandra
+  keyspace = test_keyspace
+  table = test_table
+  partitionKeyColumn = key
+  column = value
+}
\ No newline at end of file

Reply via email to