3 new revisions:

Revision: 4b0d5e991fc9
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:23:47 2011
Log:      Apply connection timeout patch (issue #9).
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9

Revision: af37fcbc2ca4
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:34:29 2011
Log:      Add CHANGES files.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4

Revision: 9d5c07af9da6
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:35:50 2011
Log: Apply "Use async for login, learn, use and PooledConnection.execute" p...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6

==============================================================================
Revision: 4b0d5e991fc9
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:23:47 2011
Log:      Apply connection timeout patch (issue #9).

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4b0d5e991fc9

Modified:
 /lib/driver.js
 /test/test_driver.js

=======================================
--- /lib/driver.js      Fri Nov 11 08:57:42 2011
+++ /lib/driver.js      Thu Nov 17 12:23:47 2011
@@ -22,10 +22,12 @@
var logTiming = require('logmagic').local('node-cassandra-client.driver.timing');

 var sys = require('sys');
+var constants = require('constants');
 var Buffer = require('buffer').Buffer;
 var EventEmitter = require('events').EventEmitter;

 var thrift = require('thrift');
+var async = require('async');
 var Cassandra = require('./gen-nodejs/Cassandra');
 var ttypes = require('./gen-nodejs/cassandra_types');

@@ -43,6 +45,8 @@
   message: 'null/undefined query parameter'
 };

+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
 /** converts object to a string using toString() method if it exists. */
 function stringify(x) {
   if (x.toString) {
@@ -184,12 +188,14 @@
  * @param config an object used to control the creation of new instances.
  */
 PooledConnection = module.exports.PooledConnection = function(config) {
+  config = config || {};
   this.nodes = [];
   this.holdFor = 10000;
   this.current_node = 0;
   this.use_bigints = config.use_bigints ? true : false;
+  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
   this.log_time = config.log_time || false;
-
+
   // Construct a list of nodes from hosts in <host>:<port> form
   for (var i = 0; i < config.hosts.length; i++) {
     hostSpec = config.hosts[i];
@@ -239,6 +245,7 @@
                                    user: config.user,
                                    pass: config.pass,
                                    use_bigints: self.use_bigints,
+                                   timeout: self.timeout,
                                    log_time: self.log_time});

              conn.connect(function(err) {
@@ -329,14 +336,16 @@

 /**
  * @param options: valid parts are:
- *  user, pass, host, port, keyspace, use_bigints, log_time
+ *  user, pass, host, port, keyspace, use_bigints, timeout, log_time
  */
 Connection = module.exports.Connection = function(options) {
+  options = options || {};
   log.info('connecting ' + options.host + ':' + options.port);
   this.validators = {};
   this.con = thrift.createConnection(options.host, options.port);
   this.client = null;
   this.connectionInfo = options;
+  this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
 };


@@ -345,15 +354,23 @@
* @param callback called when connection is successful or ultimately fails (err will be present).
  */
 Connection.prototype.connect = function(callback) {
-  var self = this;
+  var self = this,
+      timeoutId;
+
   this.con.on('error', function(err) {
+    clearTimeout(timeoutId);
     amendError(err);
     callback(err);
   });
+
   this.con.on('close', function() {
+    clearTimeout(timeoutId);
log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + ' is closed');
   });
+
   this.con.on('connect', function() {
+    clearTimeout(timeoutId);
+
     // preparing the conneciton is a 3-step process.

     // 1) login
@@ -402,7 +419,7 @@
         cb(err);
       });
     };
-
+
     // put it all together, checking for errors along the way.
     login(function(loginErr) {
       if (loginErr) {
@@ -427,11 +444,23 @@
         });
       }
     });
-
   });
-
+
+  function connectTimeout() {
+    var err = new Error('ETIMEDOUT, Operation timed out');
+    err.errno = constants.ETIMEDOUT;
+
+    try {
+      self.con.connection.destroy(err);
+    }
+    catch (e) {}
+
+    self.con = null;
+  }
+
   // kicks off the connection process.
   this.client = thrift.createClient(Cassandra, this.con);
+  timeoutId = setTimeout(connectTimeout, this.timeout);
 };

 Connection.prototype.close = function() {
=======================================
--- /test/test_driver.js        Fri Nov 11 08:48:27 2011
+++ /test/test_driver.js        Thu Nov 17 12:23:47 2011
@@ -769,13 +769,38 @@
 //  });
 //};

+
+exports.testPooledConnectionFailover = function(test, assert) {
+  var server = null;
+ var hosts = ['google.com:8000', '127.0.0.1:6567', '127.0.0.2', '127.0.0.1:19170']; + var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1', use_bigints: true, 'timeout': 5000});
+
+  async.series([
+    function executeQueries(callback) {
+      conn.execute('UPDATE CfUgly SET A=1 WHERE KEY=1', [], function(err) {
+        assert.ifError(err);
+        callback();
+      });
+    }
+  ],
+
+  function(err) {
+    if (server) {
+      server.close();
+    }
+
+    conn.shutdown();
+    test.finish();
+  });
+};
+
 exports.testPooledConnection = function(test, assert) {
   function bail(conn, err) {
     conn.shutdown();
     assert.ifError(err);
     test.finish();
   }
-
+
   //var hosts = ["127.0.0.2:9170", "127.0.0.1:9170"];
   var hosts = ["127.0.0.1:19170"];
var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1', use_bigints: true});

==============================================================================
Revision: af37fcbc2ca4
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:34:29 2011
Log:      Add CHANGES files.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=af37fcbc2ca4

Added:
 /CHANGES

=======================================
--- /dev/null
+++ /CHANGES    Thu Nov 17 12:34:29 2011
@@ -0,0 +1,5 @@
+Changes with cassandra-client in development:
+
+- Set a timeout for connecting to the cassandra server. Default timeout
+  is 4000 ms, but a user can specify custom one by passing 'timeout'
+  option to the PooledConnection / Connection constructor.

==============================================================================
Revision: 9d5c07af9da6
Author:   Tomaz Muraus <[email protected]>
Date:     Thu Nov 17 12:35:50 2011
Log: Apply "Use async for login, learn, use and PooledConnection.execute" patch
(issue #10).

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=9d5c07af9da6

Modified:
 /lib/driver.js

=======================================
--- /lib/driver.js      Fri Nov 11 13:07:28 2011
+++ /lib/driver.js      Thu Nov 17 12:35:50 2011
@@ -22,6 +22,7 @@
var logTiming = require('logmagic').local('node-cassandra-client.driver.timing');

 var sys = require('sys');
+var constants = require('constants');
 var Buffer = require('buffer').Buffer;
 var EventEmitter = require('events').EventEmitter;

@@ -44,6 +45,8 @@
   message: 'null/undefined query parameter'
 };

+var DEFAULT_CONNECTION_TIMEOUT = 4000;
+
 /** converts object to a string using toString() method if it exists. */
 function stringify(x) {
   if (x.toString) {
@@ -185,12 +188,14 @@
  * @param config an object used to control the creation of new instances.
  */
 PooledConnection = module.exports.PooledConnection = function(config) {
+  config = config || {};
   this.nodes = [];
   this.holdFor = 10000;
   this.current_node = 0;
   this.use_bigints = config.use_bigints ? true : false;
+  this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
   this.log_time = config.log_time || false;
-
+
   // Construct a list of nodes from hosts in <host>:<port> form
   for (var i = 0; i < config.hosts.length; i++) {
     hostSpec = config.hosts[i];
@@ -240,6 +245,7 @@
                                    user: config.user,
                                    pass: config.pass,
                                    use_bigints: self.use_bigints,
+                                   timeout: self.timeout,
                                    log_time: self.log_time});

              conn.connect(function(err) {
@@ -342,14 +348,16 @@

 /**
  * @param options: valid parts are:
- *  user, pass, host, port, keyspace, use_bigints, log_time
+ *  user, pass, host, port, keyspace, use_bigints, timeout, log_time
  */
 Connection = module.exports.Connection = function(options) {
+  options = options || {};
   log.info('connecting ' + options.host + ':' + options.port);
   this.validators = {};
   this.con = thrift.createConnection(options.host, options.port);
   this.client = null;
   this.connectionInfo = options;
+  this.timeout = options.timeout || DEFAULT_CONNECTION_TIMEOUT;
 };


@@ -358,15 +366,23 @@
* @param callback called when connection is successful or ultimately fails (err will be present).
  */
 Connection.prototype.connect = function(callback) {
-  var self = this;
+  var self = this,
+      timeoutId;
+
   this.con.on('error', function(err) {
+    clearTimeout(timeoutId);
     amendError(err);
     callback(err);
   });
+
   this.con.on('close', function() {
+    clearTimeout(timeoutId);
log.info(self.connectionInfo.host + ':' + self.connectionInfo.port + ' is closed');
   });
+
   this.con.on('connect', function() {
+    clearTimeout(timeoutId);
+
     // preparing the conneciton is a 3-step process.

     // 1) login
@@ -429,11 +445,23 @@

       callback(err);
     });
-
   });
-
+
+  function connectTimeout() {
+    var err = new Error('ETIMEDOUT, Operation timed out');
+    err.errno = constants.ETIMEDOUT;
+
+    try {
+      self.con.connection.destroy(err);
+    }
+    catch (e) {}
+
+    self.con = null;
+  }
+
   // kicks off the connection process.
   this.client = thrift.createClient(Cassandra, this.con);
+  timeoutId = setTimeout(connectTimeout, this.timeout);
 };

 Connection.prototype.close = function() {

Reply via email to