8 new revisions:

Revision: d5c7427b3cc8
Author:   Christoph Tavan <[email protected]>
Date:     Tue Jan 24 09:50:16 2012
Log:      Perform clean shutdown when PooledConnection.shutdown() is called
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8

Revision: 634d7d6bd642
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 07:49:43 2012
Log: Add load test for connection pool and fix issue, where connections wer...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642

Revision: 4c54109897a6
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:24:50 2012
Log: Emit drain event after the last execute() callback of a pool has been ...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6

Revision: 422553851a5f
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:25:26 2012
Log: Simplify testPooledConnectionLoad and add testPooledConnectionShutdown...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f

Revision: 2184501cc648
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:27:35 2012
Log: Cleanup test_driver.js (remove some console.log and trailing whitespac...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648

Revision: 1a6bfbb5136b
Author:   Christoph Tavan <[email protected]>
Date:     Mon Jan 30 13:11:01 2012
Log: Listen for the drain event only once for shutdown, also reset shutting...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b

Revision: db75d6b18bb5
Author:   gdusbabek <[email protected]>
Date:     Mon Jan 30 14:58:56 2012
Log: * PooledConnection.connect() establishes connections prior to execute(...
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5

Revision: 00b61ccdde0c
Author:   gdusbabek <[email protected]>
Date:     Mon Jan 30 14:59:17 2012
Log:      update CHANGES.
http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c

==============================================================================
Revision: d5c7427b3cc8
Author:   Christoph Tavan <[email protected]>
Date:     Tue Jan 24 09:50:16 2012
Log:      Perform clean shutdown when PooledConnection.shutdown() is called

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=d5c7427b3cc8

Modified:
 /lib/driver.js

=======================================
--- /lib/driver.js      Mon Jan 30 10:26:55 2012
+++ /lib/driver.js      Tue Jan 24 09:50:16 2012
@@ -331,10 +331,33 @@
   timeoutId = setTimeout(connectTimeout, this.timeout);
 };

-Connection.prototype.close = function() {
-  this.con.end();
-  this.con = null;
-  this.client = null;
+/**
+ * Closes the current connection
+ *
+ * `this.con` is a socket connection. For failed socket connections
+ * `this.con.end()` may not trigger a `close` event. So in the cases where we
+ * are experiencing problems with the connection, we can just `end()` it
+ * without waiting for the `close` event.
+ *
+ * Note that the callback is only called, if the `close` event is fired. Also
+ * we only wait for the `close` event if a callback is given.
+ *
+ * @param {function} callback
+ */
+Connection.prototype.close = function(callback) {
+  var self = this;
+  if (!callback) {
+    self.con.end();
+    self.con = null;
+    self.client = null;
+    return;
+  }
+  self.con.on('close', function(err) {
+    self.con = null;
+    self.client = null;
+    callback();
+  });
+  self.con.end();
 };

 /**
@@ -493,7 +516,13 @@
   this.use_bigints = config.use_bigints ? true : false;
   this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
   this.log_time = config.log_time || false;
-
+
+  // Number of currently running queries
+  this.running = 0;
+
+  // Shutdown mode
+  this.shuttingDown = false;
+
   // Construct a list of nodes from hosts in <host>:<port> form
   for (var i = 0; i < config.hosts.length; i++) {
     var hostSpec = config.hosts[i];
@@ -562,8 +591,23 @@
  *    UPDATE             : callback(err)
  *    DELETE             : callback(err)
  */
-PooledConnection.prototype.execute = function(query, args, callback) {
+PooledConnection.prototype.execute = function(query, args, executeCallback) {
   var self = this;
+
+  if (self.shuttingDown) {
+ executeCallback(new Error('Unable to execute query, connection pool is shutting down.'));
+    return;
+  }
+
+  self.running++;
+  var callback = function() {
+    self.running--;
+    if (self.running === 0) {
+      self.emit('drain');
+    }
+    executeCallback.apply(self, arguments);
+  };
+
   self._getNextCon(function(err, con) {
     if (err) {
       callback(err, null);
@@ -631,6 +675,7 @@
     } else if (c.unhealthyAt > 0) {
       callback();
     } else if (!c.connected) {
+      c.taken = true;
       c.connect(function(err) {
         if (c.connected) {
           con = c;
@@ -665,14 +710,40 @@
  * @param callback called when the pool is fully shutdown
  */
 PooledConnection.prototype.shutdown = function(callback) {
- // todo: we need to be able to let pending execute()s finish and block executes from happening while shutting down.
-  this.connections.forEach(function(con) {
-    if (con.connected) {
-      con.close();
-    }
+  var self = this;
+
+  // Start shutdown mode, causes no new execute()'s to be accepted
+  if (self.shuttingDown) {
+    return;
+  }
+  self.shuttingDown = true;
+
+  callback = callback || function() {};
+
+  // Close all open connections as soon as the pool has drained
+  self.on('drain', function() {
+    self._closeConnections(callback);
   });
-  if (callback) {
-    callback();
+
+  // If no queries were running, emit the drain event immediately
+  if (self.running === 0) {
+    self.emit('drain');
   }
 };

+/**
+ * Close all connected connections.
+ *
+ * @param {function} closeCallback that is fired once all connections are closed
+ */
+PooledConnection.prototype._closeConnections = function(closeCallback) {
+  async.forEach(this.connections, function(con, cb) {
+    if (con.connected) {
+      con.close(cb);
+    } else {
+      cb(null);
+    }
+  }, function(err) {
+    closeCallback(err);
+  });
+};

==============================================================================
Revision: 634d7d6bd642
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 07:49:43 2012
Log: Add load test for connection pool and fix issue, where connections were marked as 'taken' too early

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=634d7d6bd642

Modified:
 /lib/driver.js
 /test/test_driver.js

=======================================
--- /lib/driver.js      Tue Jan 24 09:50:16 2012
+++ /lib/driver.js      Thu Jan 26 07:49:43 2012
@@ -613,7 +613,6 @@
       callback(err, null);
     } else {
       try {
-        con.taken = true;
         con.execute(query, args, function(err, result) {
           con.taken = false;
           var recoverableError = null;
=======================================
--- /test/test_driver.js        Mon Jan 30 10:26:55 2012
+++ /test/test_driver.js        Thu Jan 26 07:49:43 2012
@@ -1008,3 +1008,46 @@
     }
   });
 };
+
+
+exports.testPooledConnectionLoad = function(test, assert) {
+  var hosts = ['127.0.0.1:19170'];
+ var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1'});
+
+  var count = 3000;
+
+  async.waterfall([
+    function(cb) {
+      conn.execute('TRUNCATE CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      var executes = [];
+      for (var i = 0; i < count; i++) {
+        executes.push(function(parallelCb) {
+          var uuid = new UUID().toString();
+          conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+            'testCol',
+            'testVal',
+            uuid
+          ], parallelCb);
+        });
+      }
+      async.parallel(executes, function(err) {
+        assert.ifError(err);
+        cb();
+      });
+    },
+    function(cb) {
+      conn.execute('SELECT COUNT(*) FROM CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      assert.equal(res[0].colHash.count, count);
+      cb();
+    },
+    conn.shutdown.bind(conn)
+  ],
+  function(err) {
+    assert.ifError(err);
+    test.finish();
+  });
+};

==============================================================================
Revision: 4c54109897a6
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:24:50 2012
Log: Emit drain event after the last execute() callback of a pool has been called

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=4c54109897a6

Modified:
 /lib/driver.js

=======================================
--- /lib/driver.js      Thu Jan 26 07:49:43 2012
+++ /lib/driver.js      Thu Jan 26 14:24:50 2012
@@ -602,10 +602,10 @@
   self.running++;
   var callback = function() {
     self.running--;
+    executeCallback.apply(self, arguments);
     if (self.running === 0) {
       self.emit('drain');
     }
-    executeCallback.apply(self, arguments);
   };

   self._getNextCon(function(err, con) {

==============================================================================
Revision: 422553851a5f
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:25:26 2012
Log: Simplify testPooledConnectionLoad and add testPooledConnectionShutdown

- testPooledConnectionLoad now uses indexed keys instead of uuids which
  is more reproducible.
- testPooledConnectionShutdown checks whether all execute() callbacks
  are being fired *before* the callback which is passed to shutdown().

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=422553851a5f

Modified:
 /test/test_driver.js

=======================================
--- /test/test_driver.js        Thu Jan 26 07:49:43 2012
+++ /test/test_driver.js        Thu Jan 26 14:25:26 2012
@@ -1023,14 +1023,15 @@
     function(res, cb) {
       var executes = [];
       for (var i = 0; i < count; i++) {
-        executes.push(function(parallelCb) {
-          var uuid = new UUID().toString();
-          conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
-            'testCol',
-            'testVal',
-            uuid
-          ], parallelCb);
-        });
+        (function(index) {
+          executes.push(function(parallelCb) {
+            conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', [
+              'testCol',
+              'testVal',
+              'testKey'+index
+            ], parallelCb);
+          });
+        })(i);
       }
       async.parallel(executes, function(err) {
         assert.ifError(err);
@@ -1044,10 +1045,41 @@
       assert.equal(res[0].colHash.count, count);
       cb();
     },
-    conn.shutdown.bind(conn)
+    function(cb) {
+      conn.execute('TRUNCATE CfUtf8', [], cb);
+    },
+    function(res, cb) {
+      conn.shutdown(cb);
+    }
   ],
   function(err) {
     assert.ifError(err);
     test.finish();
   });
 };
+
+
+// We want to test if all executes of a pooled connection are finished before
+// the shutdown callback is called.
+exports.testPooledConnectionShutdown = function(test, assert) {
+  var hosts = ['127.0.0.1:19170'];
+ var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1'});
+
+  var expected = 100;
+  var cbcount = 0;
+  var spy = function(err, res) {
+    assert.ifError(err);
+    cbcount++;
+  };
+
+  for (var i = 0; i < expected; i++) {
+    (function(index) {
+ conn.execute('UPDATE CfUtf8 SET ? = ? WHERE KEY = ?', ['col', 'val', 'key'+index], spy);
+    })(i);
+  }
+  conn.shutdown(function(err) {
+    assert.ifError(err);
+    assert.equal(cbcount, expected);
+    test.finish();
+  });
+};

==============================================================================
Revision: 2184501cc648
Author:   Christoph Tavan <[email protected]>
Date:     Thu Jan 26 14:27:35 2012
Log: Cleanup test_driver.js (remove some console.log and trailing whitespaces)

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=2184501cc648

Modified:
 /test/test_driver.js

=======================================
--- /test/test_driver.js        Thu Jan 26 14:25:26 2012
+++ /test/test_driver.js        Thu Jan 26 14:27:35 2012
@@ -215,7 +215,6 @@
     function executeCountQuery(callback) {
       con.execute('SELECT COUNT(*) FROM CfLong', [], function(err, rows) {
         assert.ifError(err);
-        console.log(rows[0].cols);
         assert.strictEqual(rows[0].cols[0].value, 5);
         callback();
       });
@@ -453,7 +452,6 @@
           con.close();
           assert.strictEqual(rows.rowCount(), 1);
           var row = rows[0];
-          console.log(row);
assert.strictEqual(row.key.toString('base64'), binaryParams[2].toString('base64')); assert.strictEqual(row.cols[0].name.toString('base64'), binaryParams[0].toString('base64')); assert.strictEqual(row.cols[0].value.toString('base64'), binaryParams[1].toString('base64'));

==============================================================================
Revision: 1a6bfbb5136b
Author:   Christoph Tavan <[email protected]>
Date:     Mon Jan 30 13:11:01 2012
Log: Listen for the drain event only once for shutdown, also reset shuttingDown state

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=1a6bfbb5136b

Modified:
 /lib/driver.js

=======================================
--- /lib/driver.js      Thu Jan 26 14:24:50 2012
+++ /lib/driver.js      Mon Jan 30 13:11:01 2012
@@ -720,8 +720,11 @@
   callback = callback || function() {};

   // Close all open connections as soon as the pool has drained
-  self.on('drain', function() {
-    self._closeConnections(callback);
+  self.once('drain', function() {
+    self._closeConnections(function() {
+      self.shuttingDown = false;
+      callback();
+    });
   });

   // If no queries were running, emit the drain event immediately

==============================================================================
Revision: db75d6b18bb5
Author:   gdusbabek <[email protected]>
Date:     Mon Jan 30 14:58:56 2012
Log: * PooledConnection.connect() establishes connections prior to execute()ing.
* Increase test timeout to 30s.
* Node version >= 0.6.7
* Increate connection setup timeout to 10s.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=db75d6b18bb5

Modified:
 /lib/driver.js
 /package.json
 /test/test_driver.js

=======================================
--- /lib/driver.js      Mon Jan 30 13:11:01 2012
+++ /lib/driver.js      Mon Jan 30 14:58:56 2012
@@ -250,7 +250,7 @@
       }
     };

-    // 2) login.
+    // 2) learn.
     var learn = function(cb) {
       var timeoutId = setTimeout(function() {
         if (timeoutId) {
@@ -579,6 +579,33 @@
return incrCount >= this.connections.length && !this.connections[this.current_node].isHealthy();
 };

+
+/**
+ * Establishes connections to all hosts in the pool.
+ * @return callback expects err (if all the connect()s failed.
+ */
+PooledConnection.prototype.connect = function(callback) {
+  var self = this;
+  var errors = [];
+  async.forEach(self.connections, function doConnect(con, callback) {
+    con.connect(function(err) {
+      if (err) {
+        errors.push(err);
+      }
+      callback();
+    });
+  }, function() {
+    if (errors.length === self.connections.length) {
+ var error = new Error('There were errors connecting to every connection');
+      error._individualErrors = errors;
+      callback(error);
+    } else {
+      callback();
+    }
+  });
+};
+
+
 /**
  * executes any query
  * @param query any CQL statement with '?' placeholders.
@@ -652,12 +679,14 @@
   var tryStart = new Date().getTime();
   var con = null;
   var allBad = false;
+  var timedOut = false;
   var takens = [];
   async.whilst(function truthTest() {
// should the timeout of getting a single connection be the sum of all connections? Think of a scenario where the // timeout is N, but the first X nodes are unresponsive. You still want to allow access to the subsequent good
     // nodes.
- return !allBad && con === null && (new Date().getTime() - tryStart) < (self.timeout * self.connections.length); + timedOut = (new Date().getTime() - tryStart) >= (self.timeout * self.connections.length)
+    return !allBad && con === null && !timedOut;
   }, function tryConnect(callback) {
     var c = self.connections[self.current_node];
     allBad = self._incr();
@@ -693,7 +722,9 @@
       callback();
     }
   }, function whenDone(err) {
-    if (allBad && !err) {
+    if (timedOut && !err) {
+ err = new Error('Timed out waiting for connection ' + (self.timeout * self.connections.length));
+    } else if (allBad && !err) {
       err = new Error('All connections are unhealthy.');
     } else if (!con && !err) {
       err = new Error('connection was not set');
=======================================
--- /package.json       Thu Jan 26 14:40:08 2012
+++ /package.json       Mon Jan 30 14:58:56 2012
@@ -20,10 +20,10 @@
     "lib": "lib"
   },
   "scripts": {
- "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks" + "test": "whiskey --tests \"test/test_driver.js test/test_decoder.js test/test_uuid.js\" --dependencies test/dependencies.json --scope-leaks --timeout 30000"
   },
   "engines": {
-    "node": ">= 0.4.0"
+    "node": ">= 0.6.7"
   },
   "dependencies": {
     "async": ">= 0.1.12",
=======================================
--- /test/test_driver.js        Thu Jan 26 14:27:35 2012
+++ /test/test_driver.js        Mon Jan 30 14:58:56 2012
@@ -1010,11 +1010,13 @@

 exports.testPooledConnectionLoad = function(test, assert) {
   var hosts = ['127.0.0.1:19170'];
- var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1'}); + var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1', 'timeout': 10000});

   var count = 3000;

   async.waterfall([
+    // establish connections prior to executing statements.
+    //conn.connect.bind(conn),
     function(cb) {
       conn.execute('TRUNCATE CfUtf8', [], cb);
     },

==============================================================================
Revision: 00b61ccdde0c
Author:   gdusbabek <[email protected]>
Date:     Mon Jan 30 14:59:17 2012
Log:      update CHANGES.

http://code.google.com/a/apache-extras.org/p/cassandra-node/source/detail?r=00b61ccdde0c

Modified:
 /CHANGES

=======================================
--- /CHANGES    Mon Jan 30 11:08:11 2012
+++ /CHANGES    Mon Jan 30 14:59:17 2012
@@ -1,5 +1,7 @@
 Changes with cassandra-client X.Y.Z

+- Put drain back. (http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=32)
+
- Allow blob updates (http://code.google.com/a/apache-extras.org/p/cassandra-node/issues/detail?id=28)

- Remove logmagic dependency, see README.md for how to capture logging events.

Reply via email to