http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 492eccb..4554c0f 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -18,13 +18,6 @@ package org.apache.streams.hdfs; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.LineReadWriteUtil; @@ -34,6 +27,15 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,249 +52,278 @@ import java.util.List; import java.util.Queue; import java.util.zip.GZIPOutputStream; +/** + * WebHdfsPersistWriter writes to hdfs. 
+ */ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable { - public final static String STREAMS_ID = "WebHdfsPersistWriter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class); - - private FileSystem client; - private Path path; - private int linesPerFile; - private int totalRecordsWritten = 0; - private final List<Path> writtenFiles = new ArrayList<Path>(); - private int fileLineCounter = 0; - private OutputStreamWriter currentWriter = null; - - private static final int BYTES_IN_MB = 1024 * 1024; - private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB; - private volatile int totalByteCount = 0; - private volatile int byteCount = 0; - - public boolean terminate = false; - - protected volatile Queue<StreamsDatum> persistQueue; - - private ObjectMapper mapper; - private LineReadWriteUtil lineWriterUtil; - - protected HdfsWriterConfiguration hdfsConfiguration; - - public WebHdfsPersistWriter() { - this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs"))); - } - - public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) { - this.hdfsConfiguration = hdfsConfiguration; - this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue(); + public static final String STREAMS_ID = "WebHdfsPersistWriter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class); + + private FileSystem client; + private Path path; + private int linesPerFile; + private int totalRecordsWritten = 0; + private final List<Path> writtenFiles = new ArrayList<Path>(); + private int fileLineCounter = 0; + private OutputStreamWriter currentWriter = null; + + private static final int BYTES_IN_MB = 1024 * 1024; + private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB; + private volatile int totalByteCount = 0; + private volatile int byteCount = 0; + + public boolean terminate = false; + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper; + private LineReadWriteUtil lineWriterUtil; + + protected HdfsWriterConfiguration hdfsConfiguration; + + public WebHdfsPersistWriter() { + this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs"))); + } + + public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) { + this.hdfsConfiguration = hdfsConfiguration; + this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue(); + } + + /** + * getURI from hdfsConfiguration. 
+ * @return URI + * @throws URISyntaxException URISyntaxException + */ + // TODO: combine with WebHdfsPersistReader.getURI + public URI getURI() throws URISyntaxException { + StringBuilder uriBuilder = new StringBuilder(); + uriBuilder.append(hdfsConfiguration.getScheme()); + uriBuilder.append("://"); + if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) { + uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); + } else { + uriBuilder.append("/"); } - - public URI getURI() throws URISyntaxException { - StringBuilder uriBuilder = new StringBuilder(); - uriBuilder.append(hdfsConfiguration.getScheme()); - uriBuilder.append("://"); - if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) - uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); - else - uriBuilder.append("/"); - return new URI(uriBuilder.toString()); + return new URI(uriBuilder.toString()); + } + + /** + * isConnected. + * @return true if connected, false otherwise + */ + // TODO: combine with WebHdfsPersistReader.isConnected + public boolean isConnected() { + return (client != null); + } + + /** + * getFileSystem. + * @return FileSystem + */ + // TODO: combine with WebHdfsPersistReader.getFileSystem + public final synchronized FileSystem getFileSystem() { + // Check to see if we are connected. + if (!isConnected()) { + connectToWebHDFS(); } - - public boolean isConnected() { - return (client != null); - } - - public final synchronized FileSystem getFileSystem() { - // Check to see if we are connected. - if (!isConnected()) - connectToWebHDFS(); - return this.client; - } - - private synchronized void connectToWebHDFS() { - try { - LOGGER.info("User : {}", this.hdfsConfiguration.getUser()); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser()); - ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE); - - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws Exception { - Configuration conf = new Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - LOGGER.info("WebURI : {}", getURI().toString()); - client = FileSystem.get(getURI(), conf); - LOGGER.info("Connected to WebHDFS"); - - /* - * ************************************************************************************************ - * This code is an example of how you would work with HDFS and you weren't going over - * the webHDFS protocol. 
- * - * Smashew: 2013-10-01 - * ************************************************************************************************ - conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); - conf.set("namenode.host","0.0.0.0"); - conf.set("hadoop.job.ugi", userName); - conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); - fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); - FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); - for(int i=0;i<status.length;i++) - { - LOGGER.info("Directory: {}", status[i].getPath()); - } - */ - return null; - } - }); - } catch (Exception e) { - LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e); - throw new RuntimeException(e); + return this.client; + } + + private synchronized void connectToWebHDFS() { + try { + LOGGER.info("User : {}", this.hdfsConfiguration.getUser()); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser()); + ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + LOGGER.info("WebURI : {}", getURI().toString()); + client = FileSystem.get(getURI(), conf); + LOGGER.info("Connected to WebHDFS"); + + /* + * ************************************************************************************************ + * This code is an example of how you would work with HDFS and you weren't going over + * the webHDFS protocol. + * + * Smashew: 2013-10-01 + * ************************************************************************************************ + conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); + conf.set("namenode.host","0.0.0.0"); + conf.set("hadoop.job.ugi", userName); + conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); + fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); + FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); + for(int i=0;i<status.length;i++) + { + LOGGER.info("Directory: {}", status[i].getPath()); + } + */ + + return null; } + }); + } catch (Exception ex) { + LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", ex); + throw new RuntimeException(ex); } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void write(StreamsDatum streamsDatum) { - - synchronized (this) { - // Check to see if we need to reset the file that we are currently working with - if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile)) - resetFile(); - - String line = lineWriterUtil.convertResultToString(streamsDatum); - writeInternal(line); - if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter())) - writeInternal(this.hdfsConfiguration.getLineDelimiter()); - int bytesInLine = line.getBytes().length; - - totalRecordsWritten++; - totalByteCount += bytesInLine; - byteCount += bytesInLine; - - if (byteCount > BYTES_BEFORE_FLUSH) - try { - flush(); - } catch (IOException e) { - LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution. 
WARNING: There could be data loss.", e); - } - - this.fileLineCounter++; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + synchronized (this) { + // Check to see if we need to reset the file that we are currently working with + if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile)) { + resetFile(); + } + String line = lineWriterUtil.convertResultToString(streamsDatum); + writeInternal(line); + if ( !line.endsWith(this.hdfsConfiguration.getLineDelimiter())) { + writeInternal(this.hdfsConfiguration.getLineDelimiter()); + } + int bytesInLine = line.getBytes().length; + + totalRecordsWritten++; + totalByteCount += bytesInLine; + byteCount += bytesInLine; + + if (byteCount > BYTES_BEFORE_FLUSH) { + try { + flush(); + } catch (IOException ex) { + LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution. WARNING: There could be data loss.", ex); } + } + this.fileLineCounter++; } - - private void writeInternal(String line) { + } + + private void writeInternal(String line) { + try { + this.currentWriter.write(line); + } catch (IOException ex) { + LOGGER.warn("Error writing to HDFS. Attempting to try a new file", ex); + try { + resetFile(); + this.currentWriter.write(line); + } catch (Exception e2) { + LOGGER.warn("Failed to write even after creating a new file. Attempting to reconnect", e2); try { - this.currentWriter.write(line); - } catch (IOException e) { - LOGGER.warn("Error writing to HDFS. Attempting to try a new file", e); - try{ - resetFile(); - this.currentWriter.write(line); - } catch (Exception io) { - LOGGER.warn("Failed to write even after creating a new file. Attempting to reconnect", io); - try { - connectToWebHDFS(); - resetFile(); - this.currentWriter.write(line); - } catch (Exception ex) { - LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", ex); - throw new RuntimeException(e); - } - } - + connectToWebHDFS(); + resetFile(); + this.currentWriter.write(line); + } catch (Exception e3) { + LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", e3); + throw new RuntimeException(e3); } - } + } - public void flush() throws IOException { - if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) { - this.currentWriter.flush(); - byteCount = 0; - } } + } - private synchronized void resetFile() { - // this will keep it thread safe, so we don't create too many files - if (this.fileLineCounter == 0 && this.currentWriter != null) - return; + @Override + public void flush() throws IOException { + if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) { + this.currentWriter.flush(); + byteCount = 0; + } + } - // Create the path for where the file is going to live. - Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime()); + private synchronized void resetFile() { + // this will keep it thread safe, so we don't create too many files + if (this.fileLineCounter == 0 && this.currentWriter != null) { + return; + } - if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) - filePath = filePath.suffix(".gz"); - else - filePath = filePath.suffix(".tsv"); + // Create the path for where the file is going to live. 
+ Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime()); - try { + if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) { + filePath = filePath.suffix(".gz"); + } else { + filePath = filePath.suffix(".tsv"); + } - // if there is a current writer, we must close it first. - if (this.currentWriter != null) { - flush(); - close(); - } + try { - this.fileLineCounter = 0; + // if there is a current writer, we must close it first. + if (this.currentWriter != null) { + flush(); + close(); + } - // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed. - if (client.exists(filePath)) - throw new RuntimeException("Unable to create file: " + filePath); + this.fileLineCounter = 0; - if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) - this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath))); - else - this.currentWriter = new OutputStreamWriter(client.create(filePath)); + // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed. + if (client.exists(filePath)) { + throw new RuntimeException("Unable to create file: " + filePath); + } - // Add another file to the list of written files. - writtenFiles.add(filePath); + if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) { + this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath))); + } else { + this.currentWriter = new OutputStreamWriter(client.create(filePath)); + } - LOGGER.info("File Created: {}", filePath); - } catch (Exception e) { - LOGGER.error("COULD NOT CreateFile: {}", filePath); - LOGGER.error(e.getMessage()); - throw new RuntimeException(e); - } - } + // Add another file to the list of written files. 
+ writtenFiles.add(filePath); - public synchronized void close() throws IOException { - if (this.currentWriter != null) { - this.currentWriter.flush(); - this.currentWriter.close(); - this.currentWriter = null; - LOGGER.info("File Closed"); - } + LOGGER.info("File Created: {}", filePath); + } catch (Exception ex) { + LOGGER.error("COULD NOT CreateFile: {}", filePath); + LOGGER.error(ex.getMessage()); + throw new RuntimeException(ex); } - - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); - connectToWebHDFS(); - path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath()); + } + + @Override + public synchronized void close() throws IOException { + if (this.currentWriter != null) { + this.currentWriter.flush(); + this.currentWriter.close(); + this.currentWriter = null; + LOGGER.info("File Closed"); } - - @Override - public void cleanUp() { - try { - flush(); - } catch (IOException e) { - LOGGER.error("Error flushing on cleanup", e); - } - try { - close(); - } catch (IOException e) { - LOGGER.error("Error closing on cleanup", e); - } + } + + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); + lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); + connectToWebHDFS(); + path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath()); + } + + @Override + public void cleanUp() { + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error flushing on cleanup", ex); } - - @Override - public DatumStatusCounter getDatumStatusCounter() { - DatumStatusCounter counters = new DatumStatusCounter(); - counters.incrementAttempt(this.totalRecordsWritten); - counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten); - return counters; + try { + close(); + } catch (IOException ex) { + LOGGER.error("Error closing on cleanup", ex); } + } + + @Override + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.totalRecordsWritten); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten); + return counters; + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java index 00cf17f..eb808c1 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java @@ -19,38 +19,43 @@ package org.apache.streams.hdfs; import org.apache.streams.core.StreamsDatum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; +/** + * WebHdfsPersistReaderTask writes to hdfs on behalf of + * @see org.apache.streams.hdfs.WebHdfsPersistWriter + */ public class WebHdfsPersistWriterTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class); - private WebHdfsPersistWriter writer; + private WebHdfsPersistWriter writer; - public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) { - this.writer = writer; - } + public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) { + this.writer = writer; + } - @Override - public void run() { - - while(true) { - if( writer.persistQueue.peek() != null ) { - try { - StreamsDatum entry = writer.persistQueue.remove(); - writer.write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException e) {} - } + @Override + public void run() { + while (true) { + if ( writer.persistQueue.peek() != null ) { + try { + StreamsDatum entry = writer.persistQueue.remove(); + writer.write(entry); + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(new Random().nextInt(1)); + } catch (InterruptedException e) {} } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java index 819414a..a35f124 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java @@ -33,7 +33,7 @@ import static org.junit.Assert.*; */ public class HdfsPersistConfigTest { - private final static Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class); @Test public void getWriterFileUriTest() http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java index ff33ec3..7191d9a 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java @@ -46,7 +46,7 @@ import java.util.List; */ public class TestHdfsPersist { - private final static Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class); ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java index d54e794..64f7200 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java @@ -18,19 +18,14 @@ package org.apache.streams.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.consumer.Whitelist; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.serializer.StringDecoder; -import kafka.utils.VerifiableProperties; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; + +import com.fasterxml.jackson.databind.ObjectMapper; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,113 +40,132 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public class KafkaPersistReader implements StreamsPersistReader, Serializable { - - public final static String STREAMS_ID = "KafkaPersistReader"; - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class); - - protected volatile Queue<StreamsDatum> persistQueue; - - private ObjectMapper mapper = new ObjectMapper(); - - private KafkaConfiguration config; - - private ConsumerConnector consumerConnector; - - public List<KafkaStream<String, String>> inStreams; - - private ExecutorService executor = Executors.newSingleThreadExecutor(); +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.consumer.Whitelist; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.serializer.StringDecoder; +import kafka.utils.VerifiableProperties; - public KafkaPersistReader() { - this.config = new ComponentConfigurator<>(KafkaConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); - this.persistQueue = new ConcurrentLinkedQueue<>(); - } +/** + * KafkaPersistReader reads documents from kafka. 
+ */ +public class KafkaPersistReader implements StreamsPersistReader, Serializable { - public KafkaPersistReader(Queue<StreamsDatum> persistQueue) { - this.config = new ComponentConfigurator<>(KafkaConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); - this.persistQueue = persistQueue; - } + public static final String STREAMS_ID = "KafkaPersistReader"; - public void setConfig(KafkaConfiguration config) { - this.config = config; - } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class); - @Override - public String getId() { - return STREAMS_ID; - } + protected volatile Queue<StreamsDatum> persistQueue; - @Override - public void startStream() { + private ObjectMapper mapper = new ObjectMapper(); - Properties props = new Properties(); - props.setProperty("serializer.encoding", "UTF8"); + private KafkaConfiguration config; - ConsumerConfig consumerConfig = new ConsumerConfig(props); + private ConsumerConnector consumerConnector; - consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); + public List<KafkaStream<String, String>> inStreams; - Whitelist topics = new Whitelist(config.getTopic()); - VerifiableProperties vprops = new VerifiableProperties(props); + private ExecutorService executor = Executors.newSingleThreadExecutor(); - inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops)); + /** + * KafkaPersistReader constructor - resolves KafkaConfiguration from JVM 'kafka'. + */ + public KafkaPersistReader() { + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = new ConcurrentLinkedQueue<>(); + } - for (final KafkaStream stream : inStreams) { - executor.submit(new KafkaPersistReaderTask(this, stream)); - } + /** + * KafkaPersistReader constructor - uses supplied persistQueue. 
+ */ + public KafkaPersistReader(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = persistQueue; + } - } + public void setConfig(KafkaConfiguration config) { + this.config = config; + } - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public StreamsResultSet readCurrent() { - return null; - } + @Override + public void startStream() { - @Override - public StreamsResultSet readNew(BigInteger bigInteger) { - return null; - } + Properties props = new Properties(); + props.setProperty("serializer.encoding", "UTF8"); - @Override - public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { - return null; - } + ConsumerConfig consumerConfig = new ConsumerConfig(props); - @Override - public boolean isRunning() { - return !executor.isShutdown() && !executor.isTerminated(); - } + consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); - private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { - Properties props = new Properties(); - props.put("zookeeper.connect", a_zookeeper); - props.put("group.id", a_groupId); - props.put("zookeeper.session.timeout.ms", "400"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - return new ConsumerConfig(props); - } + Whitelist topics = new Whitelist(config.getTopic()); + VerifiableProperties vprops = new VerifiableProperties(props); - @Override - public void prepare(Object configurationObject) { + inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops)); + for (final KafkaStream stream : inStreams) { + executor.submit(new KafkaPersistReaderTask(this, stream)); } - @Override - public void cleanUp() { - consumerConnector.shutdown(); - while( !executor.isTerminated()) { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException ignored) {} - } + } + + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } + + @Override + public StreamsResultSet readCurrent() { + return null; + } + + @Override + public StreamsResultSet readNew(BigInteger bigInteger) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { + return null; + } + + @Override + public boolean isRunning() { + return !executor.isShutdown() && !executor.isTerminated(); + } + + private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { + Properties props = new Properties(); + props.put("zookeeper.connect", zookeeper); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + return new ConsumerConfig(props); + } + + @Override + public void prepare(Object configurationObject) { + + } + + @Override + public void cleanUp() { + consumerConnector.shutdown(); + while ( !executor.isTerminated()) { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java index 83493e0..199be73 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java @@ -18,45 +18,51 @@ package org.apache.streams.kafka; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.message.MessageAndMetadata; import org.apache.streams.core.StreamsDatum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; -public class KafkaPersistReaderTask implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class); - - private KafkaPersistReader reader; - private KafkaStream<String,String> stream; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.message.MessageAndMetadata; - public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) { - this.reader = reader; - this.stream = stream; - } +/** + * KafkaPersistReaderTask reads documents from kafka on behalf of + * @see org.apache.streams.kafka.KafkaPersistReader + */ +public class KafkaPersistReaderTask implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class); + private KafkaPersistReader reader; + private KafkaStream<String,String> stream; - @Override - public void run() { + public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) { + this.reader = reader; + this.stream = stream; + } - MessageAndMetadata<String,String> item; - while(true) { + @Override + public void run() { - ConsumerIterator<String, String> it = stream.iterator(); - while (it.hasNext()) { - item = it.next(); - reader.persistQueue.add(new StreamsDatum(item.message())); - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) {} - } + MessageAndMetadata<String,String> item; + while (true) { + ConsumerIterator<String, String> it = stream.iterator(); + while (it.hasNext()) { + item = it.next(); + reader.persistQueue.add(new StreamsDatum(item.message())); + } + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java index 83032e6..40e125f 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java @@ -18,16 +18,15 @@ package org.apache.streams.kafka; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import kafka.javaapi.producer.Producer; -import 
kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.util.GuidUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,99 +35,114 @@ import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable { - - public final static String STREAMS_ID = "KafkaPersistWriter"; - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class); +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; - protected volatile Queue<StreamsDatum> persistQueue; +/** + * KafkaPersistWriter writes documents to kafka. + */ +public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable { - private ObjectMapper mapper = new ObjectMapper(); + public static final String STREAMS_ID = "KafkaPersistWriter"; - private KafkaConfiguration config; + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class); - private Producer<String, String> producer; + protected volatile Queue<StreamsDatum> persistQueue; - public KafkaPersistWriter() { - this.config = new ComponentConfigurator<>(KafkaConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); - this.persistQueue = new ConcurrentLinkedQueue<>(); - } + private ObjectMapper mapper = new ObjectMapper(); - public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) { - this.config = new ComponentConfigurator<>(KafkaConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); - this.persistQueue = persistQueue; - } + private KafkaConfiguration config; - public void setConfig(KafkaConfiguration config) { - this.config = config; - } + private Producer<String, String> producer; - public void start() { - Properties props = new Properties(); + /** + * KafkaPersistWriter constructor - resolves KafkaConfiguration from JVM 'kafka'. + */ + public KafkaPersistWriter() { + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = new ConcurrentLinkedQueue<>(); + } - props.put("metadata.broker.list", config.getBrokerlist()); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner"); - props.put("request.required.acks", "1"); + /** + * KafkaPersistWriter constructor - uses supplied persistQueue. 
+ */ + public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(KafkaConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka")); + this.persistQueue = persistQueue; + } - ProducerConfig config = new ProducerConfig(props); + public void setConfig(KafkaConfiguration config) { + this.config = config; + } - producer = new Producer<>(config); + /** + * run persist writer thread + */ + public void start() { + Properties props = new Properties(); - new Thread(new KafkaPersistWriterTask(this)).start(); - } + props.put("metadata.broker.list", config.getBrokerlist()); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner"); + props.put("request.required.acks", "1"); - public void stop() { - producer.close(); - } + ProducerConfig config = new ProducerConfig(props); - public void setPersistQueue(Queue<StreamsDatum> persistQueue) { - this.persistQueue = persistQueue; - } + producer = new Producer<>(config); - public Queue<StreamsDatum> getPersistQueue() { - return this.persistQueue; - } + new Thread(new KafkaPersistWriterTask(this)).start(); + } - @Override - public String getId() { - return STREAMS_ID; - } + public void stop() { + producer.close(); + } - @Override - public void write(StreamsDatum entry) { + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } - try { - String text = mapper.writeValueAsString(entry); + public Queue<StreamsDatum> getPersistQueue() { + return this.persistQueue; + } - String hash = GuidUtils.generateGuid(text); + @Override + public String getId() { + return STREAMS_ID; + } - KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text); + @Override + public void write(StreamsDatum entry) { - producer.send(data); + try { - } catch (JsonProcessingException e) { - LOGGER.warn("save: {}", e); - }// put - } + String text = mapper.writeValueAsString(entry); - @Override - public void run() { - start(); + String hash = GuidUtils.generateGuid(text); - // stop(); - } + KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text); - @Override - public void prepare(Object configurationObject) { - start(); - } + producer.send(data); - @Override - public void cleanUp() { - stop(); + } catch (JsonProcessingException ex) { + LOGGER.warn("save: {}", ex); } + } + + @Override + public void run() { + start(); + } + + @Override + public void prepare(Object configurationObject) { + start(); + } + + @Override + public void cleanUp() { + stop(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java index 5d8ee9e..dae7aa2 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java @@ -19,38 +19,45 @@ package org.apache.streams.kafka; import org.apache.streams.core.StreamsDatum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.util.Random; +/** + * KafkaPersistWriterTask writes documents to kafka on behalf of + * @see org.apache.streams.kafka.KafkaPersistWriter + */ public class KafkaPersistWriterTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class); - private KafkaPersistWriter writer; + private KafkaPersistWriter writer; - public KafkaPersistWriterTask(KafkaPersistWriter writer) { - this.writer = writer; - } + public KafkaPersistWriterTask(KafkaPersistWriter writer) { + this.writer = writer; + } - @Override - public void run() { - - while(true) { - if( writer.getPersistQueue().peek() != null ) { - try { - StreamsDatum entry = writer.persistQueue.remove(); - writer.write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) {} - } + @Override + public void run() { + while (true) { + if ( writer.getPersistQueue().peek() != null ) { + try { + StreamsDatum entry = writer.persistQueue.remove(); + writer.write(entry); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java deleted file mode 100644 index ebfff9a..0000000 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.kafka; - -/** - * Created by sblackmon on 12/15/13. 
- */ -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -public class StreamsPartitioner implements Partitioner<String> { - public StreamsPartitioner (VerifiableProperties props) { - - } - - public int partition(String key, int a_numPartitions) { - int partition = 0; - int offset = key.lastIndexOf('.'); - if (offset > 0) { - partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions; - } - return partition; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java index ba77ff1..b6a7404 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java @@ -18,6 +18,14 @@ package org.apache.streams.mongo; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; @@ -30,13 +38,7 @@ import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,230 +54,248 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +/** + * MongoPersistReader reads documents from mongo. 
+ */ public class MongoPersistReader implements StreamsPersistReader { - public static final String STREAMS_ID = "MongoPersistReader"; + public static final String STREAMS_ID = "MongoPersistReader"; - private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); - private ExecutorService executor; + private ExecutorService executor; - private MongoConfiguration config; + private MongoConfiguration config; - protected MongoClient client; - protected DB db; - protected DBCollection collection; + protected MongoClient client; + protected DB db; + protected DBCollection collection; - protected DBCursor cursor; + protected DBCursor cursor; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - public MongoPersistReader() { - this.config = new ComponentConfigurator<>(MongoConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); - } + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - public MongoPersistReader(MongoConfiguration config) { - this.config = config; - } - - public MongoPersistReader(Queue<StreamsDatum> persistQueue) { - this.config = new ComponentConfigurator<>(MongoConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); - this.persistQueue = persistQueue; - } + /** + * KafkaPersistReader constructor - resolves KafkaConfiguration from JVM 'mongo'. + */ + public MongoPersistReader() { + this.config = new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); + } - public void setPersistQueue(Queue<StreamsDatum> persistQueue) { - this.persistQueue = persistQueue; - } + /** + * KafkaPersistReader constructor - uses supplied MongoConfiguration. + * @param config config + */ + public MongoPersistReader(MongoConfiguration config) { + this.config = config; + } - public Queue<StreamsDatum> getPersistQueue() { - return persistQueue; - } + /** + * KafkaPersistReader constructor - uses supplied persistQueue. 
+ * @param persistQueue persistQueue + */ + public MongoPersistReader(Queue<StreamsDatum> persistQueue) { + this.config = new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); + this.persistQueue = persistQueue; + } - public void stop() { - } + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } - @Override - public String getId() { - return STREAMS_ID; - } + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } - @Override - public void prepare(Object configurationObject) { + public void stop() { + } - connectToMongo(); + @Override + public String getId() { + return STREAMS_ID; + } - if( client == null || - collection == null ) - throw new RuntimeException("Unable to connect!"); + @Override + public void prepare(Object configurationObject) { - cursor = collection.find(); + connectToMongo(); - if( cursor == null || !cursor.hasNext()) - throw new RuntimeException("Collection not present or empty!"); + if ( client == null + || collection == null ) { + throw new RuntimeException("Unable to connect!"); + } + cursor = collection.find(); - persistQueue = constructQueue(); + if ( cursor == null + || !cursor.hasNext()) { + throw new RuntimeException("Collection not present or empty!"); + } - executor = Executors.newSingleThreadExecutor(); + persistQueue = constructQueue(); - } + executor = Executors.newSingleThreadExecutor(); - @Override - public void cleanUp() { - stop(); - } + } - protected StreamsDatum prepareDatum(DBObject dbObject) { + @Override + public void cleanUp() { + stop(); + } - ObjectNode objectNode; - String id; + protected StreamsDatum prepareDatum(DBObject dbObject) { - try { - objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class); - id = objectNode.get("_id").get("$oid").asText(); - objectNode.remove("_id"); - } catch (IOException e) { - LOGGER.warn("document isn't valid JSON."); - return null; - } + ObjectNode objectNode; + String id; - return new StreamsDatum(objectNode, id); + try { + objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class); + id = objectNode.get("_id").get("$oid").asText(); + objectNode.remove("_id"); + } catch (IOException ex) { + LOGGER.warn("document isn't valid JSON."); + return null; } - private synchronized void connectToMongo() { + return new StreamsDatum(objectNode, id); + } - ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); + private synchronized void connectToMongo() { - if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { - MongoCredential credential = - MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); - client = new MongoClient(serverAddress, Lists.newArrayList(credential)); - } else { - client = new MongoClient(serverAddress); - } + ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); - db = client.getDB(config.getDb()); + if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { + MongoCredential credential = + MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); + client = new MongoClient(serverAddress, Lists.newArrayList(credential)); + } else { + client = new MongoClient(serverAddress); + } - if (!db.collectionExists(config.getCollection())) { - db.createCollection(config.getCollection(), null); - } 
+ db = client.getDB(config.getDb()); - collection = db.getCollection(config.getCollection()); + if (!db.collectionExists(config.getCollection())) { + db.createCollection(config.getCollection(), null); } - @Override - public StreamsResultSet readAll() { - - try (DBCursor cursor = collection.find()) { - while (cursor.hasNext()) { - DBObject dbObject = cursor.next(); - StreamsDatum datum = prepareDatum(dbObject); - write(datum); - } - } + collection = db.getCollection(config.getCollection()); + } - return readCurrent(); - } + @Override + public StreamsResultSet readAll() { - @Override - public void startStream() { + try (DBCursor cursor = collection.find()) { + while (cursor.hasNext()) { + DBObject dbObject = cursor.next(); + StreamsDatum datum = prepareDatum(dbObject); + write(datum); + } + } - LOGGER.debug("startStream"); - MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this); - Thread readerTaskThread = new Thread(readerTask); - Future future = executor.submit(readerTaskThread); + return readCurrent(); + } - while( !future.isDone() && !future.isCancelled()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) {} - } + @Override + public void startStream() { - executor.shutdown(); + LOGGER.debug("startStream"); + MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this); + Thread readerTaskThread = new Thread(readerTask); + Future future = executor.submit(readerTaskThread); + while ( !future.isDone() && !future.isCancelled()) { + try { + Thread.sleep(1000); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } } - @Override - public StreamsResultSet readCurrent() { + executor.shutdown(); - StreamsResultSet current; + } - try { - lock.writeLock().lock(); - current = new StreamsResultSet(persistQueue); - current.setCounter(new DatumStatusCounter()); - persistQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } + @Override + public StreamsResultSet readCurrent() { - return current; - } + StreamsResultSet current; - //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue - //as it is a synchronized queue. What we do care about is that we don't want to be offering to the current reference - //if the queue is being replaced with a new instance - protected void write(StreamsDatum entry) { - boolean success; - do { - try { - lock.readLock().lock(); - success = persistQueue.offer(entry); - Thread.yield(); - } finally { - lock.readLock().unlock(); - } - } - while (!success); + try { + lock.writeLock().lock(); + current = new StreamsResultSet(persistQueue); + current.setCounter(new DatumStatusCounter()); + persistQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; + return current; + } + + //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue + //as it is a synchronized queue. 
What we do care about is that we don't want to be offering to the current reference + //if the queue is being replaced with a new instance + protected void write(StreamsDatum entry) { + boolean success; + do { + try { + lock.readLock().lock(); + success = persistQueue.offer(entry); + Thread.yield(); + } finally { + lock.readLock().unlock(); + } } + while (!success); + } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } - @Override - public boolean isRunning() { - return !executor.isTerminated() || !executor.isShutdown(); - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } - private Queue<StreamsDatum> constructQueue() { - return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); - } + @Override + public boolean isRunning() { + return !executor.isTerminated() || !executor.isShutdown(); + } - public class MongoPersistReaderTask implements Runnable { + private Queue<StreamsDatum> constructQueue() { + return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + } - private MongoPersistReader reader; + public class MongoPersistReaderTask implements Runnable { - public MongoPersistReaderTask(MongoPersistReader reader) { - this.reader = reader; - } + private MongoPersistReader reader; - @Override - public void run() { + public MongoPersistReaderTask(MongoPersistReader reader) { + this.reader = reader; + } - try { - while(reader.cursor.hasNext()) { - DBObject dbObject = reader.cursor.next(); - StreamsDatum datum = reader.prepareDatum(dbObject); - reader.write(datum); - } - } finally { - reader.cursor.close(); - } + @Override + public void run() { + try { + while (reader.cursor.hasNext()) { + DBObject dbObject = reader.cursor.next(); + StreamsDatum datum = reader.prepareDatum(dbObject); + reader.write(datum); } + } finally { + reader.cursor.close(); + } } + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java index 5f6ac1f..6072f58 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java @@ -18,6 +18,12 @@ package org.apache.streams.mongo; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; @@ -29,14 +35,12 @@ import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.util.JSON; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; 
-import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.Flushable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -50,209 +54,217 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class MongoPersistWriter implements StreamsPersistWriter, Runnable { +public class MongoPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable { - public final static String STREAMS_ID = "MongoPersistWriter"; + public static final String STREAMS_ID = "MongoPersistWriter"; - private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class); - private final static long MAX_WRITE_LATENCY = 1000; + private static final long MAX_WRITE_LATENCY = 1000; - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); - private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); + private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); - private MongoConfiguration config; + private MongoConfiguration config; - protected MongoClient client; - protected DB db; - protected DBCollection collection; + protected MongoClient client; + protected DB db; + protected DBCollection collection; - protected List<DBObject> insertBatch = new ArrayList<>(); + protected List<DBObject> insertBatch = new ArrayList<>(); - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - public MongoPersistWriter() { - this(new ComponentConfigurator<>(MongoConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"))); - } - - public MongoPersistWriter(MongoConfiguration config) { - this.config = config; - } + public MongoPersistWriter() { + this(new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"))); + } - public void setPersistQueue(Queue<StreamsDatum> persistQueue) { - this.persistQueue = persistQueue; - } - - public Queue<StreamsDatum> getPersistQueue() { - return persistQueue; - } + public MongoPersistWriter(MongoConfiguration config) { + this.config = config; + } - @Override - public String getId() { - return STREAMS_ID; - } + public void setPersistQueue(Queue<StreamsDatum> persistQueue) { + this.persistQueue = persistQueue; + } - @Override - public void write(StreamsDatum streamsDatum) { + public Queue<StreamsDatum> getPersistQueue() { + return persistQueue; + } - DBObject dbObject = prepareObject(streamsDatum); - if (dbObject != null) { - addToBatch(dbObject); - flushIfNecessary(); - } - } + @Override + public String getId() { + return STREAMS_ID; + } - public void flush() throws IOException { - try { - LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size()); - lock.writeLock().lock(); - collection.insert(insertBatch); - 
lastWrite.set(System.currentTimeMillis()); - insertBatch = new ArrayList<>(); - } finally { - lock.writeLock().unlock(); - } + @Override + public void write(StreamsDatum streamsDatum) { + DBObject dbObject = prepareObject(streamsDatum); + if (dbObject != null) { + addToBatch(dbObject); + flushIfNecessary(); } - - public synchronized void close() throws IOException { -// client.cleanCursors(true); -// backgroundFlushTask.shutdownNow(); + } + + @Override + public void flush() throws IOException { + try { + LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size()); + lock.writeLock().lock(); + collection.insert(insertBatch); + lastWrite.set(System.currentTimeMillis()); + insertBatch = new ArrayList<>(); + } finally { + lock.writeLock().unlock(); } - public void start() { - connectToMongo(); - backgroundFlushTask.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - flushIfNecessary(); - } - }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS); + } + + public synchronized void close() throws IOException { + client.close(); + backgroundFlushTask.shutdownNow(); + } + + /** + * start write thread. + */ + public void start() { + connectToMongo(); + backgroundFlushTask.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + flushIfNecessary(); + } + }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS); + } + + /** + * stop. + */ + public void stop() { + + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error flushing", ex); } - - public void stop() { - - try { - flush(); - } catch (IOException e) { - LOGGER.error("Error flushing", e); - } - try { - close(); - } catch (IOException e) { - LOGGER.error("Error closing", e); - } - try { - backgroundFlushTask.shutdown(); - // Wait a while for existing tasks to terminate - if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { - backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { - LOGGER.error("Stream did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - backgroundFlushTask.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - + try { + close(); + } catch (IOException ex) { + LOGGER.error("Error closing", ex); } - - @Override - public void run() { - - while (true) { - if (persistQueue.peek() != null) { - try { - StreamsDatum entry = persistQueue.remove(); - write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException ignored) { - } + try { + backgroundFlushTask.shutdown(); + // Wait a while for existing tasks to terminate + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) { + LOGGER.error("Stream did not terminate"); } - - } - - @Override - public void prepare(Object configurationObject) { - this.persistQueue = new ConcurrentLinkedQueue<>(); - start(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + backgroundFlushTask.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } - @Override - public void cleanUp() { - stop(); - } + } - protected void 
flushIfNecessary() { - long lastLatency = System.currentTimeMillis() - lastWrite.get(); - //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater - //than the maximum desired latency - if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) { - try { - flush(); - } catch (IOException e) { - LOGGER.error("Error writing to Mongo", e); - } - } - } + @Override + public void run() { - protected void addToBatch(DBObject dbObject) { + while (true) { + if (persistQueue.peek() != null) { try { - lock.readLock().lock(); - insertBatch.add(dbObject); - } finally { - lock.readLock().unlock(); + StreamsDatum entry = persistQueue.remove(); + write(entry); + } catch (Exception ex) { + ex.printStackTrace(); } + } + try { + Thread.sleep(new Random().nextInt(1)); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupt", interrupt); + } } - protected DBObject prepareObject(StreamsDatum streamsDatum) { - DBObject dbObject = null; - if (streamsDatum.getDocument() instanceof String) { - dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument()); - } else { - try { - ObjectNode node = mapper.valueToTree(streamsDatum.getDocument()); - dbObject = (DBObject) JSON.parse(node.toString()); - } catch (Exception e) { - LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), e); - } - } - return dbObject; + } + + @Override + public void prepare(Object configurationObject) { + this.persistQueue = new ConcurrentLinkedQueue<>(); + start(); + } + + @Override + public void cleanUp() { + stop(); + } + + protected void flushIfNecessary() { + long lastLatency = System.currentTimeMillis() - lastWrite.get(); + //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater + //than the maximum desired latency + if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) { + try { + flush(); + } catch (IOException ex) { + LOGGER.error("Error writing to Mongo", ex); + } } + } + + protected void addToBatch(DBObject dbObject) { + try { + lock.readLock().lock(); + insertBatch.add(dbObject); + } finally { + lock.readLock().unlock(); + } + } + + protected DBObject prepareObject(StreamsDatum streamsDatum) { + DBObject dbObject = null; + if (streamsDatum.getDocument() instanceof String) { + dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument()); + } else { + try { + ObjectNode node = mapper.valueToTree(streamsDatum.getDocument()); + dbObject = (DBObject) JSON.parse(node.toString()); + } catch (Exception ex) { + LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), ex); + } + } + return dbObject; + } - private synchronized void connectToMongo() { - - ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); + private synchronized void connectToMongo() { - if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { - MongoCredential credential = - MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); - client = new MongoClient(serverAddress, Lists.newArrayList(credential)); - } else { - client = new MongoClient(serverAddress); - } + ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); - db = client.getDB(config.getDb()); + if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { + 
MongoCredential credential = + MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); + client = new MongoClient(serverAddress, Lists.newArrayList(credential)); + } else { + client = new MongoClient(serverAddress); + } - if (!db.collectionExists(config.getCollection())) { - db.createCollection(config.getCollection(), null); - } + db = client.getDB(config.getDb()); - collection = db.getCollection(config.getCollection()); + if (!db.collectionExists(config.getCollection())) { + db.createCollection(config.getCollection(), null); } + collection = db.getCollection(config.getCollection()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java index 18f5a62..2a2e170 100644 --- a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java +++ b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java @@ -18,12 +18,6 @@ package org.apache.streams.mongo.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsResultSet; @@ -32,6 +26,14 @@ import org.apache.streams.mongo.MongoConfiguration; import org.apache.streams.mongo.MongoPersistReader; import org.apache.streams.mongo.MongoPersistWriter; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -48,64 +50,64 @@ import static org.junit.Assert.assertEquals; */ public class MongoPersistIT { - private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class); - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - MongoConfiguration testConfiguration; + MongoConfiguration testConfiguration; - int count = 0; + int count = 0; - @Before - public void setup() throws Exception { + @Before + public void setup() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/MongoPersistIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo"); + Config reference = ConfigFactory.load(); + File conf_file = new 
File("target/test-classes/MongoPersistIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo"); - } + } - @Test - public void testMongoPersist() throws Exception { + @Test + public void testMongoPersist() throws Exception { - MongoPersistWriter writer = new MongoPersistWriter(testConfiguration); + MongoPersistWriter writer = new MongoPersistWriter(testConfiguration); - writer.prepare(null); + writer.prepare(null); - InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - activity.getAdditionalProperties().remove("$license"); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - writer.write( datum ); - LOGGER.info("Wrote: " + activity.getVerb() ); - count++; - } + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.getAdditionalProperties().remove("$license"); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + writer.write( datum ); + LOGGER.info("Wrote: " + activity.getVerb() ); + count++; + } - LOGGER.info("Total Written: {}", count ); + LOGGER.info("Total Written: {}", count ); - assertEquals( 89, count ); + assertEquals( 89, count ); - writer.cleanUp(); + writer.cleanUp(); - MongoPersistReader reader = new MongoPersistReader(testConfiguration); + MongoPersistReader reader = new MongoPersistReader(testConfiguration); - reader.prepare(null); + reader.prepare(null); - StreamsResultSet resultSet = reader.readAll(); + StreamsResultSet resultSet = reader.readAll(); - LOGGER.info("Total Read: {}", resultSet.size() ); + LOGGER.info("Total Read: {}", resultSet.size() ); - assertEquals( 89, resultSet.size() ); + assertEquals( 89, resultSet.size() ); - } + } }
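Note: the locking comment in MongoPersistReader above ("the locking may appear to be counter intuitive...") is easier to see in isolation. The following is a minimal, hypothetical sketch of the same pattern; the class and method names (SwappableQueue, drain) are invented for illustration and are not part of this commit. Producers only take the read lock because the synchronized queue already tolerates concurrent offers; the write lock is reserved for the one operation that must be exclusive, swapping the queue reference.

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SwappableQueue<T> {

  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile Queue<T> queue = new LinkedBlockingQueue<>(10000);

  // Offer under the read lock: the queue itself is thread-safe, so concurrent
  // offers are fine. The lock only guards the reference, ensuring an offer
  // never lands on a queue that is being swapped out underneath it.
  public void write(T entry) {
    boolean success;
    do {
      lock.readLock().lock();
      try {
        success = queue.offer(entry);
      } finally {
        lock.readLock().unlock();
      }
      Thread.yield();  // back off briefly if the bounded queue is full
    } while (!success);
  }

  // Swap in a fresh queue under the write lock. Once this returns, new offers
  // go to the fresh instance and the returned queue is safe to hand off.
  public Queue<T> drain() {
    lock.writeLock().lock();
    try {
      Queue<T> current = queue;
      queue = new LinkedBlockingQueue<>(10000);
      return current;
    } finally {
      lock.writeLock().unlock();
    }
  }
}

In the reader, readCurrent() plays the role of drain() here: it wraps the returned queue in a StreamsResultSet while writers continue against the fresh instance.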
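Note: MongoPersistWriter's flush policy (flushIfNecessary plus the background task scheduled in start()) combines a size trigger with a latency trigger. The sketch below is a simplified, hypothetical illustration of that policy; BatchingSink and BATCH_SIZE are invented names, plain synchronized stands in for the writer's ReadWriteLock, and the println stands in for collection.insert(insertBatch).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class BatchingSink<T> {

  private static final long MAX_WRITE_LATENCY = 1000;
  private static final int BATCH_SIZE = 100;

  private final ScheduledExecutorService backgroundFlushTask =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
  private List<T> batch = new ArrayList<>();

  public void start() {
    // periodic safety net so a slow trickle of writes still gets flushed,
    // mirroring the MAX_WRITE_LATENCY * 2 period used by the writer
    backgroundFlushTask.scheduleAtFixedRate(
        this::flushIfNecessary, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
  }

  public synchronized void write(T item) {
    batch.add(item);
    flushIfNecessary();
  }

  private synchronized void flushIfNecessary() {
    long sinceLastFlush = System.currentTimeMillis() - lastWrite.get();
    // flush when the batch is non-empty and has reached a multiple of BATCH_SIZE,
    // or when the last flush is older than the maximum acceptable write latency
    if (!batch.isEmpty() && (batch.size() % BATCH_SIZE == 0 || sinceLastFlush > MAX_WRITE_LATENCY)) {
      flush();
    }
  }

  private synchronized void flush() {
    // stand-in for collection.insert(insertBatch) in the real writer
    System.out.println("flushing " + batch.size() + " items");
    batch = new ArrayList<>();
    lastWrite.set(System.currentTimeMillis());
  }

  public synchronized void stop() {
    flush();
    backgroundFlushTask.shutdown();
  }
}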
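Note: connectToMongo() authenticates only when both a user and a password are configured. Below is a self-contained sketch of that decision, assuming the same legacy mongo-java-driver API the writer uses (MongoCredential.createCredential, MongoClient(ServerAddress, List<MongoCredential>), DB.collectionExists); the helper name, main method, and connection values are hypothetical.

import com.mongodb.DB;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.util.Collections;

public class MongoConnectExample {

  // Authenticate against the target database only when credentials are present;
  // otherwise fall back to an anonymous connection, as the writer does.
  public static MongoClient connect(String host, int port, String user, String password, String dbName) {
    ServerAddress serverAddress = new ServerAddress(host, port);
    if (user != null && !user.isEmpty() && password != null && !password.isEmpty()) {
      MongoCredential credential =
          MongoCredential.createCredential(user, dbName, password.toCharArray());
      return new MongoClient(serverAddress, Collections.singletonList(credential));
    }
    return new MongoClient(serverAddress);
  }

  public static void main(String[] args) {
    // example usage against a local, unauthenticated mongod (hypothetical values)
    MongoClient client = connect("localhost", 27017, null, null, "test");
    DB db = client.getDB("test");
    if (!db.collectionExists("activities")) {
      db.createCollection("activities", null);
    }
    client.close();
  }
}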
