Author: johan
Date: Thu May 20 08:20:35 2010
New Revision: 946564
URL: http://svn.apache.org/viewvc?rev=946564&view=rev
Log:
Close thrift connections properly in the Hadoop record reader. Patch by johan,
review by jbellis. CASSANDRA-1081
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=946564&r1=946563&r2=946564&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Thu May 20 08:20:35 2010
@@ -56,7 +56,11 @@ public class ColumnFamilyRecordReader ex
private String cfName;
private String keyspace;
- public void close() {}
+ public void close()
+ {
+ if (iter != null)
+ iter.close();
+ }
public String getCurrentKey()
{
@@ -102,6 +106,7 @@ public class ColumnFamilyRecordReader ex
private int totalRead = 0;
private int i = 0;
private AbstractType comparator =
DatabaseDescriptor.getComparator(keyspace, cfName);
+ private TSocket socket;
private void maybeInit()
{
@@ -111,7 +116,11 @@ public class ColumnFamilyRecordReader ex
if (rows != null)
return;
- TSocket socket = new TSocket(getLocation(),
+
+ // close previous connection if one is open
+ close();
+
+ socket = new TSocket(getLocation(),
DatabaseDescriptor.getThriftPort());
TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket,
false, false);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
@@ -226,6 +235,14 @@ public class ColumnFamilyRecordReader ex
}
return new Pair<String, SortedMap<byte[], IColumn>>(ks.key, map);
}
+
+ public void close()
+ {
+ if (socket != null && socket.isOpen())
+ {
+ socket.close();
+ }
+ }
}
private IColumn unthriftify(ColumnOrSuperColumn cosc)