Author: cschneider
Date: Tue Jun 7 16:17:57 2011
New Revision: 1133075
URL: http://svn.apache.org/viewvc?rev=1133075&view=rev
Log:
CAMEL-4067 Close input streams after use, and make the test assert on the
result of the mock wait so that a timeout fails with a descriptive message
Modified:
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
Modified:
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
---
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
(original)
+++
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
Tue Jun 7 16:17:57 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -55,11 +56,20 @@ public enum HdfsFileType {
NORMAL_FILE {
@Override
public long append(HdfsOutputStream hdfsostr, Object key, Object
value, TypeConverter typeConverter) {
+ InputStream is = null;
try {
- InputStream is = typeConverter.convertTo(InputStream.class,
value);
+ is = typeConverter.convertTo(InputStream.class, value);
return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(),
HdfsConstants.DEFAULT_BUFFERSIZE, false);
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ throw new
RuntimeException("Error closing stream", e);
+ }
+ }
}
}
@@ -130,8 +140,9 @@ public enum HdfsFileType {
Writable keyWritable = getWritable(key, typeConverter,
keySize);
Holder<Integer> valueSize = new Holder<Integer>();
Writable valueWritable = getWritable(value, typeConverter,
valueSize);
- ((SequenceFile.Writer) hdfsostr.getOut()).append(keyWritable,
valueWritable);
- ((SequenceFile.Writer) hdfsostr.getOut()).sync();
+ Writer writer = (SequenceFile.Writer) hdfsostr.getOut();
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
return keySize.value + valueSize.value;
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
Modified:
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
---
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
(original)
+++
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
Tue Jun 7 16:17:57 2011
@@ -282,8 +282,9 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter,
Holder<Integer> size) {
+ InputStream is = null;
try {
- InputStream is = typeConverter.convertTo(InputStream.class,
value);
+ is = typeConverter.convertTo(InputStream.class, value);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE,
false);
BytesWritable writable = new BytesWritable();
@@ -292,6 +293,14 @@ public class HdfsWritableFactories {
return writable;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ throw new
RuntimeException("Error closing stream", e);
+ }
+ }
}
}
Modified:
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java?rev=1133075&r1=1133074&r2=1133075&view=diff
==============================================================================
---
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
(original)
+++
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
Tue Jun 7 16:17:57 2011
@@ -61,7 +61,7 @@ public class HdfsProducerFileWriteTest e
template.sendBodyAndHeader("file://target/file-batch1/", "CIAO",
"CamelFileName", "CIAO" + i);
}
- nb.matchesMockWaitTime();
+ Assert.assertTrue("Timeout waiting for match" + nb.toString(),
nb.matchesMockWaitTime());
context.stop();
InputStream in = null;
@@ -96,7 +96,7 @@ public class HdfsProducerFileWriteTest e
template.sendBodyAndHeader("file://target/file-batch2", "CIAO",
"CamelFileName", "CIAO" + i);
}
- nb.matchesMockWaitTime();
+ Assert.assertTrue("Timeout waiting for match" + nb.toString(),
nb.matchesMockWaitTime());
context.stop();
Configuration conf = new Configuration();
@@ -136,7 +136,7 @@ public class HdfsProducerFileWriteTest e
template.sendBodyAndHeader("file://target/file-batch3", "CIAO",
"CamelFileName", "CIAO" + i);
}
- nb.matchesMockWaitTime();
+ Assert.assertTrue("Timeout waiting for match" + nb.toString(),
nb.matchesMockWaitTime());
context.stop();
Configuration conf = new Configuration();
@@ -177,7 +177,7 @@ public class HdfsProducerFileWriteTest e
template.sendBodyAndHeader("file://target/file-batch4", "CIAO" +
i, "CamelFileName", "CIAO" + i);
}
- nb.matchesMockWaitTime();
+ Assert.assertTrue("Timeout waiting for match" + nb.toString(),
nb.matchesMockWaitTime());
context.stop();
Configuration conf = new Configuration();
@@ -220,7 +220,7 @@ public class HdfsProducerFileWriteTest e
template.sendBodyAndHeader("file://target/file-batch5", bb,
"CamelFileName", "CIAO" + i);
}
- nb.matchesMockWaitTime();
+ Assert.assertTrue("Timeout waiting for match" + nb.toString(),
nb.matchesMockWaitTime());
context.stop();
Configuration conf = new Configuration();