[ https://issues.apache.org/jira/browse/STREAMS-478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15784005#comment-15784005 ]
ASF GitHub Bot commented on STREAMS-478: ---------------------------------------- Github user smarthi commented on a diff in the pull request: https://github.com/apache/incubator-streams/pull/345#discussion_r94089087 --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.cassandra; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.GuidUtils; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable { + + public static final String STREAMS_ID = "CassandraPersistWriter"; + + private 
static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistWriter.class); + + private static final long MAX_WRITE_LATENCY = 1000; + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); + private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + + private CassandraConfiguration config; + + protected Cluster cluster; + protected Session session; + + protected String keyspace; + protected String table; + protected PreparedStatement insertStatement; + + protected List<BoundStatement> insertBatch = new ArrayList<>(); + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public CassandraPersistWriter() { + this(new ComponentConfigurator<>(CassandraConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"))); + } + + public CassandraPersistWriter(CassandraConfiguration config) { + this.config = config; + } + + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } + + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + ObjectNode node; + + if (streamsDatum.getDocument() instanceof String) { + try { + node = mapper.readValue((String) streamsDatum.getDocument(), ObjectNode.class); + + byte[] value = node.toString().getBytes(); + + String key = GuidUtils.generateGuid(node.toString()); + if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) { + key = streamsDatum.getMetadata().get("id").toString(); + } + + BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value)); + insertBatch.add(statement); + } catch (IOException ex) { + ex.printStackTrace(); + LOGGER.warn("Failure adding object: 
{}", streamsDatum.getDocument().toString()); + return; + } + } else { + try { + node = mapper.valueToTree(streamsDatum.getDocument()); + + byte[] value = node.toString().getBytes(); + + String key = GuidUtils.generateGuid(node.toString()); + if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) { + key = streamsDatum.getMetadata().get("id").toString(); + } + + BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value)); + insertBatch.add(statement); + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString()); + return; + } + } + + flushIfNecessary(); + } + + @Override + public void flush() throws IOException { + try { + LOGGER.debug("Attempting to flush {} items to cassandra", insertBatch.size()); + lock.writeLock().lock(); + + BatchStatement batchStatement = new BatchStatement(); + batchStatement.addAll(insertBatch); + session.execute(batchStatement); + + lastWrite.set(System.currentTimeMillis()); + insertBatch = new ArrayList<>(); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public synchronized void close() throws IOException { + session.close(); + cluster.close(); + backgroundFlushTask.shutdownNow(); + } + + /** + * start write thread. + */ + public void start() { + connectToCassandra(); + backgroundFlushTask.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + flushIfNecessary(); + } + }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS); + } + + /** + * stop. 
+ */ + public void stop() { + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error flushing", ex); + } + + try { + close(); + } catch (IOException ex) { + LOGGER.error("Error closing", ex); + } + + try { + backgroundFlushTask.shutdown(); + // Wait a while for existing tasks to terminate + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + LOGGER.error("Stream did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + backgroundFlushTask.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + while (true) { + if (persistQueue.peek() != null) { + try { + StreamsDatum entry = persistQueue.remove(); + write(entry); + } catch (Exception ex) { + ex.printStackTrace(); --- End diff -- Log the Exception instead using Logger ? > streams persist reader/writer for Apache Cassandra > -------------------------------------------------- > > Key: STREAMS-478 > URL: https://issues.apache.org/jira/browse/STREAMS-478 > Project: Streams > Issue Type: New Feature > Reporter: Subhobrata Dey > > This issue is created to propose the streams persist reader/writer to > get/post documents from/to apache cassandra. -- This message was sent by Atlassian JIRA (v6.3.4#6332)