2 new revisions:
Revision: 1f8b2e588033
Author: gdusbabek <[email protected]>
Date: Fri Feb 3 07:43:44 2012
Log: Bug fixes and enhancements....
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1f8b2e588033
Revision: 1b12e4f515ab
Author: gdusbabek <[email protected]>
Date: Fri Feb 3 07:44:05 2012
Log: update CHANGES and package.json
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1b12e4f515ab
==============================================================================
Revision: 1f8b2e588033
Author: gdusbabek <[email protected]>
Date: Fri Feb 3 07:43:44 2012
Log: Bug fixes and enhancements.
* id connections.
* introduce options.max_query to allow separate and configurable query
timeouts.
* emit error event on thrift error.
* long query watch logs a trace when any query takes more than 10s.
* pooled connections need to quietly tear down transport connections when
being reconnected.
* fix recoverable error logic in PooledConnection.execute.
* Make PooledConnection.execute() re-execution calls explicit (saw some
strange inheritance problems when execute() was overloaded with different
parameters).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1f8b2e588033
Modified:
/lib/driver.js
=======================================
--- /lib/driver.js Mon Jan 30 13:54:09 2012
+++ /lib/driver.js Fri Feb 3 07:43:44 2012
@@ -42,8 +42,12 @@
message: 'null/undefined query parameter'
};
+// how long we wait for a connection to time out.
var DEFAULT_CONNECTION_TIMEOUT = 4000;
+// how long we wait for a query to finish.
+var DEFAULT_MAX_QUERY = 100000;
+
/** Default timeout for each of the steps (login, learn, use) which are
performed
* when the Connection to the Cassandra server has been established. */
var DEFAULT_STEP_TIMEOUTS = {
@@ -178,6 +182,7 @@
return this._colCount;
};
+var CON_COUNTER = 0;
/**
* @param options: valid parts are:
@@ -189,6 +194,8 @@
this.client = null;
this.connectionInfo = options;
this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
+ this.max_query = options.max_query || DEFAULT_MAX_QUERY;
+ this._id = ++CON_COUNTER;
EventEmitter.call(this);
};
@@ -203,19 +210,20 @@
var self = this,
timeoutId;
- self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':'
+ self.connectionInfo.port);
+ self.emit('log', 'info', 'connecting ' + self.connectionInfo.host + ':'
+ self.connectionInfo.port + '(' + this._id + ')');
// build connection here, so that timeouts on bad hosts happen now and
not in the constructor.
this.con = thrift.createConnection(self.connectionInfo.host,
self.connectionInfo.port);
this.con.on('error', function(err) {
clearTimeout(timeoutId);
+ self.emit('log', 'error', self.connectionInfo.host + ':' +
self.connectionInfo.port + ' error during connect.');
amendError(err, self.connectionInfo);
callback(err);
});
this.con.on('close', function() {
clearTimeout(timeoutId);
- self.emit('log', 'info', self.connectionInfo.host + ':' +
self.connectionInfo.port + ' is closed');
+ self.emit('log', 'info', self.connectionInfo.host + ':' +
self.connectionInfo.port + ' is closed (' + self._id + ')');
});
this.con.on('connect', function() {
@@ -385,11 +393,15 @@
self.emit('log', 'cql', {'query': query, 'parameterized_query':
cqlString, 'args': args});
// if a connection dies at the right place, execute_cql_query never
returns. make sure the callback gets called.
+ var longQueryWatch = setTimeout(function() {
+ self.emit('log', 'trace', "query is taking long: " + cql);
+ },10000);
var timeoutId = setTimeout(function() {
callback(new Error('Connection timed out'));
timeoutId = null;
- }, this.timeout); // todo: should we disambiguate connection timeout
vs query timeout?
+ }, self.max_query);
self.client.execute_cql_query(cql, ttypes.Compression.NONE,
function(err, res) {
+ clearTimeout(longQueryWatch);
if (!timeoutId) {
self.emit('log', 'warn', 'query returned after timeout: ' + cql);
return;
@@ -462,6 +474,19 @@
*/
ConnectionInPool.prototype.connect = function(callback) {
var self = this;
+
+ // pooled connections can be reestablished. track this and make sure the
previous transport is torn down.
+ // it occupies a fair amount of resources. the callback doesn't need to
depend on this since the new transport
+ // can be repointed before the old transport goes away.
+ if (self._id !== undefined && self.con && self.client) {
+ self.emit('log', 'debug', 'initiating proactive transport close');
+ // ghetto close.
+ var _con = self.con;
+ _con.on('close', function(err) {
+ self.emit('log', 'debug', 'proactive transport close finished');
+ });
+ _con.end();
+ }
Connection.call(this, this._options);
Connection.prototype.connect.call(this, function(err) {
self.connected = !err;
@@ -627,9 +652,9 @@
}
self.running++;
- var callback = function() {
+ var callback = function(err, results) {
self.running--;
- executeCallback.apply(self, arguments);
+ executeCallback(err, results);
if (self.running === 0) {
self.emit('drain');
}
@@ -642,20 +667,15 @@
try {
con.execute(query, args, function(err, result) {
con.taken = false;
- var recoverableError = null;
if (err) {
if (err.hasOwnProperty('name') && contains(appExceptions,
err.name)) {
callback(err, null);
- return;
} else {
- recoverableError = err;
- }
- if (recoverableError) {
con.unhealthyAt = new Date().getTime();
con.taken = false;
- self.emit('log', 'warn', 'setting unhealthy from execute ' +
con.connectionInfo.host + ':' + con.connectionInfo.port);
+ self.emit('log', 'warn', 'setting unhealthy from execute ' +
con.connectionInfo.host + ':' + con.connectionInfo.port + ',' +
err.message);
// try again.
- self.execute(query, args, callback);
+ PooledConnection.prototype.execute.call(self, query, args,
callback);
}
} else {
callback(null, result);
@@ -667,7 +687,7 @@
con.taken = false;
self.emit('log', 'warn', 'setting unhealthy from catch outside
execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
// try again.
- self.execute(query, args, callback);
+ PooledConnection.prototype.execute.call(self, query, args,
callback);
}
}
});
@@ -681,6 +701,7 @@
var allBad = false;
var timedOut = false;
var takens = [];
+ var connectionInfo;
async.whilst(function truthTest() {
// should the timeout of getting a single connection be the sum of all
connections? Think of a scenario where the
// timeout is N, but the first X nodes are unresponsive. You still
want to allow access to the subsequent good
@@ -689,6 +710,7 @@
return !allBad && con === null && !timedOut;
}, function tryConnect(callback) {
var c = self.connections[self.current_node];
+ connectionInfo = c.connectionInfo;
allBad = self._incr();
if (c.taken) {
takens[self.current_node] = takens[self.current_node] ===
undefined ? 1 : takens[self.current_node] + 1;
@@ -729,6 +751,7 @@
} else if (!con && !err) {
err = new Error('connection was not set');
}
+ err = err ? amendError(err, connectionInfo) : err;
callback(err, con);
});
};
==============================================================================
Revision: 1b12e4f515ab
Author: gdusbabek <[email protected]>
Date: Fri Feb 3 07:44:05 2012
Log: update CHANGES and package.json
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1b12e4f515ab
Modified:
/CHANGES
/package.json
=======================================
--- /CHANGES Tue Jan 31 09:32:41 2012
+++ /CHANGES Fri Feb 3 07:44:05 2012
@@ -1,3 +1,14 @@
+Changes with cassandra-client 0.8.2
+
+- id connections.
+- introduce options.max_query to allow separate and configurable query
timeouts.
+- emit error event on thrift error.
+- long query watch logs a trace when any query takes more than 10s.
+- pooled connections need to quietly tear down transport connections when
being reconnected.
+- fix recoverable error logic in PooledConnection.execute.
+- Make PooledConnection.execute() re-execution calls explicit (saw some
strange inheritance problems when execute()
+ was overloaded with different parameters).
+
Changes with cassandra-client 0.8.1
- fix callback problem with CP.shutdown
=======================================
--- /package.json Tue Jan 31 09:32:41 2012
+++ /package.json Fri Feb 3 07:44:05 2012
@@ -9,7 +9,7 @@
],
"name": "cassandra-client",
"description": "Node.js CQL driver for Apache Cassandra",
- "version": "0.8.1",
+ "version": "0.8.2",
"homepage": "http://code.google.com/a/apache-extras.org/p/cassandra-node/",
"repository": {
"type": "git",