Repository: incubator-streams
Updated Branches:
  refs/heads/master 11adec39a -> 6c4cf0f1c


STREAMS-243 | Added better exception handling for the S3PersistWriter and made 
the shutdown conditions in the LocalStreamBuilder more robust


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6aba90b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6aba90b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6aba90b9

Branch: refs/heads/master
Commit: 6aba90b90d55cc19c1f70d0839e9311d5e8356ad
Parents: 8517eed
Author: Robert Douglas <[email protected]>
Authored: Mon Dec 8 17:55:16 2014 -0600
Committer: Robert Douglas <[email protected]>
Committed: Mon Dec 8 17:55:16 2014 -0600

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistWriter.java  | 37 +++++++-----
 .../apache/streams/s3/S3PersistWriterTest.java  | 62 ++++++++++++++++++++
 .../local/builders/LocalStreamBuilder.java      | 17 +++---
 .../streams/local/builders/StreamComponent.java | 13 ++++
 4 files changed, 107 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 9111265..c50c94c 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -27,6 +27,7 @@ import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.streams.core.*;
 import org.slf4j.Logger;
@@ -249,26 +250,32 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         // Connect to S3
         synchronized (this) {
 
-            // if the user has chosen to not set the object mapper, then set a default object mapper for them.
-            if(this.objectMapper == null)
-                this.objectMapper = new ObjectMapper();
+            try {
+                // if the user has chosen to not set the object mapper, then set a default object mapper for them.
+                if (this.objectMapper == null)
+                    this.objectMapper = new ObjectMapper();
 
-            // Create the credentials Object
-            if(this.amazonS3Client == null) {
-                AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
+                // Create the credentials Object
+                if (this.amazonS3Client == null) {
+                    AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
 
-                ClientConfiguration clientConfig = new ClientConfiguration();
-                
clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
+                    ClientConfiguration clientConfig = new ClientConfiguration();
+                    
clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
 
-                // We do not want path style access
-                S3ClientOptions clientOptions = new S3ClientOptions();
-                clientOptions.setPathStyleAccess(false);
+                    // We do not want path style access
+                    S3ClientOptions clientOptions = new S3ClientOptions();
+                    clientOptions.setPathStyleAccess(false);
 
-                this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
-                if( !Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
-                    
this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
-                this.amazonS3Client.setS3ClientOptions(clientOptions);
+                    this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+                    if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
+                        
this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
+                    this.amazonS3Client.setS3ClientOptions(clientOptions);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Exception while preparing the S3 client: {}", e);
             }
+
+            Preconditions.checkArgument(this.amazonS3Client != null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java
new file mode 100644
index 0000000..88e19ea
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/test/java/org/apache/streams/s3/S3PersistWriterTest.java
@@ -0,0 +1,62 @@
+package org.apache.streams.s3;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+public class S3PersistWriterTest {
+    private S3PersistWriter s3PersistWriter;
+
+    @After
+    public void tearDown() {
+        s3PersistWriter = null;
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testBadS3Config() {
+        s3PersistWriter = new S3PersistWriter(getBadConfig());
+
+        s3PersistWriter.prepare(null);
+    }
+
+    @Test
+    public void testGoodS3Config() {
+        s3PersistWriter = new S3PersistWriter(getGoodConfig());
+
+        s3PersistWriter.prepare(null);
+
+        assertNotNull(s3PersistWriter.getAmazonS3Client());
+    }
+
+    @Test
+    public void testCleanup() {
+        s3PersistWriter = new S3PersistWriter(getGoodConfig());
+
+        s3PersistWriter.prepare(null);
+
+        s3PersistWriter.cleanUp();
+    }
+
+    private S3WriterConfiguration getBadConfig() {
+        S3WriterConfiguration s3WriterConfiguration = new S3WriterConfiguration();
+
+        s3WriterConfiguration.setWriterPath("bad_path");
+        s3WriterConfiguration.setBucket("random_bucket");
+
+        return s3WriterConfiguration;
+    }
+
+    private S3WriterConfiguration getGoodConfig() {
+        S3WriterConfiguration s3WriterConfiguration = new S3WriterConfiguration();
+
+        s3WriterConfiguration.setWriterPath("good_path/");
+        s3WriterConfiguration.setBucket("random_bucket");
+        s3WriterConfiguration.setKey("key");
+        s3WriterConfiguration.setProtocol(S3Configuration.Protocol.HTTP);
+        s3WriterConfiguration.setSecretKey("secret!");
+        s3WriterConfiguration.setWriterFilePrefix("prefix");
+
+        return s3WriterConfiguration;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 7938247..b8ee290 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -22,10 +22,7 @@ import org.apache.streams.core.*;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.queues.ThroughputQueue;
-import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
-import org.apache.streams.local.tasks.StatusCounterMonitorThread;
-import org.apache.streams.local.tasks.StreamsProviderTask;
-import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.tasks.*;
 import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -215,7 +212,13 @@ public class LocalStreamBuilder implements StreamBuilder {
                     isRunning = isRunning || task.isRunning();
                 }
                 for(StreamComponent task: components.values()) {
-                    isRunning = isRunning || task.getInBoundQueue().size() > 0;
+                    boolean tasksRunning = false;
+                    for(StreamsTask t : task.getStreamsTasks()) {
+                        if(t instanceof BaseStreamsTask) {
+                            tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning();
+                        }
+                    }
+                    isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0);
                 }
                 if(isRunning) {
                     Thread.sleep(3000);
@@ -314,11 +317,11 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
-                if( comp.isOperationCountable() ) {
+                /*if( comp.isOperationCountable() ) {
                     this.monitor.submit(broadcastMonitor);
                     this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
                     this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                }
+                }*/
             }
             streamsTasks.put(comp.getId(), compTasks);
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6aba90b9/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 0dcc4d0..31c5981 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.local.builders;
 
+import com.google.common.collect.Lists;
 import org.apache.streams.core.*;
 import org.apache.streams.local.tasks.StreamsPersistWriterTask;
 import org.apache.streams.local.tasks.StreamsProcessorTask;
@@ -51,6 +52,8 @@ public class StreamComponent {
     private int numTasks = 1;
     private boolean perpetual;
 
+    private List<StreamsTask> tasks;
+
     private Map<String, Object> streamConfig;
 
     /**
@@ -132,6 +135,7 @@ public class StreamComponent {
     private void initializePrivateVariables() {
         this.inBound = new HashSet<StreamComponent>();
         this.outBound = new HashMap<StreamComponent, BlockingQueue<StreamsDatum>>();
+        this.tasks = Lists.newArrayList();
     }
 
     /**
@@ -240,9 +244,18 @@ public class StreamComponent {
         else {
             throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
         }
+
+        if(task != null) {
+            tasks.add(task);
+        }
+
         return task;
     }
 
+    public List<StreamsTask> getStreamsTasks() {
+        return this.tasks;
+    }
+
     /**
      * The unique of this component
      * @return

Reply via email to