Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 2e92cf899 -> 17d43fa55
  refs/heads/trunk 7476d83b4 -> 81858ebcb
Reorder operations in CqlRecordWriter main run loop Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9576 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17d43fa5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17d43fa5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17d43fa5 Branch: refs/heads/cassandra-2.2 Commit: 17d43fa55eca29be492a716f04d9ceff1989762d Parents: 2e92cf8 Author: Philip Thompson <[email protected]> Authored: Mon Jun 15 11:55:04 2015 -0400 Committer: Sam Tunnicliffe <[email protected]> Committed: Mon Jun 15 19:57:08 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/cql3/CqlRecordWriter.java | 61 ++++++++++---------- 2 files changed, 30 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 020cb46..ba8ef12 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Fix connection leak in CqlRecordWriter (CASSANDRA-9576) * Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573) * Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229) * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566) http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 78b0494..6e8ffd9 100644 --- 
a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -299,36 +299,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf while (true) { // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. - - // attempt to connect to a different endpoint - try - { - InetAddress address = iter.next(); - String host = address.getHostName(); - client = CqlConfigHelper.getOutputCluster(host, conf).connect(); - } - catch (Exception e) - { - //If connection died due to Interrupt, just try connecting to the endpoint again. - //There are too many ways for the Thread.interrupted() state to be cleared, so - //we can't rely on that here. Until the java driver gives us a better way of knowing - //that this exception came from an InterruptedException, this is the best solution. - if (canRetryDriverConnection(e)) - { - iter.previous(); - } - closeInternal(); - - // Most exceptions mean something unexpected went wrong to that endpoint, so - // we should try again to another. Other exceptions (auth or invalid request) are fatal. - if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) - { - lastException = new IOException(e); - break outer; - } - continue; - } - try { int i = 0; @@ -342,7 +312,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf } client.execute(boundStatement); i++; - + if (i >= batchThreshold) break; bindVariables = queue.poll(); @@ -359,6 +329,33 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf } } + // attempt to connect to a different endpoint + try + { + InetAddress address = iter.next(); + String host = address.getHostName(); + client = CqlConfigHelper.getOutputCluster(host, conf).connect(); + } + catch (Exception e) + { + //If connection died due to Interrupt, just try connecting to the endpoint again. 
+ //There are too many ways for the Thread.interrupted() state to be cleared, so + //we can't rely on that here. Until the java driver gives us a better way of knowing + //that this exception came from an InterruptedException, this is the best solution. + if (canRetryDriverConnection(e)) + { + iter.previous(); + } + closeInternal(); + + // Most exceptions mean something unexpected went wrong to that endpoint, so + // we should try again to another. Other exceptions (auth or invalid request) are fatal. + if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) + { + lastException = new IOException(e); + break outer; + } + } } } // close all our connections once we are done. @@ -409,7 +406,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf { if (client != null) { - client.close();; + client.close(); } }
