Repository: incubator-streams Updated Branches: refs/heads/master 11adec39a -> 6c4cf0f1c
STREAMS-243 | Added better exception handling for the S3PersistWriter and made the shutdown conditions in the LocalStreamBuilder more robust Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6aba90b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6aba90b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6aba90b9 Branch: refs/heads/master Commit: 6aba90b90d55cc19c1f70d0839e9311d5e8356ad Parents: 8517eed Author: Robert Douglas <[email protected]> Authored: Mon Dec 8 17:55:16 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Mon Dec 8 17:55:16 2014 -0600 ---------------------------------------------------------------------- .../org/apache/streams/s3/S3PersistWriter.java | 37 +++++++----- .../apache/streams/s3/S3PersistWriterTest.java | 62 ++++++++++++++++++++ .../local/builders/LocalStreamBuilder.java | 17 +++--- .../streams/local/builders/StreamComponent.java | 13 ++++ 4 files changed, 107 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 9111265..c50c94c 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -27,6 +27,7 @@ import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.streams.core.*; import org.slf4j.Logger; @@ -249,26 +250,32 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab // Connect to S3 synchronized (this) { - // if the user has chosen to not set the object mapper, then set a default object mapper for them. - if(this.objectMapper == null) - this.objectMapper = new ObjectMapper(); + try { + // if the user has chosen to not set the object mapper, then set a default object mapper for them. + if (this.objectMapper == null) + this.objectMapper = new ObjectMapper(); - // Create the credentials Object - if(this.amazonS3Client == null) { - AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); + // Create the credentials Object + if (this.amazonS3Client == null) { + AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); - ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString())); + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString())); - // We do not want path style access - S3ClientOptions clientOptions = new S3ClientOptions(); - clientOptions.setPathStyleAccess(false); + // We do not want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(false); - this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); - if( !Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) - this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion()))); - this.amazonS3Client.setS3ClientOptions(clientOptions); + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) + this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion()))); + this.amazonS3Client.setS3ClientOptions(clientOptions); + } + } catch (Exception e) { + LOGGER.error("Exception while preparing the S3 client: {}", e); } + + Preconditions.checkArgument(this.amazonS3Client != null); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java new file mode 100644 index 0000000..88e19ea --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java @@ -0,0 +1,62 @@ +package org.apache.streams.s3; + +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +public class S3PersistWriterTest { + private S3PersistWriter s3PersistWriter; + + @After + public void tearDown() { + s3PersistWriter = null; + } + + @Test(expected=IllegalArgumentException.class) + public void testBadS3Config() { + s3PersistWriter = new S3PersistWriter(getBadConfig()); + + s3PersistWriter.prepare(null); + } + + @Test + public void testGoodS3Config() { + s3PersistWriter = new S3PersistWriter(getGoodConfig()); + + s3PersistWriter.prepare(null); + + assertNotNull(s3PersistWriter.getAmazonS3Client()); + } + + @Test + public void testCleanup() { + s3PersistWriter = new S3PersistWriter(getGoodConfig()); + + s3PersistWriter.prepare(null); + + s3PersistWriter.cleanUp(); + } + + private S3WriterConfiguration getBadConfig() { + S3WriterConfiguration s3WriterConfiguration = new S3WriterConfiguration(); + + s3WriterConfiguration.setWriterPath("bad_path"); + s3WriterConfiguration.setBucket("random_bucket"); + + return s3WriterConfiguration; + } + + private S3WriterConfiguration getGoodConfig() { + S3WriterConfiguration s3WriterConfiguration = new S3WriterConfiguration(); + + s3WriterConfiguration.setWriterPath("good_path/"); + s3WriterConfiguration.setBucket("random_bucket"); + s3WriterConfiguration.setKey("key"); + s3WriterConfiguration.setProtocol(S3Configuration.Protocol.HTTP); + s3WriterConfiguration.setSecretKey("secret!"); + s3WriterConfiguration.setWriterFilePrefix("prefix"); + + return s3WriterConfiguration; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 7938247..b8ee290 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -22,10 +22,7 @@ import org.apache.streams.core.*; import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; import org.apache.streams.local.queues.ThroughputQueue; -import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; -import org.apache.streams.local.tasks.StatusCounterMonitorThread; -import org.apache.streams.local.tasks.StreamsProviderTask; -import org.apache.streams.local.tasks.StreamsTask; +import org.apache.streams.local.tasks.*; import org.apache.streams.monitoring.tasks.BroadcastMonitorThread; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -215,7 +212,13 @@ public class LocalStreamBuilder implements StreamBuilder { isRunning = isRunning || task.isRunning(); } for(StreamComponent task: components.values()) { - isRunning = isRunning || task.getInBoundQueue().size() > 0; + boolean tasksRunning = false; + for(StreamsTask t : task.getStreamsTasks()) { + if(t instanceof BaseStreamsTask) { + tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning(); + } + } + isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0); } if(isRunning) { Thread.sleep(3000); @@ -314,11 +317,11 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.futures.put(task, this.executor.submit(task)); compTasks.add(task); - if( comp.isOperationCountable() ) { + /*if( comp.isOperationCountable() ) { this.monitor.submit(broadcastMonitor); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } + }*/ } streamsTasks.put(comp.getId(), compTasks); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 0dcc4d0..31c5981 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -18,6 +18,7 @@ package org.apache.streams.local.builders; +import com.google.common.collect.Lists; import org.apache.streams.core.*; import org.apache.streams.local.tasks.StreamsPersistWriterTask; import org.apache.streams.local.tasks.StreamsProcessorTask; @@ -51,6 +52,8 @@ public class StreamComponent { private int numTasks = 1; private boolean perpetual; + private List<StreamsTask> tasks; + private Map<String, Object> streamConfig; /** @@ -132,6 +135,7 @@ public class StreamComponent { private void initializePrivateVariables() { this.inBound = new HashSet<StreamComponent>(); this.outBound = new HashMap<StreamComponent, BlockingQueue<StreamsDatum>>(); + this.tasks = Lists.newArrayList(); } /** @@ -240,9 +244,18 @@ public class StreamComponent { else { throw new InvalidStreamException("Underlying StreamComponoent was NULL."); } + + if(task != null) { + tasks.add(task); + } + return task; } + public List<StreamsTask> getStreamsTasks() { + return this.tasks; + } + /** * The unique of this component * @return
