Author: cschneider
Date: Tue Jun  7 16:17:57 2011
New Revision: 1133075

URL: http://svn.apache.org/viewvc?rev=1133075&view=rev
Log:
CAMEL-4067: Close the input streams after use and make the tests assert on
the notifier match, so that a timeout while waiting shows its cause

Modified:
    
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
    
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
    
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java

Modified: 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
 (original)
+++ 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
 Tue Jun  7 16:17:57 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -55,11 +56,20 @@ public enum HdfsFileType {
     NORMAL_FILE {
         @Override
         public long append(HdfsOutputStream hdfsostr, Object key, Object 
value, TypeConverter typeConverter) {
+               InputStream is = null;
             try {
-                InputStream is = typeConverter.convertTo(InputStream.class, 
value);
+                is = typeConverter.convertTo(InputStream.class, value);
                 return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), 
HdfsConstants.DEFAULT_BUFFERSIZE, false);
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
+            } finally {
+               if (is != null) {
+                       try {
+                                               is.close();
+                                       } catch (IOException e) {
+                                               throw new 
RuntimeException("Error closing stream", e);
+                                       }
+               }
             }
         }
 
@@ -130,8 +140,9 @@ public enum HdfsFileType {
                 Writable keyWritable = getWritable(key, typeConverter, 
keySize);
                 Holder<Integer> valueSize = new Holder<Integer>();
                 Writable valueWritable = getWritable(value, typeConverter, 
valueSize);
-                ((SequenceFile.Writer) hdfsostr.getOut()).append(keyWritable, 
valueWritable);
-                ((SequenceFile.Writer) hdfsostr.getOut()).sync();
+                Writer writer = (SequenceFile.Writer) hdfsostr.getOut();
+                writer.append(keyWritable, valueWritable);
+                writer.sync();
                 return keySize.value + valueSize.value;
             } catch (Exception ex) {
                 throw new RuntimeCamelException(ex);

Modified: 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
 (original)
+++ 
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
 Tue Jun  7 16:17:57 2011
@@ -282,8 +282,9 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, 
Holder<Integer> size) {
+               InputStream  is = null;
             try {
-                InputStream is = typeConverter.convertTo(InputStream.class, 
value);
+                is = typeConverter.convertTo(InputStream.class, value);
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, 
false);
                 BytesWritable writable = new BytesWritable();
@@ -292,6 +293,14 @@ public class HdfsWritableFactories {
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
+            } finally {
+               if (is != null) {
+                       try {
+                                               is.close();
+                                       } catch (IOException e) {
+                                               throw new 
RuntimeException("Error closing stream", e);
+                                       }
+               }
             }
         }
 

Modified: 
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
 (original)
+++ 
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
 Tue Jun  7 16:17:57 2011
@@ -61,7 +61,7 @@ public class HdfsProducerFileWriteTest e
             template.sendBodyAndHeader("file://target/file-batch1/", "CIAO", 
"CamelFileName", "CIAO" + i);
         }
 
-        nb.matchesMockWaitTime();
+        Assert.assertTrue("Timeout waiting for match" + nb.toString(), 
nb.matchesMockWaitTime());
         context.stop();
 
         InputStream in = null;
@@ -96,7 +96,7 @@ public class HdfsProducerFileWriteTest e
             template.sendBodyAndHeader("file://target/file-batch2", "CIAO", 
"CamelFileName", "CIAO" + i);
         }
 
-        nb.matchesMockWaitTime();
+        Assert.assertTrue("Timeout waiting for match" + nb.toString(), 
nb.matchesMockWaitTime());
         context.stop();
 
         Configuration conf = new Configuration();
@@ -136,7 +136,7 @@ public class HdfsProducerFileWriteTest e
             template.sendBodyAndHeader("file://target/file-batch3", "CIAO", 
"CamelFileName", "CIAO" + i);
         }
 
-        nb.matchesMockWaitTime();
+        Assert.assertTrue("Timeout waiting for match" + nb.toString(), 
nb.matchesMockWaitTime());
         context.stop();
 
         Configuration conf = new Configuration();
@@ -177,7 +177,7 @@ public class HdfsProducerFileWriteTest e
             template.sendBodyAndHeader("file://target/file-batch4", "CIAO" + 
i, "CamelFileName", "CIAO" + i);
         }
 
-        nb.matchesMockWaitTime();
+        Assert.assertTrue("Timeout waiting for match" + nb.toString(), 
nb.matchesMockWaitTime());
         context.stop();
 
         Configuration conf = new Configuration();
@@ -220,7 +220,7 @@ public class HdfsProducerFileWriteTest e
             template.sendBodyAndHeader("file://target/file-batch5", bb, 
"CamelFileName", "CIAO" + i);
         }
 
-        nb.matchesMockWaitTime();
+        Assert.assertTrue("Timeout waiting for match" + nb.toString(), 
nb.matchesMockWaitTime());
         context.stop();
 
         Configuration conf = new Configuration();


Reply via email to