[ https://issues.apache.org/jira/browse/STREAMS-478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15783113#comment-15783113 ]
ASF GitHub Bot commented on STREAMS-478: ---------------------------------------- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-streams/pull/345#discussion_r94044472 --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java --- @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.cassandra; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import com.google.common.collect.Queues; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * CassandraPersistReader reads documents from cassandra. 
+ */ +public class CassandraPersistReader implements StreamsPersistReader { + + public static final String STREAMS_ID = "CassandraPersistReader"; + + public static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistReader.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private ExecutorService executor; + private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>(); + + private CassandraConfiguration config; + + protected Cluster cluster; + protected Session session; + + protected String keyspace; + protected String table; + protected Iterator<Row> rowIterator; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * CassandraPersistReader constructor - resolves CassandraConfiguration from JVM 'cassandra'. + */ + public CassandraPersistReader() { + this.config = new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")); + } + + /** + * CassandraPersistReader constructor - uses supplied CassandraConfiguration. + * @param config config + */ + public CassandraPersistReader(CassandraConfiguration config) { + this.config = config; + } + + /** + * CassandraPersistReader constructor - uses supplied persistQueue. 
+ * @param persistQueue persistQueue + */ + public CassandraPersistReader(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")); + this.persistQueue = persistQueue; + } + + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } + + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } + + public void stop() { + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + connectToCassandra(); + + String selectStatement = getSelectStatement(); + ResultSet rs = session.execute(selectStatement); + rowIterator = rs.iterator(); + + if (!rowIterator.hasNext()) { + throw new RuntimeException("Table" + table + "is empty!"); + } + + persistQueue = constructQueue(); + + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void cleanUp() { + stop(); + } + + protected StreamsDatum prepareDatum(Row row) { + ObjectNode objectNode; + + try { + byte[] value = row.getBytes(config.getColumn()).array(); + objectNode = mapper.readValue(value, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn("document isn't valid JSON."); + return null; + } + + return new StreamsDatum(objectNode); + } + + private synchronized void connectToCassandra() { + Cluster.Builder clusterBuilder = Cluster.builder().addContactPoint(config.getHost()) + .withPort(config.getPort().intValue()); + + keyspace = config.getKeyspace(); + table = config.getTable(); + + if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { + cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build(); + } else { + cluster = clusterBuilder.build(); + } + + Metadata metadata = cluster.getMetadata(); + if (Objects.isNull(metadata.getKeyspace(keyspace))) { + 
LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace); + Map<String, Object> replication = new HashMap<String, Object>(); --- End diff -- please keep to Java 8 conventions - no need to specify the types on RHS > streams persist reader/writer for Apache Cassandra > -------------------------------------------------- > > Key: STREAMS-478 > URL: https://issues.apache.org/jira/browse/STREAMS-478 > Project: Streams > Issue Type: New Feature > Reporter: Subhobrata Dey > > This issue is created to propose the streams persist reader/writer to > get/post documents from/to apache cassandra. -- This message was sent by Atlassian JIRA (v6.3.4#6332)