2 new revisions:

Revision: 1f8b2e588033
Author:   gdusbabek <[email protected]>
Date:     Fri Feb  3 07:43:44 2012
Log:      Bug fixes and enhancements....
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1f8b2e588033

Revision: 1b12e4f515ab
Author:   gdusbabek <[email protected]>
Date:     Fri Feb  3 07:44:05 2012
Log:      update CHANGES and package.json
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1b12e4f515ab

==============================================================================
Revision: 1f8b2e588033
Author:   gdusbabek <[email protected]>
Date:     Fri Feb  3 07:43:44 2012
Log:      Bug fixes and enhancements.

* id connections.
* introduce options.max_query to allow separate and configurable query timeouts.
* emit error event on thrift error.
* long query watch logs a trace when any query takes more than 10s.
* pooled connections need to quietly tear down transport connections when being reconnected.
* fix recoverable error logic in PooledConnection.execute.
* Make PooledConnection.execute() re-execution calls explicit (saw some strange inheritance problems when execute() was overloaded with different parameters).

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1f8b2e588033

Modified:
 /lib/driver.js

=======================================
--- /lib/driver.js      Mon Jan 30 13:54:09 2012
+++ /lib/driver.js      Fri Feb  3 07:43:44 2012
@@ -42,8 +42,12 @@
   message: 'null/undefined query parameter'
 };

+//  how long we wait for a connection to time out.
 var DEFAULT_CONNECTION_TIMEOUT = 4000;

+// how long we wait for a query to finish.
+var DEFAULT_MAX_QUERY = 100000;
+
/** Default timeout for each of the steps (login, learn, use) which are performed
 * when the Connection to the Cassandra server has been established. */
 var DEFAULT_STEP_TIMEOUTS = {
@@ -178,6 +182,7 @@
   return this._colCount;
 };

+var CON_COUNTER = 0;

 /**
  * @param options: valid parts are:
@@ -189,6 +194,8 @@
   this.client = null;
   this.connectionInfo = options;
   this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
+  this.max_query = options.max_query || DEFAULT_MAX_QUERY;
+  this._id = ++CON_COUNTER;

   EventEmitter.call(this);
 };
@@ -203,19 +210,20 @@
   var self = this,
       timeoutId;

- self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':' + self.connectionInfo.port); + self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':' + self.connectionInfo.port + '(' + this._id + ')');

// build connection here, so that timeouts on bad hosts happen now and not in the constructor. this.con = thrift.createConnection(self.connectionInfo.host, self.connectionInfo.port);
   this.con.on('error', function(err) {
     clearTimeout(timeoutId);
+ self.emit('log', 'error', self.connectionInfo.host + ':' + self.connectionInfo.port + ' error during connect.');
     amendError(err, self.connectionInfo);
     callback(err);
   });

   this.con.on('close', function() {
     clearTimeout(timeoutId);
- self.emit('log', 'info', self.connectionInfo.host + ':' + self.connectionInfo.port + ' is closed'); + self.emit('log', 'info', self.connectionInfo.host + ':' + self.connectionInfo.port + ' is closed (' + self._id + ')');
   });

   this.con.on('connect', function() {
@@ -385,11 +393,15 @@
self.emit('log', 'cql', {'query': query, 'parameterized_query': cqlString, 'args': args});

// if a connection dies at the right place, execute_cql_query never returns. make sure the callback gets called.
+    var longQueryWatch = setTimeout(function() {
+      self.emit('log', 'trace', "query is taking long: " + cql);
+    },10000);
     var timeoutId = setTimeout(function() {
       callback(new Error('Connection timed out'));
       timeoutId = null;
- }, this.timeout); // todo: should we disambiguate connection timeout vs query timeout?
+    }, self.max_query);
self.client.execute_cql_query(cql, ttypes.Compression.NONE, function(err, res) {
+      clearTimeout(longQueryWatch);
       if (!timeoutId) {
         self.emit('log', 'warn', 'query returned after timeout: ' + cql);
         return;
@@ -462,6 +474,19 @@
  */
 ConnectionInPool.prototype.connect = function(callback) {
   var self = this;
+
+ // pooled connections can be reestablished. track this and make sure the previous transport is torn down. + // it occupies a fair amount of resources. the callback doesn't need to depend on this since the new transport
+  // can be repointed before the old transport goes away.
+  if (self._id !== undefined && self.con && self.client) {
+    self.emit('log', 'debug', 'initiating proactive transport close');
+    // ghetto close.
+    var _con = self.con;
+    _con.on('close', function(err) {
+      self.emit('log', 'debug', 'proactive transport close finished');
+    });
+    _con.end();
+  }
   Connection.call(this, this._options);
   Connection.prototype.connect.call(this, function(err) {
     self.connected = !err;
@@ -627,9 +652,9 @@
   }

   self.running++;
-  var callback = function() {
+  var callback = function(err, results) {
     self.running--;
-    executeCallback.apply(self, arguments);
+    executeCallback(err, results);
     if (self.running === 0) {
       self.emit('drain');
     }
@@ -642,20 +667,15 @@
       try {
         con.execute(query, args, function(err, result) {
           con.taken = false;
-          var recoverableError = null;
           if (err) {
if (err.hasOwnProperty('name') && contains(appExceptions, err.name)) {
               callback(err, null);
-              return;
             } else {
-              recoverableError = err;
-            }
-            if (recoverableError) {
               con.unhealthyAt = new Date().getTime();
               con.taken = false;
- self.emit('log', 'warn', 'setting unhealthy from execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port); + self.emit('log', 'warn', 'setting unhealthy from execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port + ',' + err.message);
               // try again.
-              self.execute(query, args, callback);
+ PooledConnection.prototype.execute.call(self, query, args, callback);
             }
           } else {
             callback(null, result);
@@ -667,7 +687,7 @@
         con.taken = false;
self.emit('log', 'warn', 'setting unhealthy from catch outside execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
         // try again.
-        self.execute(query, args, callback);
+ PooledConnection.prototype.execute.call(self, query, args, callback);
       }
     }
   });
@@ -681,6 +701,7 @@
   var allBad = false;
   var timedOut = false;
   var takens = [];
+  var connectionInfo;
   async.whilst(function truthTest() {
// should the timeout of getting a single connection be the sum of all connections? Think of a scenario where the // timeout is N, but the first X nodes are unresponsive. You still want to allow access to the subsequent good
@@ -689,6 +710,7 @@
     return !allBad && con === null && !timedOut;
   }, function tryConnect(callback) {
     var c = self.connections[self.current_node];
+    connectionInfo = c.connectionInfo;
     allBad = self._incr();
     if (c.taken) {
takens[self.current_node] = takens[self.current_node] === undefined ? 1 : takens[self.current_node] + 1;
@@ -729,6 +751,7 @@
     } else if (!con && !err) {
       err = new Error('connection was not set');
     }
+    err = err ? amendError(err, connectionInfo) : err;
     callback(err, con);
   });
 };

==============================================================================
Revision: 1b12e4f515ab
Author:   gdusbabek <[email protected]>
Date:     Fri Feb  3 07:44:05 2012
Log:      update CHANGES and package.json

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1b12e4f515ab

Modified:
 /CHANGES
 /package.json

=======================================
--- /CHANGES    Tue Jan 31 09:32:41 2012
+++ /CHANGES    Fri Feb  3 07:44:05 2012
@@ -1,3 +1,14 @@
+Changes with cassandra-client 0.8.2
+
+- id connections.
+- introduce options.max_query to allow separate and configurable query timeouts.
+- emit error event on thrift error.
+- long query watch logs a trace when any query takes more than 10s.
+- pooled connections need to quietly tear down transport connections when being reconnected.
+- fix recoverable error logic in PooledConnection.execute.
+- Make PooledConnection.execute() re-execution calls explicit (saw some strange inheritance problems when execute()
+  was overloaded with different parameters).
+
 Changes with cassandra-client 0.8.1

 - fix callback problem with CP.shutdown
=======================================
--- /package.json       Tue Jan 31 09:32:41 2012
+++ /package.json       Fri Feb  3 07:44:05 2012
@@ -9,7 +9,7 @@
   ],
   "name": "cassandra-client",
   "description": "Node.js CQL driver for Apache Cassandra",
-  "version": "0.8.1",
+  "version": "0.8.2",
   "homepage": "http://code.google.com/a/apache-extras.org/p/cassandra-node/";,
   "repository": {
     "type": "git",

Reply via email to