http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java index 0a39461..b61a364 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java @@ -16,8 +16,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.amazon.kinesis; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.converter.TypeConverterUtil; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; + import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; @@ -30,11 +37,7 @@ import com.amazonaws.services.kinesis.model.PutRecordResult; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.typesafe.config.Config; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.converter.TypeConverterUtil; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,87 +51,94 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** - * Created by sblackmon on 9/2/15. + * KinesisPersistWriter writes documents to kinesis. */ public class KinesisPersistWriter implements StreamsPersistWriter { - public final static String STREAMS_ID = "KinesisPersistWriter"; + public static final String STREAMS_ID = "KinesisPersistWriter"; - private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new ObjectMapper(); - private KinesisWriterConfiguration config; + private KinesisWriterConfiguration config; - private List<String> streamName; + private List<String> streamName; - private ExecutorService executor; + private ExecutorService executor; - protected AmazonKinesisClient client; - - public KinesisPersistWriter() { - Config config = StreamsConfigurator.config.getConfig("kinesis"); - this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + protected AmazonKinesisClient client; - public KinesisPersistWriter(KinesisWriterConfiguration config) { - this.config = config; - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + /** + * KinesisPersistWriter constructor - resolves KinesisWriterConfiguration from JVM 'kinesis'. + */ + public KinesisPersistWriter() { + Config config = StreamsConfigurator.config.getConfig("kinesis"); + this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config); + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - public void setConfig(KinesisWriterConfiguration config) { - this.config = config; - } + /** + * KinesisPersistWriter constructor - uses provided KinesisWriterConfiguration. + */ + public KinesisPersistWriter(KinesisWriterConfiguration config) { + this.config = config; + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - @Override - public String getId() { - return STREAMS_ID; - } + public void setConfig(KinesisWriterConfiguration config) { + this.config = config; + } - @Override - public void write(StreamsDatum entry) { + @Override + public String getId() { + return STREAMS_ID; + } - String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class); + @Override + public void write(StreamsDatum entry) { - PutRecordRequest putRecordRequest = new PutRecordRequest() - .withStreamName(config.getStream()) - .withPartitionKey(entry.getId()) - .withData(ByteBuffer.wrap(document.getBytes())); + String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class); - PutRecordResult putRecordResult = client.putRecord(putRecordRequest); + PutRecordRequest putRecordRequest = new PutRecordRequest() + .withStreamName(config.getStream()) + .withPartitionKey(entry.getId()) + .withData(ByteBuffer.wrap(document.getBytes())); - entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber())); + PutRecordResult putRecordResult = client.putRecord(putRecordRequest); - LOGGER.debug("Wrote {}", entry); - } + entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber())); - @Override - public void prepare(Object configurationObject) { - // Connect to Kinesis - synchronized (this) { - // Create the credentials Object - AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey()); + LOGGER.debug("Wrote {}", entry); + } - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); + @Override + public void prepare(Object configurationObject) { + // Connect to Kinesis + synchronized (this) { + // Create the credentials Object + AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey()); - this.client = new AmazonKinesisClient(credentials, clientConfig); - if (!Strings.isNullOrEmpty(config.getRegion())) - this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); - } - executor = Executors.newSingleThreadExecutor(); + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); + this.client = new AmazonKinesisClient(credentials, clientConfig); + if (!Strings.isNullOrEmpty(config.getRegion())) { + this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); + } } + executor = Executors.newSingleThreadExecutor(); + + } - @Override - public void cleanUp() { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Interrupted! ", e); - } + @Override + public void cleanUp() { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + LOGGER.debug("Interrupted! ", ex); } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java index c13314d..f34782a 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -15,10 +15,12 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.s3; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,125 +36,129 @@ import java.io.InputStream; * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file * this becomes problematic. * + * <p/> * This class operates as a wrapper to fix the aforementioned nuances. * + * <p/> * Reference: * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3 */ -public class S3ObjectInputStreamWrapper extends InputStream -{ - private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class); - - private final S3Object s3Object; - private final S3ObjectInputStream is; - private boolean isClosed = false; - - /** - * Create an input stream safely from - * @param s3Object - */ - public S3ObjectInputStreamWrapper(S3Object s3Object) { - this.s3Object = s3Object; - this.is = this.s3Object.getObjectContent(); - } - - public int hashCode() { - return this.is.hashCode(); - } - - public boolean equals(Object obj) { - return this.is.equals(obj); - } - - public String toString() { - return this.is.toString(); - } - - public int read() throws IOException { - return this.is.read(); - } - - public int read(byte[] b) throws IOException { - return this.is.read(b); - } - - public int read(byte[] b, int off, int len) throws IOException { - return this.is.read(b, off, len); - } - - public long skip(long n) throws IOException { - return this.is.skip(n); - } - - public int available() throws IOException { - return this.is.available(); - } - - public boolean markSupported() { - return this.is.markSupported(); - } - - public synchronized void mark(int readlimit) { - this.is.mark(readlimit); - } - - public synchronized void reset() throws IOException { - this.is.reset(); - } - - public void close() throws IOException { - ensureEverythingIsReleased(); - } +public class S3ObjectInputStreamWrapper extends InputStream { - public void ensureEverythingIsReleased() { - if(this.isClosed) - return; - - - try { - // ensure that the S3 Object is closed properly. - this.s3Object.close(); - } catch(Throwable e) { - LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); - } - - - try { - // Abort the stream - this.is.abort(); - } - catch(Throwable e) { - LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); - } - - // close the input Stream Safely - closeSafely(this.is); - - // This corrects the issue with Open HTTP connections - closeSafely(this.s3Object); - this.isClosed = true; - } - - private static void closeSafely(Closeable is) { - try { - if(is != null) - is.close(); - } catch(Exception e) { - e.printStackTrace(); - LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage()); - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class); + + private final S3Object s3Object; + private final S3ObjectInputStream is; + private boolean isClosed = false; - protected void finalize( ) throws Throwable - { - try { - // If there is an accidental leak where the user did not close, call this on the classes destructor - ensureEverythingIsReleased(); - super.finalize(); - } catch(Exception e) { - // this should never be called, just being very cautious - LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage()); - } + /** + * Create an input stream safely. + * @param s3Object s3Object + */ + public S3ObjectInputStreamWrapper(S3Object s3Object) { + this.s3Object = s3Object; + this.is = this.s3Object.getObjectContent(); + } + + public int hashCode() { + return this.is.hashCode(); + } + + public boolean equals(Object obj) { + return this.is.equals(obj); + } + + public String toString() { + return this.is.toString(); + } + + public int read() throws IOException { + return this.is.read(); + } + + public int read(byte[] byt) throws IOException { + return this.is.read(byt); + } + + public int read(byte[] byt, int off, int len) throws IOException { + return this.is.read(byt, off, len); + } + + public long skip(long skip) throws IOException { + return this.is.skip(skip); + } + + public int available() throws IOException { + return this.is.available(); + } + + public boolean markSupported() { + return this.is.markSupported(); + } + + public synchronized void mark(int readlimit) { + this.is.mark(readlimit); + } + + public synchronized void reset() throws IOException { + this.is.reset(); + } + + public void close() throws IOException { + ensureEverythingIsReleased(); + } + + /** + * ensureEverythingIsReleased as part of close process. + */ + public void ensureEverythingIsReleased() { + if (this.isClosed) { + return; + } + + try { + // ensure that the S3 Object is closed properly. + this.s3Object.close(); + } catch (Throwable ex) { + LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), ex.getMessage()); + } + + + try { + // Abort the stream + this.is.abort(); + } catch (Throwable ex) { + LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), ex.getMessage()); + } + + // close the input Stream Safely + closeSafely(this.is); + + // This corrects the issue with Open HTTP connections + closeSafely(this.s3Object); + this.isClosed = true; + } + + private static void closeSafely(Closeable is) { + try { + if (is != null) { + is.close(); + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", ex.getMessage()); + } + } + + protected void finalize() throws Throwable { + try { + // If there is an accidental leak where the user did not close, call this on the classes destructor + ensureEverythingIsReleased(); + super.finalize(); + } catch (Exception ex) { + // this should never be called, just being very cautious + LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", ex.getMessage()); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java index 08fc774..e8ca0c7 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -15,17 +15,23 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.s3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.Upload; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Map; /** @@ -33,112 +39,109 @@ import java.util.Map; * in memory ByteArrayOutPutStream before it is finally written to Amazon S3. The size the file is allowed to become * is directly controlled by the S3PersistWriter. */ -public class S3OutputStreamWrapper extends OutputStream -{ - private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class); - - private final AmazonS3Client amazonS3Client; - private final String bucketName; - private final String path; - private final String fileName; - private ByteArrayOutputStream outputStream; - private final Map<String, String> metaData; - private boolean isClosed = false; - - /** - * Create an OutputStream Wrapper - * @param amazonS3Client - * The Amazon S3 Client which will be handling the file - * @param bucketName - * The Bucket Name you are wishing to write to. - * @param path - * The path where the object will live - * @param fileName - * The fileName you ware wishing to write. - * @param metaData - * Any meta data that is to be written along with the object - * @throws IOException - * If there is an issue creating the stream, this - */ - public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException { - this.amazonS3Client = amazonS3Client; - this.bucketName = bucketName; - this.path = path; - this.fileName = fileName; - this.metaData = metaData; - this.outputStream = new ByteArrayOutputStream(); +public class S3OutputStreamWrapper extends OutputStream { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class); + + private final AmazonS3Client amazonS3Client; + private final String bucketName; + private final String path; + private final String fileName; + private ByteArrayOutputStream outputStream; + private final Map<String, String> metaData; + private boolean isClosed = false; + + /** + * Create an OutputStream Wrapper + * @param amazonS3Client + * The Amazon S3 Client which will be handling the file + * @param bucketName + * The Bucket Name you are wishing to write to. + * @param path + * The path where the object will live + * @param fileName + * The fileName you ware wishing to write. + * @param metaData + * Any meta data that is to be written along with the object + * @throws IOException + * If there is an issue creating the stream, this + */ + public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException { + this.amazonS3Client = amazonS3Client; + this.bucketName = bucketName; + this.path = path; + this.fileName = fileName; + this.metaData = metaData; + this.outputStream = new ByteArrayOutputStream(); + } + + public void write(int byt) throws IOException { + this.outputStream.write(byt); + } + + public void write(byte[] byt) throws IOException { + this.outputStream.write(byt); + } + + public void write(byte[] byt, int off, int len) throws IOException { + this.outputStream.write(byt, off, len); + } + + public void flush() throws IOException { + this.outputStream.flush(); + } + + /** + * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3. + * @throws IOException + * Exception thrown from the FileOutputStream + */ + public void close() throws IOException { + if (!isClosed) { + try { + this.addFile(); + this.outputStream.close(); + this.outputStream = null; + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn("There was an error adding the temporaryFile to S3"); + } finally { + // we are done here. + this.isClosed = true; + } } + } - public void write(int b) throws IOException { - this.outputStream.write(b); - } + private void addFile() throws Exception { - public void write(byte[] b) throws IOException { - this.outputStream.write(b); - } + InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray()); + int contentLength = outputStream.size(); - public void write(byte[] b, int off, int len) throws IOException { - this.outputStream.write(b, off, len); - } + TransferManager transferManager = new TransferManager(amazonS3Client); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setExpirationTime(DateTime.now().plusDays(365 * 3).toDate()); + metadata.setContentLength(contentLength); - public void flush() throws IOException { - this.outputStream.flush(); - } + metadata.addUserMetadata("writer", "org.apache.streams"); - /** - * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3. - * @throws IOException - * Exception thrown from the FileOutputStream - */ - public void close() throws IOException { - if(!isClosed) - { - try - { - this.addFile(); - this.outputStream.close(); - this.outputStream = null; - } - catch(Exception e) { - e.printStackTrace(); - LOGGER.warn("There was an error adding the temporaryFile to S3"); - } - finally { - // we are done here. - this.isClosed = true; - } - } + for (String s : metaData.keySet()) { + metadata.addUserMetadata(s, metaData.get(s)); } - private void addFile() throws Exception { - - InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray()); - int contentLength = outputStream.size(); + String fileNameToWrite = path + fileName; + Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata); + try { + upload.waitForUploadResult(); - TransferManager transferManager = new TransferManager(amazonS3Client); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate()); - metadata.setContentLength(contentLength); - - metadata.addUserMetadata("writer", "org.apache.streams"); - - for(String s : metaData.keySet()) - metadata.addUserMetadata(s, metaData.get(s)); - - String fileNameToWrite = path + fileName; - Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata); - try { - upload.waitForUploadResult(); - - is.close(); - transferManager.shutdownNow(false); - LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName); - } catch (Exception e) { - // No Op - } + is.close(); + transferManager.shutdownNow(false); + LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName); + } catch (Exception ignored) { + LOGGER.trace("Ignoring", ignored); + } - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index 702df71..753b439 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -15,8 +15,16 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.s3; +import org.apache.streams.converter.LineReadWriteUtil; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; + import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; @@ -31,12 +39,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Queues; -import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.DatumStatusCountable; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,163 +53,168 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +/** + * S3PersistReader reads documents from s3. + */ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable { - private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class); - public final static String STREAMS_ID = "S3PersistReader"; - protected final static char DELIMITER = '\t'; - - private S3ReaderConfiguration s3ReaderConfiguration; - private AmazonS3Client amazonS3Client; - private ObjectMapper mapper = new ObjectMapper(); - protected LineReadWriteUtil lineReaderUtil; - private Collection<String> files; - private ExecutorService executor; - protected volatile Queue<StreamsDatum> persistQueue; - - protected DatumStatusCounter countersTotal = new DatumStatusCounter(); - protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private Future<?> task; - - public AmazonS3Client getAmazonS3Client() { - return this.amazonS3Client; - } - - public S3ReaderConfiguration getS3ReaderConfiguration() { - return this.s3ReaderConfiguration; - } - - public String getBucketName() { - return this.s3ReaderConfiguration.getBucket(); - } - - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return !task.isDone() && !task.isCancelled(); - } - - public DatumStatusCounter getDatumStatusCounter() { - return countersTotal; + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class); + public static final String STREAMS_ID = "S3PersistReader"; + protected static final char DELIMITER = '\t'; + + private S3ReaderConfiguration s3ReaderConfiguration; + private AmazonS3Client amazonS3Client; + private ObjectMapper mapper = new ObjectMapper(); + protected LineReadWriteUtil lineReaderUtil; + private Collection<String> files; + private ExecutorService executor; + protected volatile Queue<StreamsDatum> persistQueue; + + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private Future<?> task; + + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } + + public S3ReaderConfiguration getS3ReaderConfiguration() { + return this.s3ReaderConfiguration; + } + + public String getBucketName() { + return this.s3ReaderConfiguration.getBucket(); + } + + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return !task.isDone() && !task.isCancelled(); + } + + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } + + public Collection<String> getFiles() { + return this.files; + } + + public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) { + this.s3ReaderConfiguration = s3ReaderConfiguration; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + + lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration); + // Connect to S3 + synchronized (this) { + // Create the credentials Object + AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString())); + + // We do not want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(false); + + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) { + this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion()))); + } + this.amazonS3Client.setS3ClientOptions(clientOptions); } - public Collection<String> getFiles() { - return this.files; - } + final ListObjectsRequest request = new ListObjectsRequest() + .withBucketName(this.s3ReaderConfiguration.getBucket()) + .withPrefix(s3ReaderConfiguration.getReaderPath()) + .withMaxKeys(500); + + + ObjectListing listing = this.amazonS3Client.listObjects(request); + + this.files = new ArrayList<String>(); + + /** + * If you can list files that are in this path, then you must be dealing with a directory + * if you cannot list files that are in this path, then you are most likely dealing with + * a simple file. + */ + boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false; + boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false; + + if (hasCommonPrefixes || hasObjectSummaries) { + // Handle the 'directory' use case + do { + if (hasCommonPrefixes) { + for (String file : listing.getCommonPrefixes()) { + this.files.add(file); + } + } else { + for (final S3ObjectSummary objectSummary : listing.getObjectSummaries()) { + this.files.add(objectSummary.getKey()); + } + } - public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) { - this.s3ReaderConfiguration = s3ReaderConfiguration; + // get the next batch. + listing = this.amazonS3Client.listNextBatchOfObjects(listing); + } + while (listing.isTruncated()); + } else { + // handle the single file use-case + this.files.add(s3ReaderConfiguration.getReaderPath()); } - @Override - public String getId() { - return STREAMS_ID; + if (this.files.size() <= 0) { + LOGGER.error("There are no files to read"); } - public void prepare(Object configurationObject) { - - lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration); - // Connect to S3 - synchronized (this) - { - // Create the credentials Object - AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey()); - - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString())); - - // We do not want path style access - S3ClientOptions clientOptions = new S3ClientOptions(); - clientOptions.setPathStyleAccess(false); + this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + this.executor = Executors.newSingleThreadExecutor(); + } - this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); - if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) - this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion()))); - this.amazonS3Client.setS3ClientOptions(clientOptions); - } - - final ListObjectsRequest request = new ListObjectsRequest() - .withBucketName(this.s3ReaderConfiguration.getBucket()) - .withPrefix(s3ReaderConfiguration.getReaderPath()) - .withMaxKeys(500); - - - ObjectListing listing = this.amazonS3Client.listObjects(request); - - this.files = new ArrayList<String>(); - - /** - * If you can list files that are in this path, then you must be dealing with a directory - * if you cannot list files that are in this path, then you are most likely dealing with - * a simple file. - */ - boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false; - boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false; - - if(hasCommonPrefixes || hasObjectSummaries) { - // Handle the 'directory' use case - do - { - if(hasCommonPrefixes) { - for (String file : listing.getCommonPrefixes()) { - this.files.add(file); - } - } else { - for(final S3ObjectSummary objectSummary : listing.getObjectSummaries()) { - this.files.add(objectSummary.getKey()); - } - } - - // get the next batch. - listing = this.amazonS3Client.listNextBatchOfObjects(listing); - } while (listing.isTruncated()); - } - else { - // handle the single file use-case - this.files.add(s3ReaderConfiguration.getReaderPath()); - } - - if(this.files.size() <= 0) - LOGGER.error("There are no files to read"); - - this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); - this.executor = Executors.newSingleThreadExecutor(); - } - - public void cleanUp() { - // no Op - } + public void cleanUp() { + // no Op + } - public StreamsResultSet readAll() { - startStream(); - return new StreamsResultSet(persistQueue); - } + public StreamsResultSet readAll() { + startStream(); + return new StreamsResultSet(persistQueue); + } - public void startStream() { - LOGGER.debug("startStream"); - task = executor.submit(new S3PersistReaderTask(this)); - } + public void startStream() { + LOGGER.debug("startStream"); + task = executor.submit(new S3PersistReaderTask(this)); + } - public StreamsResultSet readCurrent() { + @Override + public StreamsResultSet readCurrent() { - StreamsResultSet current; + StreamsResultSet current; - synchronized( S3PersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - persistQueue.clear(); - } - return current; + synchronized ( S3PersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); } + return current; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index f2f5567..f0e9626 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -15,12 +15,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.s3; -import com.google.common.base.Strings; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; import org.apache.streams.util.ComponentUtils; + +import com.google.common.base.Strings; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,57 +31,61 @@ import java.io.BufferedReader; import java.io.Closeable; import java.io.InputStreamReader; +/** + * S3PersistReaderTask reads documents from s3 on behalf of + * @see org.apache.streams.s3.S3PersistReader + */ public class S3PersistReaderTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class); - private S3PersistReader reader; + private S3PersistReader reader; - public S3PersistReaderTask(S3PersistReader reader) { - this.reader = reader; - } + public S3PersistReaderTask(S3PersistReader reader) { + this.reader = reader; + } - @Override - public void run() { - - for(String file : reader.getFiles()) { - - // Create our buffered reader - S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file)); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); - LOGGER.info("Reading: {} ", file); - - String line = ""; - try { - while((line = bufferedReader.readLine()) != null) { - if( !Strings.isNullOrEmpty(line) ) { - reader.countersCurrent.incrementAttempt(); - StreamsDatum entry = reader.lineReaderUtil.processLine(line); - ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); - reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); - } - } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn(e.getMessage()); - reader.countersCurrent.incrementStatus(DatumStatus.FAIL); - } - - LOGGER.info("Completed: " + file); - - try { - closeSafely(file, is); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } + @Override + public void run() { + + for (String file : reader.getFiles()) { + + // Create our buffered reader + S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file)); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); + LOGGER.info("Reading: {} ", file); + + String line = ""; + try { + while ((line = bufferedReader.readLine()) != null) { + if ( !Strings.isNullOrEmpty(line) ) { + reader.countersCurrent.incrementAttempt(); + StreamsDatum entry = reader.lineReaderUtil.processLine(line); + ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + } } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.warn(ex.getMessage()); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } + + LOGGER.info("Completed: " + file); + + try { + closeSafely(file, is); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } } + } - private static void closeSafely(String file, Closeable closeable) { - try { - closeable.close(); - } catch(Exception e) { - LOGGER.error("There was an issue closing file: {}", file); - } + private static void closeSafely(String file, Closeable closeable) { + try { + closeable.close(); + } catch (Exception ex) { + LOGGER.error("There was an issue closing file: {}", file); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 3686f55..ef6e831 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -15,8 +15,19 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.s3; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.converter.LineReadWriteUtil; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; @@ -28,15 +39,7 @@ import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.DatumStatus; -import org.apache.streams.core.DatumStatusCountable; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,239 +56,256 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable -{ - public final static String STREAMS_ID = "S3PersistWriter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class); +/** + * S3PersistWriter writes documents to s3. + */ +public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable { - private final static char DELIMITER = '\t'; + public static final String STREAMS_ID = "S3PersistWriter"; - private ObjectMapper objectMapper; - private AmazonS3Client amazonS3Client; - private S3WriterConfiguration s3WriterConfiguration; - private final List<String> writtenFiles = new ArrayList<String>(); - protected LineReadWriteUtil lineWriterUtil; + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class); - private final AtomicLong totalBytesWritten = new AtomicLong(); - private AtomicLong bytesWrittenThisFile = new AtomicLong(); + private static final char DELIMITER = '\t'; - private final AtomicInteger totalRecordsWritten = new AtomicInteger(); - private AtomicInteger fileLineCounter = new AtomicInteger(); + private ObjectMapper objectMapper; + private AmazonS3Client amazonS3Client; + private S3WriterConfiguration s3WriterConfiguration; + private final List<String> writtenFiles = new ArrayList<String>(); + protected LineReadWriteUtil lineWriterUtil; - private Map<String, String> objectMetaData = new HashMap<String, String>() {{ - put("line[0]", "id"); - put("line[1]", "timeStamp"); - put("line[2]", "metaData"); - put("line[3]", "document"); - }}; + private final AtomicLong totalBytesWritten = new AtomicLong(); + private AtomicLong bytesWrittenThisFile = new AtomicLong(); - private OutputStreamWriter currentWriter = null; + private final AtomicInteger totalRecordsWritten = new AtomicInteger(); + private AtomicInteger fileLineCounter = new AtomicInteger(); - public AmazonS3Client getAmazonS3Client() { - return this.amazonS3Client; - } + private static Map<String, String> objectMetaData = new HashMap<String, String>(); - public S3WriterConfiguration getS3WriterConfiguration() { - return this.s3WriterConfiguration; - } + static { + objectMetaData.put("line[0]", "id"); + objectMetaData.put("line[1]", "timeStamp"); + objectMetaData.put("line[2]", "metaData"); + objectMetaData.put("line[3]", "document"); + } - public List<String> getWrittenFiles() { - return this.writtenFiles; - } + private OutputStreamWriter currentWriter = null; - public Map<String, String> getObjectMetaData() { - return this.objectMetaData; - } + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } - public ObjectMapper getObjectMapper() { - return this.objectMapper; - } + public S3WriterConfiguration getS3WriterConfiguration() { + return this.s3WriterConfiguration; + } - public void setObjectMapper(ObjectMapper mapper) { - this.objectMapper = mapper; - } + public List<String> getWrittenFiles() { + return this.writtenFiles; + } - public void setObjectMetaData(Map<String, String> val) { - this.objectMetaData = val; - } + public Map<String, String> getObjectMetaData() { + return this.objectMetaData; + } - public S3PersistWriter() { - this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3"))); - } + public ObjectMapper getObjectMapper() { + return this.objectMapper; + } - public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) { - this.s3WriterConfiguration = s3WriterConfiguration; - } + public void setObjectMapper(ObjectMapper mapper) { + this.objectMapper = mapper; + } - /** - * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use. - * @param amazonS3Client - * If you have an existing amazonS3Client, it wont' bother to create another one - * @param s3WriterConfiguration - * Configuration of the write paths and instructions are still required. - */ - public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) { - this.amazonS3Client = amazonS3Client; - this.s3WriterConfiguration = s3WriterConfiguration; - } + public void setObjectMetaData(Map<String, String> val) { + this.objectMetaData = val; + } - @Override - public String getId() { - return STREAMS_ID; - } + public S3PersistWriter() { + this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3"))); + } - @Override - public void write(StreamsDatum streamsDatum) { - - synchronized (this) { - // Check to see if we need to reset the file that we are currently working with - if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) { - try { - LOGGER.info("Resetting the file"); - this.currentWriter = resetFile(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - String line = lineWriterUtil.convertResultToString(streamsDatum); - - try { - this.currentWriter.write(line); - } catch (IOException e) { - e.printStackTrace(); - } - - // add the bytes we've written - int recordSize = line.getBytes().length; - this.totalBytesWritten.addAndGet(recordSize); - this.bytesWrittenThisFile.addAndGet(recordSize); - - // increment the record count - this.totalRecordsWritten.incrementAndGet(); - this.fileLineCounter.incrementAndGet(); - } + public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) { + this.s3WriterConfiguration = s3WriterConfiguration; + } - } + /** + * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use. + * @param amazonS3Client + * If you have an existing amazonS3Client, it wont' bother to create another one + * @param s3WriterConfiguration + * Configuration of the write paths and instructions are still required. + */ + public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) { + this.amazonS3Client = amazonS3Client; + this.s3WriterConfiguration = s3WriterConfiguration; + } - public synchronized OutputStreamWriter resetFile() throws Exception { - // this will keep it thread safe, so we don't create too many files - if(this.fileLineCounter.get() == 0 && this.currentWriter != null) - return this.currentWriter; + @Override + public String getId() { + return STREAMS_ID; + } - closeAndDestroyWriter(); + @Override + public void write(StreamsDatum streamsDatum) { - // Create the path for where the file is going to live. + synchronized (this) { + // Check to see if we need to reset the file that we are currently working with + if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) { try { - // generate a file name - String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + - (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv"; - - // create the output stream - OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client, - this.s3WriterConfiguration.getBucket(), - this.s3WriterConfiguration.getWriterPath(), - fileName, - this.objectMetaData); - - // reset the counter - this.fileLineCounter = new AtomicInteger(); - this.bytesWrittenThisFile = new AtomicLong(); - - // add this to the list of written files - writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName); - - // Log that we are creating this file - LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName); - - // return the output stream - return new OutputStreamWriter(outputStream); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - throw e; + LOGGER.info("Resetting the file"); + this.currentWriter = resetFile(); + } catch (Exception ex) { + ex.printStackTrace(); } - } + } - private synchronized void closeAndDestroyWriter() { - // if there is a current writer, we must close it first. - if (this.currentWriter != null) { - this.safeFlush(this.currentWriter); - this.closeSafely(this.currentWriter); - this.currentWriter = null; + String line = lineWriterUtil.convertResultToString(streamsDatum); - // Logging of information to alert the user to the activities of this class - LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1)); - } + try { + this.currentWriter.write(line); + } catch (IOException ex) { + ex.printStackTrace(); + } + + // add the bytes we've written + int recordSize = line.getBytes().length; + this.totalBytesWritten.addAndGet(recordSize); + this.bytesWrittenThisFile.addAndGet(recordSize); + + // increment the record count + this.totalRecordsWritten.incrementAndGet(); + this.fileLineCounter.incrementAndGet(); } - private synchronized void closeSafely(Writer writer) { - if(writer != null) { - try { - writer.flush(); - writer.close(); - } catch(Exception e) { - // noOp - } - LOGGER.debug("File Closed"); - } + } + + /** + * Reset File when it's time to create a new file. + * @return OutputStreamWriter + * @throws Exception Exception + */ + public synchronized OutputStreamWriter resetFile() throws Exception { + // this will keep it thread safe, so we don't create too many files + if (this.fileLineCounter.get() == 0 && this.currentWriter != null) { + return this.currentWriter; } - private void safeFlush(Flushable flushable) { - // This is wrapped with a ByteArrayOutputStream, so this is really safe. - if(flushable != null) { - try { - flushable.flush(); - } catch(IOException e) { - // noOp - } - } + closeAndDestroyWriter(); + + // Create the path for where the file is going to live. + try { + // generate a file name + String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + + (this.s3WriterConfiguration.getChunk() ? "/" : "-") + + new Date().getTime() + + ".tsv"; + + // create the output stream + OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client, + this.s3WriterConfiguration.getBucket(), + this.s3WriterConfiguration.getWriterPath(), + fileName, + this.objectMetaData); + + // reset the counter + this.fileLineCounter = new AtomicInteger(); + this.bytesWrittenThisFile = new AtomicLong(); + + // add this to the list of written files + writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName); + + // Log that we are creating this file + LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName); + + // return the output stream + return new OutputStreamWriter(outputStream); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + throw ex; } + } + + private synchronized void closeAndDestroyWriter() { + // if there is a current writer, we must close it first. + if (this.currentWriter != null) { + this.safeFlush(this.currentWriter); + this.closeSafely(this.currentWriter); + this.currentWriter = null; - public void prepare(Object configurationObject) { + // Logging of information to alert the user to the activities of this class + LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size() - 1)); + } + } + + private synchronized void closeSafely(Writer writer) { + if (writer != null) { + try { + writer.flush(); + writer.close(); + } catch (Exception ex) { + LOGGER.trace("closeSafely", ex); + } + LOGGER.debug("File Closed"); + } + } + + private void safeFlush(Flushable flushable) { + // This is wrapped with a ByteArrayOutputStream, so this is really safe. + if (flushable != null) { + try { + flushable.flush(); + } catch (IOException ex) { + LOGGER.trace("safeFlush", ex); + } + } + } - lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration); + @Override + public void prepare(Object configurationObject) { - // Connect to S3 - synchronized (this) { + lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration); - try { - // if the user has chosen to not set the object mapper, then set a default object mapper for them. - if (this.objectMapper == null) - this.objectMapper = StreamsJacksonMapper.getInstance(); + // Connect to S3 + synchronized (this) { - // Create the credentials Object - if (this.amazonS3Client == null) { - AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); + try { + // if the user has chosen to not set the object mapper, then set a default object mapper for them. + if (this.objectMapper == null) { + this.objectMapper = StreamsJacksonMapper.getInstance(); + } - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString())); + // Create the credentials Object + if (this.amazonS3Client == null) { + AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); - // We do not want path style access - S3ClientOptions clientOptions = new S3ClientOptions(); - clientOptions.setPathStyleAccess(false); + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString())); - this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); - if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) - this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion()))); - this.amazonS3Client.setS3ClientOptions(clientOptions); - } - } catch (Exception e) { - LOGGER.error("Exception while preparing the S3 client: {}", e); - } + // We do not want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(false); - Preconditions.checkArgument(this.amazonS3Client != null); + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) { + this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion()))); + } + this.amazonS3Client.setS3ClientOptions(clientOptions); } - } - - public void cleanUp() { - closeAndDestroyWriter(); - } + } catch (Exception ex) { + LOGGER.error("Exception while preparing the S3 client: {}", ex); + } - public DatumStatusCounter getDatumStatusCounter() { - DatumStatusCounter counters = new DatumStatusCounter(); - counters.incrementAttempt(this.totalRecordsWritten.get()); - counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get()); - return counters; + Preconditions.checkArgument(this.amazonS3Client != null); } + } + + public void cleanUp() { + closeAndDestroyWriter(); + } + + @Override + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.totalRecordsWritten.get()); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get()); + return counters; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java index 8793333..43d9e34 100644 --- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java +++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java @@ -18,102 +18,100 @@ package org.apache.streams.console; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.core.StreamsResultSet; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.InputStream; -import java.io.PrintStream; import java.math.BigInteger; import java.util.Queue; import java.util.Scanner; import java.util.concurrent.ConcurrentLinkedQueue; +/** + * ConsolePersistReader reads documents from stdin. + */ public class ConsolePersistReader implements StreamsPersistReader { - private final static String STREAMS_ID = "ConsolePersistReader"; + private static final String STREAMS_ID = "ConsolePersistReader"; - private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected volatile Queue<StreamsDatum> persistQueue; - protected InputStream inputStream = System.in; + protected InputStream inputStream = System.in; - public ConsolePersistReader() { - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + public ConsolePersistReader() { + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - public ConsolePersistReader(InputStream inputStream) { - this(); - this.inputStream = inputStream; - } + public ConsolePersistReader(InputStream inputStream) { + this(); + this.inputStream = inputStream; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - public void prepare(Object o) { + public void prepare(Object configuration) { - } + } - public void cleanUp() { + public void cleanUp() { - } + } - @Override - public void startStream() { - // no op - } + @Override + public void startStream() { + // no op + } - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } - @Override - public StreamsResultSet readCurrent() { + @Override + public StreamsResultSet readCurrent() { - LOGGER.info("{} readCurrent", STREAMS_ID); + LOGGER.info("{} readCurrent", STREAMS_ID); - Scanner sc = new Scanner(inputStream); + Scanner sc = new Scanner(inputStream); - while( sc.hasNextLine() ) { + while ( sc.hasNextLine() ) { - persistQueue.offer(new StreamsDatum(sc.nextLine())); + persistQueue.offer(new StreamsDatum(sc.nextLine())); - } + } - LOGGER.info("Providing {} docs", persistQueue.size()); + LOGGER.info("Providing {} docs", persistQueue.size()); - StreamsResultSet result = new StreamsResultSet(persistQueue); + StreamsResultSet result = new StreamsResultSet(persistQueue); - LOGGER.info("{} Exiting", STREAMS_ID); + LOGGER.info("{} Exiting", STREAMS_ID); - return result; + return result; - } + } - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return readCurrent(); - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return readCurrent(); + } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return readCurrent(); - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return readCurrent(); + } - @Override - public boolean isRunning() { - return true; //Will always be running - } + @Override + public boolean isRunning() { + return true; //Will always be running + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java index 6d284ba..6358071 100644 --- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java +++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java @@ -18,12 +18,14 @@ package org.apache.streams.console; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,53 +33,56 @@ import java.io.PrintStream; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +/** + * ConsolePersistWriter writes documents to stdout. + */ public class ConsolePersistWriter implements StreamsPersistWriter { - private final static String STREAMS_ID = "ConsolePersistWriter"; - - private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class); + private static final String STREAMS_ID = "ConsolePersistWriter"; - protected PrintStream printStream = System.out; + private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class); - protected volatile Queue<StreamsDatum> persistQueue; + protected PrintStream printStream = System.out; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + protected volatile Queue<StreamsDatum> persistQueue; - public ConsolePersistWriter() { - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public ConsolePersistWriter(PrintStream printStream) { - this(); - this.printStream = printStream; - } + public ConsolePersistWriter() { + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + } - @Override - public String getId() { - return STREAMS_ID; - } + public ConsolePersistWriter(PrintStream printStream) { + this(); + this.printStream = printStream; + } - public void prepare(Object o) { - Preconditions.checkNotNull(persistQueue); - } + @Override + public String getId() { + return STREAMS_ID; + } - public void cleanUp() { + public void prepare(Object configuration) { + Preconditions.checkNotNull(persistQueue); + } - } + public void cleanUp() { - @Override - public void write(StreamsDatum entry) { + } - try { + @Override + public void write(StreamsDatum entry) { - String text = mapper.writeValueAsString(entry); + try { - printStream.println(text); + String text = mapper.writeValueAsString(entry); - } catch (JsonProcessingException e) { - LOGGER.warn("save: {}", e); - } + printStream.println(text); + } catch (JsonProcessingException ex) { + LOGGER.warn("save: {}", ex); } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java index e5009f0..ecb60e3 100644 --- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java +++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java @@ -19,36 +19,43 @@ package org.apache.streams.console; import org.apache.streams.core.StreamsDatum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; +/** + * ConsolePersistWriterTask writes documents to stdout on behalf of + * @see org.apache.streams.console.ConsolePersistWriter + */ public class ConsolePersistWriterTask implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class); - private ConsolePersistWriter writer; + private ConsolePersistWriter writer; - public ConsolePersistWriterTask(ConsolePersistWriter writer) { - this.writer = writer; - } + public ConsolePersistWriterTask(ConsolePersistWriter writer) { + this.writer = writer; + } - @Override - public void run() { - while(true) { - if( writer.persistQueue.peek() != null ) { - try { - StreamsDatum entry = writer.persistQueue.remove(); - writer.write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) {} + @Override + public void run() { + while (true) { + if ( writer.persistQueue.peek() != null ) { + try { + StreamsDatum entry = writer.persistQueue.remove(); + writer.write(entry); + } catch (Exception ex) { + ex.printStackTrace(); } + } + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException interrupt) { + LOGGER.trace("Interrupted", interrupt); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java index 0b2b782..8c7f724 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java @@ -21,21 +21,24 @@ package org.apache.streams.elasticsearch; import org.elasticsearch.Version; import org.elasticsearch.client.Client; +/** + * Wrapper class for a client with a known version. + */ public class ElasticsearchClient { - private Client client; - private Version version; + private Client client; + private Version version; - public ElasticsearchClient(Client client, Version version) { - this.client = client; - this.version = version; - } + public ElasticsearchClient(Client client, Version version) { + this.client = client; + this.version = version; + } - public Client getClient() { - return client; - } + public Client getClient() { + return client; + } - public Version getVersion() { - return version; - } + public Version getVersion() { + return version; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java index 4809334..bdff9aa 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java @@ -19,6 +19,7 @@ package org.apache.streams.elasticsearch; import com.google.common.net.InetAddresses; + import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; @@ -41,157 +42,154 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +/** + * Wrapper class for multiple + * @see org.apache.streams.elasticsearch.ElasticsearchClient + */ public class ElasticsearchClientManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class); - private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>(); - - private ElasticsearchConfiguration elasticsearchConfiguration; - - public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) { - this.elasticsearchConfiguration = elasticsearchConfiguration; - } - public ElasticsearchConfiguration getElasticsearchConfiguration() { - return elasticsearchConfiguration; + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class); + private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>(); + + private ElasticsearchConfiguration elasticsearchConfiguration; + + public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) { + this.elasticsearchConfiguration = elasticsearchConfiguration; + } + + public ElasticsearchConfiguration getElasticsearchConfiguration() { + return elasticsearchConfiguration; + } + + /** + * Get the Client for this return, it is actually a transport client, but it is much + * easier to work with the generic object as this interface likely won't change from + * elasticsearch. This method is synchronized to block threads from creating + * too many of these at any given time. + * + * @return Client for elasticsearch + */ + public Client getClient() { + checkAndLoadClient(null); + + return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient(); + } + + /** + * Returns Client with clusterName. + * @param clusterName clusterName + */ + public Client getClient(String clusterName) { + checkAndLoadClient(clusterName); + + return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient(); + } + + public boolean isOnOrAfterVersion(Version version) { + return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version); + } + + public boolean refresh(String index) { + return refresh(new String[]{index}); + } + + public boolean refresh(String[] indexes) { + RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); + return refreshResponse.getFailedShards() == 0; + } + + /** + * Terminate the elasticsearch clients. + */ + public synchronized void stop() { + // Check to see if we have a client. + if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) { + // Close the client + ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close(); + + // Remove it so that it isn't in memory any more. + ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName()); } + } - /** - * *********************************************************************************** - * Get the Client for this return, it is actually a transport client, but it is much - * easier to work with the generic object as this interface likely won't change from - * elasticsearch. This method is synchronized to block threads from creating - * too many of these at any given time. - * - * @return Client for elasticsearch - * *********************************************************************************** - */ - public Client getClient() { - checkAndLoadClient(null); - - return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient(); - } + public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException { + ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth(); + return request.execute().get(); + } - public Client getClient(String clusterName) { - checkAndLoadClient(clusterName); + public String toString() { + return ToStringBuilder.reflectionToString(this); + } - return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient(); - } + public boolean equals(Object configuration) { + return EqualsBuilder.reflectionEquals(this, configuration, Collections.singletonList(this.elasticsearchConfiguration.toString())); + } - public boolean isOnOrAfterVersion(Version version) { - return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version); - } + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString())); + } - public void start() throws Exception { - /* - * Note: - * Everything in these classes is being switched to lazy loading. Within - * Heroku you only have 60 seconds to connect, and bind to the service, - * and you are only allowed to run in 1Gb of memory. Switching all - * of this to lazy loading is how we are fixing some of the issues - * if you are having issues with these classes, please, refactor - * and create a UNIT TEST CASE!!!!!! To ensure that everything is - * working before you check it back in. - * - * Author: Smashew @ 2013-08-26 - **********************************************************************/ - } + private synchronized void checkAndLoadClient(String clusterName) { - public boolean refresh(String index) { - return refresh(new String[]{index}); + if (clusterName == null) { + clusterName = this.elasticsearchConfiguration.getClusterName(); } - public boolean refresh(String[] indexes) { - RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet(); - return refreshResponse.getFailedShards() == 0; + // If it is there, exit early + if (ALL_CLIENTS.containsKey(clusterName)) { + return; } - public synchronized void stop() { - // Terminate the elasticsearch cluster - // Check to see if we have a client. - if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) { - // Close the client - ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close(); - - // Remove it so that it isn't in memory any more. - ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName()); + try { + // We are currently using lazy loading to start the elasticsearch cluster, however. + LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts()); + + Settings settings = Settings.settingsBuilder() + .put("cluster.name", this.elasticsearchConfiguration.getClusterName()) + .put("client.transport.ping_timeout", "90s") + .put("client.transport.nodes_sampler_interval", "60s") + .build(); + + + // Create the client + TransportClient transportClient = TransportClient.builder().settings(settings).build(); + for (String h : elasticsearchConfiguration.getHosts()) { + LOGGER.info("Adding Host: {}", h); + InetAddress address; + + if ( InetAddresses.isInetAddress(h)) { + LOGGER.info("{} is an IP address", h); + address = InetAddresses.forString(h); + } else { + LOGGER.info("{} is a hostname", h); + address = InetAddress.getByName(h); } - } + transportClient.addTransportAddress( + new InetSocketTransportAddress( + address, + elasticsearchConfiguration.getPort().intValue())); + } - public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException { - ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth(); - return request.execute().get(); - } + // Add the client and figure out the version. + ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient)); - public String toString() { - return ToStringBuilder.reflectionToString(this); - } + // Add it to our static map + ALL_CLIENTS.put(clusterName, elasticsearchClient); - public boolean equals(Object o) { - return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString())); + } catch (Exception ex) { + LOGGER.error("Could not Create elasticsearch Transport Client: {}", ex); } - public int hashCode() { - return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString())); - } + } - private synchronized void checkAndLoadClient(String clusterName) { - - if (clusterName == null) - clusterName = this.elasticsearchConfiguration.getClusterName(); - - // If it is there, exit early - if (ALL_CLIENTS.containsKey(clusterName)) - return; - - try { - // We are currently using lazy loading to start the elasticsearch cluster, however. - LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts()); - - Settings settings = Settings.settingsBuilder() - .put("cluster.name", this.elasticsearchConfiguration.getClusterName()) - .put("client.transport.ping_timeout", "90s") - .put("client.transport.nodes_sampler_interval", "60s") - .build(); - - - // Create the client - TransportClient transportClient = TransportClient.builder().settings(settings).build(); - for (String h : elasticsearchConfiguration.getHosts()) { - LOGGER.info("Adding Host: {}", h); - InetAddress address; - - if( InetAddresses.isInetAddress(h)) { - LOGGER.info("{} is an IP address", h); - address = InetAddresses.forString(h); - } else { - LOGGER.info("{} is a hostname", h); - address = InetAddress.getByName(h); - } - transportClient.addTransportAddress( - new InetSocketTransportAddress( - address, - elasticsearchConfiguration.getPort().intValue())); - } - // Add the client and figure out the version. - ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient)); - - // Add it to our static map - ALL_CLIENTS.put(clusterName, elasticsearchClient); - - } catch (Exception e) { - LOGGER.error("Could not Create elasticsearch Transport Client: {}", e); - } + private Version getVersion(Client client) { + try { + ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState(); + ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); + return clusterStateResponse.getState().getNodes().getMasterNode().getVersion(); + } catch (Exception ex) { + return null; } - - private Version getVersion(Client client) { - try { - ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState(); - ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); - - return clusterStateResponse.getState().getNodes().getMasterNode().getVersion(); - } catch (Exception e) { - return null; - } - } + } }