[ https://issues.apache.org/jira/browse/STREAMS-478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15783116#comment-15783116 ]
ASF GitHub Bot commented on STREAMS-478:
----------------------------------------

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/incubator-streams/pull/345#discussion_r94044606

--- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java ---
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.GuidUtils;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
+
+  public static final String STREAMS_ID = "CassandraPersistWriter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistWriter.class);
+
+  private static final long MAX_WRITE_LATENCY = 1000;
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+  private ScheduledExecutorService backgroundFlushTask =
+      Executors.newSingleThreadScheduledExecutor();
+
+  private CassandraConfiguration config;
+
+  protected Cluster cluster;
+  protected Session session;
+
+  protected String keyspace;
+  protected String table;
+  protected PreparedStatement insertStatement;
+
+  protected List<BoundStatement> insertBatch = new ArrayList<>();
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public CassandraPersistWriter() {
+    this(new ComponentConfigurator<>(CassandraConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")));
+  }
+
+  public CassandraPersistWriter(CassandraConfiguration config) {
+    this.config = config;
+  }
+
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
+
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void write(StreamsDatum streamsDatum) {
+
+    ObjectNode node;
+
+    if (streamsDatum.getDocument() instanceof String) {
+      try {
+        node = mapper.readValue((String) streamsDatum.getDocument(), ObjectNode.class);
+
+        byte[] value = node.toString().getBytes();
+
+        BoundStatement statement = insertStatement.bind(GuidUtils.generateGuid(node.toString()), ByteBuffer.wrap(value));
+        insertBatch.add(statement);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
+        return;
+      }
+    } else {
+      try {
+        node = mapper.valueToTree(streamsDatum.getDocument());
+
+        byte[] value = node.toString().getBytes();
+
+        BoundStatement statement = insertStatement.bind(GuidUtils.generateGuid(node.toString()), ByteBuffer.wrap(value));
+        insertBatch.add(statement);
+      } catch (Exception ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
+        return;
+      }
+    }
+
+    flushIfNecessary();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    try {
+      LOGGER.debug("Attempting to flush {} items to cassandra", insertBatch.size());
+      lock.writeLock().lock();
+
+      BatchStatement batchStatement = new BatchStatement();
+      batchStatement.addAll(insertBatch);
+      session.execute(batchStatement);
+
+      lastWrite.set(System.currentTimeMillis());
+      insertBatch = new ArrayList<>();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    session.close();
+    cluster.close();
+    backgroundFlushTask.shutdownNow();
+  }
+
+  /**
+   * start write thread.
+   */
+  public void start() {
+    connectToCassandra();
+    backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        flushIfNecessary();
+      }
+    }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * stop.
+   */
+  public void stop() {
+    try {
+      flush();
+    } catch (IOException ex) {
+      LOGGER.error("Error flushing", ex);
+    }
+
+    try {
+      close();
+    } catch (IOException ex) {
+      LOGGER.error("Error closing", ex);
+    }
+
+    try {
+      backgroundFlushTask.shutdown();
+      // Wait a while for existing tasks to terminate
+      if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+        backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+          LOGGER.error("Stream did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      backgroundFlushTask.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      if (persistQueue.peek() != null) {
+        try {
+          StreamsDatum entry = persistQueue.remove();
+          write(entry);
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(1));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
+    }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.persistQueue = new ConcurrentLinkedQueue<>();
+    start();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected void flushIfNecessary() {
+    long lastLatency = System.currentTimeMillis() - lastWrite.get();
+    // Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater
+    // than the maximum desired latency
+    if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
+      try {
+        flush();
+      } catch (IOException ex) {
+        LOGGER.error("Error writing to Cassandra", ex);
+      }
+    }
+  }
+
+  private synchronized void connectToCassandra() {
+    Cluster.Builder clusterBuilder = Cluster.builder().addContactPoint(config.getHost())
+        .withPort(config.getPort().intValue());
+
+    keyspace = config.getKeyspace();
+    table = config.getTable();
+
+    if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
--- End diff --

use StringUtils.isNotEmpty() instead


> streams persist reader/writer for Apache Cassandra
> --------------------------------------------------
>
>                 Key: STREAMS-478
>                 URL: https://issues.apache.org/jira/browse/STREAMS-478
>             Project: Streams
>          Issue Type: New Feature
>            Reporter: Subhobrata Dey
>
> This issue is created to propose the streams persist reader/writer to get/post documents from/to apache cassandra.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
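For reference, the change the review comment above is asking for would roughly look like the following. This is a sketch, not the pull request's actual code: it assumes commons-lang3 is available to the streams-persist-cassandra module, and the body of the if block is elided because the diff hunk is cut off before it.

    import org.apache.commons.lang3.StringUtils;

    // ...inside connectToCassandra(), replacing the Guava-based check.
    // StringUtils.isNotEmpty(s) is equivalent to !Strings.isNullOrEmpty(s):
    // it is true only when s is non-null and has length > 0.
    if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
      // ... same credential handling as in the pull request (not shown in the hunk above)
    }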
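For context on how the proposed writer would be driven, a minimal usage sketch based only on the methods visible in the diff above. The JSON document and the null argument to prepare() are illustrative assumptions; connection settings are read from the "cassandra" section of the application config by the no-arg constructor.

    // Hypothetical driver code; not part of the pull request.
    CassandraPersistWriter writer = new CassandraPersistWriter();
    writer.prepare(null);                                    // connects to Cassandra and starts the background flush task
    writer.write(new StreamsDatum("{\"id\":\"example\"}"));  // documents are serialized and batched as bound statements
    writer.cleanUp();                                        // flushes any remaining batch and closes the session/cluster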