http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection.js deleted file mode 100644 index 45f69ad..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection.js +++ /dev/null @@ -1,541 +0,0 @@ -var utils = require('./connection_utils'), - inherits = require('util').inherits, - net = require('net'), - EventEmitter = require('events').EventEmitter, - inherits = require('util').inherits, - binaryutils = require('../utils'), - tls = require('tls'); - -var Connection = exports.Connection = function(id, socketOptions) { - var self = this; - // Set up event emitter - EventEmitter.call(this); - // Store all socket options - this.socketOptions = socketOptions ? socketOptions : {host:'localhost', port:27017, domainSocket:false}; - // Set keep alive default if not overriden - if(this.socketOptions.keepAlive == null && (process.platform !== "sunos" || process.platform !== "win32")) this.socketOptions.keepAlive = 100; - // Id for the connection - this.id = id; - // State of the connection - this.connected = false; - // Set if this is a domain socket - this.domainSocket = this.socketOptions.domainSocket; - - // Supported min and max wire protocol - this.minWireVersion = 0; - this.maxWireVersion = 2; - - // - // Connection parsing state - // - this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE; - this.maxMessageSizeBytes = socketOptions.maxMessageSizeBytes ? socketOptions.maxMessageSizeBytes : Connection.DEFAULT_MAX_MESSAGE_SIZE; - // Contains the current message bytes - this.buffer = null; - // Contains the current message size - this.sizeOfMessage = 0; - // Contains the readIndex for the messaage - this.bytesRead = 0; - // Contains spill over bytes from additional messages - this.stubBuffer = 0; - - // Just keeps list of events we allow - this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[], timeout:[], end:[]}; - - // Just keeps list of events we allow - resetHandlers(this, false); - // Bson object - this.maxBsonSettings = { - disableDriverBSONSizeCheck: this.socketOptions['disableDriverBSONSizeCheck'] || false - , maxBsonSize: this.maxBsonSize - , maxMessageSizeBytes: this.maxMessageSizeBytes - } - - // Allow setting the socketTimeoutMS on all connections - // to work around issues such as secondaries blocking due to compaction - Object.defineProperty(this, "socketTimeoutMS", { - enumerable: true - , get: function () { return self.socketOptions.socketTimeoutMS; } - , set: function (value) { - // Set the socket timeoutMS value - self.socketOptions.socketTimeoutMS = value; - // Set the physical connection timeout - self.connection.setTimeout(self.socketOptions.socketTimeoutMS); - } - }); -} - -// Set max bson size -Connection.DEFAULT_MAX_BSON_SIZE = 1024 * 1024 * 4; -// Set default to max bson to avoid overflow or bad guesses -Connection.DEFAULT_MAX_MESSAGE_SIZE = Connection.DEFAULT_MAX_BSON_SIZE; - -// Inherit event emitter so we can emit stuff wohoo -inherits(Connection, EventEmitter); - -Connection.prototype.start = function() { - var self = this; - - // If we have a normal connection - if(this.socketOptions.ssl) { - // Create new connection instance - if(this.domainSocket) { - this.connection = net.createConnection(this.socketOptions.host); - } else { - this.connection = net.createConnection(this.socketOptions.port, this.socketOptions.host); - } - if(this.logger != null && this.logger.doDebug){ - this.logger.debug("opened connection", this.socketOptions); - } - - // Set options on the socket - this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout); - // Work around for 0.4.X - if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay); - // Set keep alive if defined - if(process.version.indexOf("v0.4") == -1) { - if(this.socketOptions.keepAlive > 0) { - this.connection.setKeepAlive(true, this.socketOptions.keepAlive); - } else { - this.connection.setKeepAlive(false); - } - } - - // Check if the driver should validate the certificate - var validate_certificates = this.socketOptions.sslValidate == true ? true : false; - - // Create options for the tls connection - var tls_options = { - socket: this.connection - , rejectUnauthorized: false - } - - // If we wish to validate the certificate we have provided a ca store - if(validate_certificates) { - tls_options.ca = this.socketOptions.sslCA; - } - - // If we have a certificate to present - if(this.socketOptions.sslCert) { - tls_options.cert = this.socketOptions.sslCert; - tls_options.key = this.socketOptions.sslKey; - } - - // If the driver has been provided a private key password - if(this.socketOptions.sslPass) { - tls_options.passphrase = this.socketOptions.sslPass; - } - - // Contains the cleartext stream - var cleartext = null; - // Attempt to establish a TLS connection to the server - try { - cleartext = tls.connect(this.socketOptions.port, this.socketOptions.host, tls_options, function() { - // If we have a ssl certificate validation error return an error - if(cleartext.authorizationError && validate_certificates) { - // Emit an error - return self.emit("error", cleartext.authorizationError, self, {ssl:true}); - } - - // Connect to the server - connectHandler(self)(); - }) - } catch(err) { - return self.emit("error", "SSL connection failed", self, {ssl:true}); - } - - // Save the output stream - this.writeSteam = cleartext; - - // Set up data handler for the clear stream - cleartext.on("data", createDataHandler(this)); - // Do any handling of end event of the stream - cleartext.on("end", endHandler(this)); - cleartext.on("error", errorHandler(this)); - - // Handle any errors - this.connection.on("error", errorHandler(this)); - // Handle timeout - this.connection.on("timeout", timeoutHandler(this)); - // Handle drain event - this.connection.on("drain", drainHandler(this)); - // Handle the close event - this.connection.on("close", closeHandler(this)); - } else { - // Create new connection instance - if(this.domainSocket) { - this.connection = net.createConnection(this.socketOptions.host); - } else { - this.connection = net.createConnection(this.socketOptions.port, this.socketOptions.host); - } - if(this.logger != null && this.logger.doDebug){ - this.logger.debug("opened connection", this.socketOptions); - } - - // Set options on the socket - this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout); - // Work around for 0.4.X - if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay); - // Set keep alive if defined - if(process.version.indexOf("v0.4") == -1) { - if(this.socketOptions.keepAlive > 0) { - this.connection.setKeepAlive(true, this.socketOptions.keepAlive); - } else { - this.connection.setKeepAlive(false); - } - } - - // Set up write stream - this.writeSteam = this.connection; - // Add handlers - this.connection.on("error", errorHandler(this)); - // Add all handlers to the socket to manage it - this.connection.on("connect", connectHandler(this)); - // this.connection.on("end", endHandler(this)); - this.connection.on("data", createDataHandler(this)); - this.connection.on("timeout", timeoutHandler(this)); - this.connection.on("drain", drainHandler(this)); - this.connection.on("close", closeHandler(this)); - } -} - -// Check if the sockets are live -Connection.prototype.isConnected = function() { - return this.connected && !this.connection.destroyed && this.connection.writable && this.connection.readable; -} - -// Validate if the driver supports this server -Connection.prototype.isCompatible = function() { - if(this.serverCapabilities == null) return true; - // Is compatible with backward server - if(this.serverCapabilities.minWireVersion == 0 - && this.serverCapabilities.maxWireVersion ==0) return true; - - // Check if we overlap - if(this.serverCapabilities.minWireVersion >= this.minWireVersion - && this.serverCapabilities.maxWireVersion <= this.maxWireVersion) return true; - - // Not compatible - return false; -} - -// Write the data out to the socket -Connection.prototype.write = function(command, callback) { - try { - // If we have a list off commands to be executed on the same socket - if(Array.isArray(command)) { - for(var i = 0; i < command.length; i++) { - try { - // Pass in the bson validation settings (validate early) - var binaryCommand = command[i].toBinary(this.maxBsonSettings) - - if(this.logger != null && this.logger.doDebug) - this.logger.debug("writing command to mongodb", {binary: binaryCommand, json: command[i]}); - - this.writeSteam.write(binaryCommand); - } catch(err) { - return callback(err, null); - } - } - } else { - try { - // Pass in the bson validation settings (validate early) - var binaryCommand = command.toBinary(this.maxBsonSettings) - // Do we have a logger active log the event - if(this.logger != null && this.logger.doDebug) - this.logger.debug("writing command to mongodb", {binary: binaryCommand, json: command[i]}); - // Write the binary command out to socket - this.writeSteam.write(binaryCommand); - } catch(err) { - return callback(err, null) - } - } - } catch (err) { - if(typeof callback === 'function') callback(err); - } -} - -// Force the closure of the connection -Connection.prototype.close = function() { - // clear out all the listeners - resetHandlers(this, true); - // Add a dummy error listener to catch any weird last moment errors (and ignore them) - this.connection.on("error", function() {}) - // destroy connection - this.connection.destroy(); - if(this.logger != null && this.logger.doDebug){ - this.logger.debug("closed connection", this.connection); - } -} - -// Reset all handlers -var resetHandlers = function(self, clearListeners) { - self.eventHandlers = {error:[], connect:[], close:[], end:[], timeout:[], parseError:[], message:[]}; - - // If we want to clear all the listeners - if(clearListeners && self.connection != null) { - var keys = Object.keys(self.eventHandlers); - // Remove all listeners - for(var i = 0; i < keys.length; i++) { - self.connection.removeAllListeners(keys[i]); - } - } -} - -// -// Handlers -// - -// Connect handler -var connectHandler = function(self) { - return function(data) { - // Set connected - self.connected = true; - // Now that we are connected set the socket timeout - self.connection.setTimeout(self.socketOptions.socketTimeoutMS != null ? self.socketOptions.socketTimeoutMS : self.socketOptions.timeout); - // Emit the connect event with no error - self.emit("connect", null, self); - } -} - -var createDataHandler = exports.Connection.createDataHandler = function(self) { - // We need to handle the parsing of the data - // and emit the messages when there is a complete one - return function(data) { - // Parse until we are done with the data - while(data.length > 0) { - // If we still have bytes to read on the current message - if(self.bytesRead > 0 && self.sizeOfMessage > 0) { - // Calculate the amount of remaining bytes - var remainingBytesToRead = self.sizeOfMessage - self.bytesRead; - // Check if the current chunk contains the rest of the message - if(remainingBytesToRead > data.length) { - // Copy the new data into the exiting buffer (should have been allocated when we know the message size) - data.copy(self.buffer, self.bytesRead); - // Adjust the number of bytes read so it point to the correct index in the buffer - self.bytesRead = self.bytesRead + data.length; - - // Reset state of buffer - data = new Buffer(0); - } else { - // Copy the missing part of the data into our current buffer - data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead); - // Slice the overflow into a new buffer that we will then re-parse - data = data.slice(remainingBytesToRead); - - // Emit current complete message - try { - var emitBuffer = self.buffer; - // Reset state of buffer - self.buffer = null; - self.sizeOfMessage = 0; - self.bytesRead = 0; - self.stubBuffer = null; - // Emit the buffer - self.emit("message", emitBuffer, self); - } catch(err) { - var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ - sizeOfMessage:self.sizeOfMessage, - bytesRead:self.bytesRead, - stubBuffer:self.stubBuffer}}; - if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); - // We got a parse Error fire it off then keep going - self.emit("parseError", errorObject, self); - } - } - } else { - // Stub buffer is kept in case we don't get enough bytes to determine the - // size of the message (< 4 bytes) - if(self.stubBuffer != null && self.stubBuffer.length > 0) { - - // If we have enough bytes to determine the message size let's do it - if(self.stubBuffer.length + data.length > 4) { - // Prepad the data - var newData = new Buffer(self.stubBuffer.length + data.length); - self.stubBuffer.copy(newData, 0); - data.copy(newData, self.stubBuffer.length); - // Reassign for parsing - data = newData; - - // Reset state of buffer - self.buffer = null; - self.sizeOfMessage = 0; - self.bytesRead = 0; - self.stubBuffer = null; - - } else { - - // Add the the bytes to the stub buffer - var newStubBuffer = new Buffer(self.stubBuffer.length + data.length); - // Copy existing stub buffer - self.stubBuffer.copy(newStubBuffer, 0); - // Copy missing part of the data - data.copy(newStubBuffer, self.stubBuffer.length); - // Exit parsing loop - data = new Buffer(0); - } - } else { - if(data.length > 4) { - // Retrieve the message size - var sizeOfMessage = binaryutils.decodeUInt32(data, 0); - // If we have a negative sizeOfMessage emit error and return - if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonSize) { - var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{ - sizeOfMessage: sizeOfMessage, - bytesRead: self.bytesRead, - stubBuffer: self.stubBuffer}}; - if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); - // We got a parse Error fire it off then keep going - self.emit("parseError", errorObject, self); - return; - } - - // Ensure that the size of message is larger than 0 and less than the max allowed - if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage > data.length) { - self.buffer = new Buffer(sizeOfMessage); - // Copy all the data into the buffer - data.copy(self.buffer, 0); - // Update bytes read - self.bytesRead = data.length; - // Update sizeOfMessage - self.sizeOfMessage = sizeOfMessage; - // Ensure stub buffer is null - self.stubBuffer = null; - // Exit parsing loop - data = new Buffer(0); - - } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage == data.length) { - try { - var emitBuffer = data; - // Reset state of buffer - self.buffer = null; - self.sizeOfMessage = 0; - self.bytesRead = 0; - self.stubBuffer = null; - // Exit parsing loop - data = new Buffer(0); - // Emit the message - self.emit("message", emitBuffer, self); - } catch (err) { - var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ - sizeOfMessage:self.sizeOfMessage, - bytesRead:self.bytesRead, - stubBuffer:self.stubBuffer}}; - if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); - // We got a parse Error fire it off then keep going - self.emit("parseError", errorObject, self); - } - } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonSize) { - var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{ - sizeOfMessage:sizeOfMessage, - bytesRead:0, - buffer:null, - stubBuffer:null}}; - if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); - // We got a parse Error fire it off then keep going - self.emit("parseError", errorObject, self); - - // Clear out the state of the parser - self.buffer = null; - self.sizeOfMessage = 0; - self.bytesRead = 0; - self.stubBuffer = null; - // Exit parsing loop - data = new Buffer(0); - - } else { - try { - var emitBuffer = data.slice(0, sizeOfMessage); - // Reset state of buffer - self.buffer = null; - self.sizeOfMessage = 0; - self.bytesRead = 0; - self.stubBuffer = null; - // Copy rest of message - data = data.slice(sizeOfMessage); - // Emit the message - self.emit("message", emitBuffer, self); - } catch (err) { - var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ - sizeOfMessage:sizeOfMessage, - bytesRead:self.bytesRead, - stubBuffer:self.stubBuffer}}; - if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); - // We got a parse Error fire it off then keep going - self.emit("parseError", errorObject, self); - } - - } - } else { - // Create a buffer that contains the space for the non-complete message - self.stubBuffer = new Buffer(data.length) - // Copy the data to the stub buffer - data.copy(self.stubBuffer, 0); - // Exit parsing loop - data = new Buffer(0); - } - } - } - } - } -} - -var endHandler = function(self) { - return function() { - // Set connected to false - self.connected = false; - // Emit end event - self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); - } -} - -var timeoutHandler = function(self) { - return function() { - // Set connected to false - self.connected = false; - // Emit timeout event - self.emit("timeout", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self); - } -} - -var drainHandler = function(self) { - return function() { - } -} - -var errorHandler = function(self) { - return function(err) { - self.connection.destroy(); - // Set connected to false - self.connected = false; - // Emit error - self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); - } -} - -var closeHandler = function(self) { - return function(hadError) { - // If we have an error during the connection phase - if(hadError && !self.connected) { - // Set disconnected - self.connected = false; - // Emit error - self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); - } else { - // Set disconnected - self.connected = false; - // Emit close - self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); - } - } -} - -// Some basic defaults -Connection.DEFAULT_PORT = 27017; - - - - - - -
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_pool.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_pool.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_pool.js deleted file mode 100644 index 3d9e7c5..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_pool.js +++ /dev/null @@ -1,295 +0,0 @@ -var utils = require('./connection_utils'), - inherits = require('util').inherits, - net = require('net'), - timers = require('timers'), - EventEmitter = require('events').EventEmitter, - inherits = require('util').inherits, - MongoReply = require("../responses/mongo_reply").MongoReply, - Connection = require("./connection").Connection; - -// Set processor, setImmediate if 0.10 otherwise nextTick -var processor = require('../utils').processor(); - -var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bson, socketOptions) { - if(typeof host !== 'string') { - throw new Error("host must be specified [" + host + "]"); - } - - // Set up event emitter - EventEmitter.call(this); - - // Keep all options for the socket in a specific collection allowing the user to specify the - // Wished upon socket connection parameters - this.socketOptions = typeof socketOptions === 'object' ? socketOptions : {}; - this.socketOptions.host = host; - this.socketOptions.port = port; - this.socketOptions.domainSocket = false; - this.bson = bson; - // PoolSize is always + 1 for special reserved "measurment" socket (like ping, stats etc) - this.poolSize = poolSize; - this.minPoolSize = Math.floor(this.poolSize / 2) + 1; - - // Check if the host is a socket - if(host.match(/^\//)) { - this.socketOptions.domainSocket = true; - } else if(typeof port === 'string') { - try { - port = parseInt(port, 10); - } catch(err) { - new Error("port must be specified or valid integer[" + port + "]"); - } - } else if(typeof port !== 'number') { - throw new Error("port must be specified [" + port + "]"); - } - - // Set default settings for the socket options - utils.setIntegerParameter(this.socketOptions, 'timeout', 0); - // Delay before writing out the data to the server - utils.setBooleanParameter(this.socketOptions, 'noDelay', true); - // Delay before writing out the data to the server - utils.setIntegerParameter(this.socketOptions, 'keepAlive', 0); - // Set the encoding of the data read, default is binary == null - utils.setStringParameter(this.socketOptions, 'encoding', null); - // Allows you to set a throttling bufferSize if you need to stop overflows - utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0); - - // Internal structures - this.openConnections = []; - // Assign connection id's - this.connectionId = 0; - - // Current index for selection of pool connection - this.currentConnectionIndex = 0; - // The pool state - this._poolState = 'disconnected'; - // timeout control - this._timeout = false; - // Time to wait between connections for the pool - this._timeToWait = 10; -} - -inherits(ConnectionPool, EventEmitter); - -ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) { - if(maxBsonSize == null){ - maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE; - } - - for(var i = 0; i < this.openConnections.length; i++) { - this.openConnections[i].maxBsonSize = maxBsonSize; - this.openConnections[i].maxBsonSettings.maxBsonSize = maxBsonSize; - } -} - -ConnectionPool.prototype.setMaxMessageSizeBytes = function(maxMessageSizeBytes) { - if(maxMessageSizeBytes == null){ - maxMessageSizeBytes = Connection.DEFAULT_MAX_MESSAGE_SIZE; - } - - for(var i = 0; i < this.openConnections.length; i++) { - this.openConnections[i].maxMessageSizeBytes = maxMessageSizeBytes; - this.openConnections[i].maxBsonSettings.maxMessageSizeBytes = maxMessageSizeBytes; - } -} - -// Start a function -var _connect = function(_self) { - // return new function() { - // Create a new connection instance - var connection = new Connection(_self.connectionId++, _self.socketOptions); - // Set logger on pool - connection.logger = _self.logger; - // Connect handler - connection.on("connect", function(err, connection) { - // Add connection to list of open connections - _self.openConnections.push(connection); - // If the number of open connections is equal to the poolSize signal ready pool - if(_self.openConnections.length === _self.poolSize && _self._poolState !== 'disconnected') { - // Set connected - _self._poolState = 'connected'; - // Emit pool ready - _self.emit("poolReady"); - } else if(_self.openConnections.length < _self.poolSize) { - // Wait a little bit of time to let the close event happen if the server closes the connection - // so we don't leave hanging connections around - if(typeof _self._timeToWait == 'number') { - setTimeout(function() { - // If we are still connecting (no close events fired in between start another connection) - if(_self._poolState == 'connecting') { - _connect(_self); - } - }, _self._timeToWait); - } else { - processor(function() { - // If we are still connecting (no close events fired in between start another connection) - if(_self._poolState == 'connecting') { - _connect(_self); - } - }); - } - } - }); - - var numberOfErrors = 0 - - // Error handler - connection.on("error", function(err, connection, error_options) { - numberOfErrors++; - // If we are already disconnected ignore the event - if(_self._poolState != 'disconnected' && _self.listeners("error").length > 0) { - _self.emit("error", err, connection, error_options); - } - - // Close the connection - connection.close(); - // Set pool as disconnected - _self._poolState = 'disconnected'; - // Stop the pool - _self.stop(); - }); - - // Close handler - connection.on("close", function() { - // If we are already disconnected ignore the event - if(_self._poolState !== 'disconnected' && _self.listeners("close").length > 0) { - _self.emit("close"); - } - - // Set disconnected - _self._poolState = 'disconnected'; - // Stop - _self.stop(); - }); - - // Timeout handler - connection.on("timeout", function(err, connection) { - // If we are already disconnected ignore the event - if(_self._poolState !== 'disconnected' && _self.listeners("timeout").length > 0) { - _self.emit("timeout", err); - } - - // Close the connection - connection.close(); - // Set disconnected - _self._poolState = 'disconnected'; - _self.stop(); - }); - - // Parse error, needs a complete shutdown of the pool - connection.on("parseError", function() { - // If we are already disconnected ignore the event - if(_self._poolState !== 'disconnected' && _self.listeners("parseError").length > 0) { - _self.emit("parseError", new Error("parseError occured")); - } - - // Set disconnected - _self._poolState = 'disconnected'; - _self.stop(); - }); - - connection.on("message", function(message) { - _self.emit("message", message); - }); - - // Start connection in the next tick - connection.start(); - // }(); -} - - -// Start method, will throw error if no listeners are available -// Pass in an instance of the listener that contains the api for -// finding callbacks for a given message etc. -ConnectionPool.prototype.start = function() { - var markerDate = new Date().getTime(); - var self = this; - - if(this.listeners("poolReady").length == 0) { - throw "pool must have at least one listener ready that responds to the [poolReady] event"; - } - - // Set pool state to connecting - this._poolState = 'connecting'; - this._timeout = false; - - _connect(self); -} - -// Restart a connection pool (on a close the pool might be in a wrong state) -ConnectionPool.prototype.restart = function() { - // Close all connections - this.stop(false); - // Now restart the pool - this.start(); -} - -// Stop the connections in the pool -ConnectionPool.prototype.stop = function(removeListeners) { - removeListeners = removeListeners == null ? true : removeListeners; - // Set disconnected - this._poolState = 'disconnected'; - - // Clear all listeners if specified - if(removeListeners) { - this.removeAllEventListeners(); - } - - // Close all connections - for(var i = 0; i < this.openConnections.length; i++) { - this.openConnections[i].close(); - } - - // Clean up - this.openConnections = []; -} - -// Check the status of the connection -ConnectionPool.prototype.isConnected = function() { - // return this._poolState === 'connected'; - return this.openConnections.length > 0 && this.openConnections[0].isConnected(); -} - -// Checkout a connection from the pool for usage, or grab a specific pool instance -ConnectionPool.prototype.checkoutConnection = function(id) { - var index = (this.currentConnectionIndex++ % (this.openConnections.length)); - var connection = this.openConnections[index]; - return connection; -} - -ConnectionPool.prototype.getAllConnections = function() { - return this.openConnections; -} - -// Remove all non-needed event listeners -ConnectionPool.prototype.removeAllEventListeners = function() { - this.removeAllListeners("close"); - this.removeAllListeners("error"); - this.removeAllListeners("timeout"); - this.removeAllListeners("connect"); - this.removeAllListeners("end"); - this.removeAllListeners("parseError"); - this.removeAllListeners("message"); - this.removeAllListeners("poolReady"); -} - - - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_utils.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_utils.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_utils.js deleted file mode 100644 index 5910924..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/connection_utils.js +++ /dev/null @@ -1,23 +0,0 @@ -exports.setIntegerParameter = function(object, field, defaultValue) { - if(object[field] == null) { - object[field] = defaultValue; - } else if(typeof object[field] !== "number" && object[field] !== parseInt(object[field], 10)) { - throw "object field [" + field + "] must be a numeric integer value, attempted to set to [" + object[field] + "] type of [" + typeof object[field] + "]"; - } -} - -exports.setBooleanParameter = function(object, field, defaultValue) { - if(object[field] == null) { - object[field] = defaultValue; - } else if(typeof object[field] !== "boolean") { - throw "object field [" + field + "] must be a boolean value, attempted to set to [" + object[field] + "] type of [" + typeof object[field] + "]"; - } -} - -exports.setStringParameter = function(object, field, defaultValue) { - if(object[field] == null) { - object[field] = defaultValue; - } else if(typeof object[field] !== "string") { - throw "object field [" + field + "] must be a string value, attempted to set to [" + object[field] + "] type of [" + typeof object[field] + "]"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/mongos.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/mongos.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/mongos.js deleted file mode 100644 index 01a9c32..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/mongos.js +++ /dev/null @@ -1,537 +0,0 @@ -var ReadPreference = require('./read_preference').ReadPreference - , Base = require('./base').Base - , Server = require('./server').Server - , format = require('util').format - , timers = require('timers') - , utils = require('../utils') - , inherits = require('util').inherits; - -// Set processor, setImmediate if 0.10 otherwise nextTick -var processor = require('../utils').processor(); - -/** - * Mongos constructor provides a connection to a mongos proxy including failover to additional servers - * - * Options - * - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number)) - * - **ha** {Boolean, default:true}, turn on high availability, attempts to reconnect to down proxies - * - **haInterval** {Number, default:2000}, time between each replicaset status check. - * - * @class Represents a Mongos connection with failover to backup proxies - * @param {Array} list of mongos server objects - * @param {Object} [options] additional options for the mongos connection - */ -var Mongos = function Mongos(servers, options) { - // Set up basic - if(!(this instanceof Mongos)) - return new Mongos(servers, options); - - // Set up event emitter - Base.call(this); - - // Throw error on wrong setup - if(servers == null || !Array.isArray(servers) || servers.length == 0) - throw new Error("At least one mongos proxy must be in the array"); - - // Ensure we have at least an empty options object - this.options = options == null ? {} : options; - // Set default connection pool options - this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {}; - // Enabled ha - this.haEnabled = this.options['ha'] == null ? true : this.options['ha']; - this._haInProgress = false; - // How often are we checking for new servers in the replicaset - this.mongosStatusCheckInterval = this.options['haInterval'] == null ? 1000 : this.options['haInterval']; - // Save all the server connections - this.servers = servers; - // Servers we need to attempt reconnect with - this.downServers = {}; - // Servers that are up - this.upServers = {}; - // Up servers by ping time - this.upServersByUpTime = {}; - // Emit open setup - this.emitOpen = this.options.emitOpen || true; - // Just contains the current lowest ping time and server - this.lowestPingTimeServer = null; - this.lowestPingTime = 0; - // Connection timeout - this._connectTimeoutMS = this.socketOptions.connectTimeoutMS - ? this.socketOptions.connectTimeoutMS - : 1000; - - // Add options to servers - for(var i = 0; i < this.servers.length; i++) { - var server = this.servers[i]; - server._callBackStore = this._callBackStore; - server.auto_reconnect = false; - // Default empty socket options object - var socketOptions = {host: server.host, port: server.port}; - // If a socket option object exists clone it - if(this.socketOptions != null) { - var keys = Object.keys(this.socketOptions); - for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = this.socketOptions[keys[i]]; - } - - // Set socket options - server.socketOptions = socketOptions; - } - - // Allow setting the socketTimeoutMS on all connections - // to work around issues such as secondaries blocking due to compaction - utils.setSocketTimeoutProperty(this, this.socketOptions); -} - -/** - * @ignore - */ -inherits(Mongos, Base); - -/** - * @ignore - */ -Mongos.prototype.isMongos = function() { - return true; -} - -/** - * @ignore - */ -Mongos.prototype.connect = function(db, options, callback) { - if('function' === typeof options) callback = options, options = {}; - if(options == null) options = {}; - if(!('function' === typeof callback)) callback = null; - var self = this; - - // Keep reference to parent - this.db = db; - // Set server state to connecting - this._serverState = 'connecting'; - // Number of total servers that need to initialized (known servers) - this._numberOfServersLeftToInitialize = this.servers.length; - // Connect handler - var connectHandler = function(_server) { - return function(err, result) { - self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize - 1; - - // Add the server to the list of servers that are up - if(!err) { - self.upServers[format("%s:%s", _server.host, _server.port)] = _server; - } - - // We are done connecting - if(self._numberOfServersLeftToInitialize == 0) { - // Start ha function if it exists - if(self.haEnabled) { - // Setup the ha process - if(self._replicasetTimeoutId != null) clearInterval(self._replicasetTimeoutId); - self._replicasetTimeoutId = setInterval(self.mongosCheckFunction, self.mongosStatusCheckInterval); - } - - // Set the mongos to connected - self._serverState = "connected"; - - // Emit the open event - if(self.emitOpen) - self._emitAcrossAllDbInstances(self, null, "open", null, null, null); - - self._emitAcrossAllDbInstances(self, null, "fullsetup", null, null, null); - // Callback - callback(null, self.db); - } - } - }; - - // Error handler - var errorOrCloseHandler = function(_server) { - return function(err, result) { - // Emit left event, signaling mongos left the ha - self.emit('left', 'mongos', _server); - // Execute all the callbacks with errors - self.__executeAllCallbacksWithError(err); - // Check if we have the server - var found = false; - - // Get the server name - var server_name = format("%s:%s", _server.host, _server.port); - // Add the downed server - self.downServers[server_name] = _server; - // Remove the current server from the list - delete self.upServers[server_name]; - - // Emit close across all the attached db instances - if(Object.keys(self.upServers).length == 0) { - self._emitAcrossAllDbInstances(self, null, "close", new Error("mongos disconnected, no valid proxies contactable over tcp"), null, null); - } - } - } - - // Mongo function - this.mongosCheckFunction = function() { - // Set as not waiting for check event - self._haInProgress = true; - - // Servers down - var numberOfServersLeft = Object.keys(self.downServers).length; - - // Check downed servers - if(numberOfServersLeft > 0) { - for(var name in self.downServers) { - // Pop a downed server - var downServer = self.downServers[name]; - // Set up the connection options for a Mongos - var options = { - auto_reconnect: false, - returnIsMasterResults: true, - slaveOk: true, - poolSize: downServer.poolSize, - socketOptions: { - connectTimeoutMS: self._connectTimeoutMS, - socketTimeoutMS: self._socketTimeoutMS - } - } - - // Create a new server object - var newServer = new Server(downServer.host, downServer.port, options); - // Setup the connection function - var connectFunction = function(_db, _server, _options, _callback) { - return function() { - // Attempt to connect - _server.connect(_db, _options, function(err, result) { - numberOfServersLeft = numberOfServersLeft - 1; - - if(err) { - return _callback(err, _server); - } else { - // Set the new server settings - _server._callBackStore = self._callBackStore; - - // Add server event handlers - _server.on("close", errorOrCloseHandler(_server)); - _server.on("timeout", errorOrCloseHandler(_server)); - _server.on("error", errorOrCloseHandler(_server)); - - // Get a read connection - var _connection = _server.checkoutReader(); - // Get the start time - var startTime = new Date().getTime(); - - // Execute ping command to mark each server with the expected times - self.db.command({ping:1} - , {failFast:true, connection:_connection}, function(err, result) { - // Get the start time - var endTime = new Date().getTime(); - // Mark the server with the ping time - _server.runtimeStats['pingMs'] = endTime - startTime; - // Execute any waiting reads - self._commandsStore.execute_writes(); - self._commandsStore.execute_queries(); - // Callback - return _callback(null, _server); - }); - } - }); - } - } - - // Attempt to connect to the database - connectFunction(self.db, newServer, options, function(err, _server) { - // If we have an error - if(err) { - self.downServers[format("%s:%s", _server.host, _server.port)] = _server; - } - - // Connection function - var connectionFunction = function(_auth, _connection, _callback) { - var pending = _auth.length(); - - for(var j = 0; j < pending; j++) { - // Get the auth object - var _auth = _auth.get(j); - // Unpack the parameter - var username = _auth.username; - var password = _auth.password; - var options = { - authMechanism: _auth.authMechanism - , authSource: _auth.authdb - , connection: _connection - }; - - // If we have changed the service name - if(_auth.gssapiServiceName) - options.gssapiServiceName = _auth.gssapiServiceName; - - // Hold any error - var _error = null; - // Authenticate against the credentials - self.db.authenticate(username, password, options, function(err, result) { - _error = err != null ? err : _error; - // Adjust the pending authentication - pending = pending - 1; - // Finished up - if(pending == 0) _callback(_error ? _error : null, _error ? false : true); - }); - } - } - - // Run auths against the connections - if(self.auth.length() > 0) { - var connections = _server.allRawConnections(); - var pendingAuthConn = connections.length; - - // No connections we are done - if(connections.length == 0) { - // Set ha done - if(numberOfServersLeft == 0) { - self._haInProgress = false; - } - } - - // Final error object - var finalError = null; - // Go over all the connections - for(var j = 0; j < connections.length; j++) { - - // Execute against all the connections - connectionFunction(self.auth, connections[j], function(err, result) { - // Pending authentication - pendingAuthConn = pendingAuthConn - 1 ; - - // Save error if any - finalError = err ? err : finalError; - - // If we are done let's finish up - if(pendingAuthConn == 0) { - // Set ha done - if(numberOfServersLeft == 0) { - self._haInProgress = false; - } - - if(!err) { - add_server(self, _server); - } - - // Execute any waiting reads - self._commandsStore.execute_writes(); - self._commandsStore.execute_queries(); - } - }); - } - } else { - if(!err) { - add_server(self, _server); - } - - // Set ha done - if(numberOfServersLeft == 0) { - self._haInProgress = false; - // Execute any waiting reads - self._commandsStore.execute_writes(); - self._commandsStore.execute_queries(); - } - } - })(); - } - } else { - self._haInProgress = false; - } - } - - // Connect all the server instances - for(var i = 0; i < this.servers.length; i++) { - // Get the connection - var server = this.servers[i]; - server.mongosInstance = this; - // Add server event handlers - server.on("close", errorOrCloseHandler(server)); - server.on("timeout", errorOrCloseHandler(server)); - server.on("error", errorOrCloseHandler(server)); - - // Configuration - var options = { - slaveOk: true, - poolSize: server.poolSize, - socketOptions: { connectTimeoutMS: self._connectTimeoutMS }, - returnIsMasterResults: true - } - - // Connect the instance - server.connect(self.db, options, connectHandler(server)); - } -} - -/** - * @ignore - * Add a server to the list of up servers and sort them by ping time - */ -var add_server = function(self, _server) { - // Emit a new server joined - self.emit('joined', "mongos", null, _server); - // Get the server url - var server_key = format("%s:%s", _server.host, _server.port); - // Push to list of valid server - self.upServers[server_key] = _server; - // Remove the server from the list of downed servers - delete self.downServers[server_key]; - - // Sort the keys by ping time - var keys = Object.keys(self.upServers); - var _upServersSorted = {}; - var _upServers = [] - - // Get all the servers - for(var name in self.upServers) { - _upServers.push(self.upServers[name]); - } - - // Sort all the server - _upServers.sort(function(a, b) { - return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs']; - }); - - // Rebuild the upServer - for(var i = 0; i < _upServers.length; i++) { - _upServersSorted[format("%s:%s", _upServers[i].host, _upServers[i].port)] = _upServers[i]; - } - - // Set the up servers - self.upServers = _upServersSorted; -} - -/** - * @ignore - * Just return the currently picked active connection - */ -Mongos.prototype.allServerInstances = function() { - return this.servers; -} - -/** - * Always ourselves - * @ignore - */ -Mongos.prototype.setReadPreference = function() {} - -/** - * @ignore - */ -Mongos.prototype.allRawConnections = function() { - // Neeed to build a complete list of all raw connections, start with master server - var allConnections = []; - // Get all connected connections - for(var name in this.upServers) { - allConnections = allConnections.concat(this.upServers[name].allRawConnections()); - } - // Return all the conections - return allConnections; -} - -/** - * @ignore - */ -Mongos.prototype.isConnected = function() { - return Object.keys(this.upServers).length > 0; -} - -/** - * @ignore - */ -Mongos.prototype.isAutoReconnect = function() { - return true; -} - -/** - * @ignore - */ -Mongos.prototype.canWrite = Mongos.prototype.isConnected; - -/** - * @ignore - */ -Mongos.prototype.canRead = Mongos.prototype.isConnected; - -/** - * @ignore - */ -Mongos.prototype.isDestroyed = function() { - return this._serverState == 'destroyed'; -} - -/** - * @ignore - */ -Mongos.prototype.checkoutWriter = function() { - // Checkout a writer - var keys = Object.keys(this.upServers); - // console.dir("============================ checkoutWriter :: " + keys.length) - if(keys.length == 0) return null; - // console.log("=============== checkoutWriter :: " + this.upServers[keys[0]].checkoutWriter().socketOptions.port) - return this.upServers[keys[0]].checkoutWriter(); -} - -/** - * @ignore - */ -Mongos.prototype.checkoutReader = function(read) { - // console.log("=============== checkoutReader :: read :: " + read); - // If read is set to null default to primary - read = read || 'primary' - // If we have a read preference object unpack it - if(read != null && typeof read == 'object' && read['_type'] == 'ReadPreference') { - // Validate if the object is using a valid mode - if(!read.isValid()) throw new Error("Illegal readPreference mode specified, " + read.mode); - } else if(!ReadPreference.isValid(read)) { - throw new Error("Illegal readPreference mode specified, " + read); - } - - // Checkout a writer - var keys = Object.keys(this.upServers); - if(keys.length == 0) return null; - // console.log("=============== checkoutReader :: " + this.upServers[keys[0]].checkoutWriter().socketOptions.port) - // console.dir(this._commandsStore.commands) - return this.upServers[keys[0]].checkoutWriter(); -} - -/** - * @ignore - */ -Mongos.prototype.close = function(callback) { - var self = this; - // Set server status as disconnected - this._serverState = 'destroyed'; - // Number of connections to close - var numberOfConnectionsToClose = self.servers.length; - // If we have a ha process running kill it - if(self._replicasetTimeoutId != null) clearInterval(self._replicasetTimeoutId); - self._replicasetTimeoutId = null; - - // Emit close event - processor(function() { - self._emitAcrossAllDbInstances(self, null, "close", null, null, true) - }); - - // Flush out any remaining call handlers - self._flushAllCallHandlers(utils.toError("Connection Closed By Application")); - - // Close all the up servers - for(var name in this.upServers) { - this.upServers[name].close(function(err, result) { - numberOfConnectionsToClose = numberOfConnectionsToClose - 1; - - // Callback if we have one defined - if(numberOfConnectionsToClose == 0 && typeof callback == 'function') { - callback(null); - } - }); - } -} - -/** - * @ignore - * Return the used state - */ -Mongos.prototype._isUsed = function() { - return this._used; -} - -exports.Mongos = Mongos; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/read_preference.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/read_preference.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/read_preference.js deleted file mode 100644 index 6845171..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/read_preference.js +++ /dev/null @@ -1,67 +0,0 @@ -/** - * A class representation of the Read Preference. - * - * Read Preferences - * - **ReadPreference.PRIMARY**, Read from primary only. All operations produce an error (throw an exception where applicable) if primary is unavailable. Cannot be combined with tags (This is the default.). - * - **ReadPreference.PRIMARY_PREFERRED**, Read from primary if available, otherwise a secondary. - * - **ReadPreference.SECONDARY**, Read from secondary if available, otherwise error. - * - **ReadPreference.SECONDARY_PREFERRED**, Read from a secondary if available, otherwise read from the primary. - * - **ReadPreference.NEAREST**, All modes read from among the nearest candidates, but unlike other modes, NEAREST will include both the primary and all secondaries in the random selection. - * - * @class Represents a Read Preference. - * @param {String} the read preference type - * @param {Object} tags - * @return {ReadPreference} - */ -var ReadPreference = function(mode, tags) { - if(!(this instanceof ReadPreference)) - return new ReadPreference(mode, tags); - this._type = 'ReadPreference'; - this.mode = mode; - this.tags = tags; -} - -/** - * @ignore - */ -ReadPreference.isValid = function(_mode) { - return (_mode == ReadPreference.PRIMARY || _mode == ReadPreference.PRIMARY_PREFERRED - || _mode == ReadPreference.SECONDARY || _mode == ReadPreference.SECONDARY_PREFERRED - || _mode == ReadPreference.NEAREST - || _mode == true || _mode == false); -} - -/** - * @ignore - */ -ReadPreference.prototype.isValid = function(mode) { - var _mode = typeof mode == 'string' ? mode : this.mode; - return ReadPreference.isValid(_mode); -} - -/** - * @ignore - */ -ReadPreference.prototype.toObject = function() { - var object = {mode:this.mode}; - - if(this.tags != null) { - object['tags'] = this.tags; - } - - return object; -} - -/** - * @ignore - */ -ReadPreference.PRIMARY = 'primary'; -ReadPreference.PRIMARY_PREFERRED = 'primaryPreferred'; -ReadPreference.SECONDARY = 'secondary'; -ReadPreference.SECONDARY_PREFERRED = 'secondaryPreferred'; -ReadPreference.NEAREST = 'nearest' - -/** - * @ignore - */ -exports.ReadPreference = ReadPreference; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/ha.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/ha.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/ha.js deleted file mode 100644 index e3e8f32..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/ha.js +++ /dev/null @@ -1,414 +0,0 @@ -var DbCommand = require('../../commands/db_command').DbCommand - , format = require('util').format; - -var HighAvailabilityProcess = function(replset, options) { - this.replset = replset; - this.options = options; - this.server = null; - this.state = HighAvailabilityProcess.INIT; - this.selectedIndex = 0; -} - -HighAvailabilityProcess.INIT = 'init'; -HighAvailabilityProcess.RUNNING = 'running'; -HighAvailabilityProcess.STOPPED = 'stopped'; - -HighAvailabilityProcess.prototype.start = function() { - var self = this; - if(this.replset._state - && Object.keys(this.replset._state.addresses).length == 0) { - if(this.server) this.server.close(); - this.state = HighAvailabilityProcess.STOPPED; - return; - } - - if(this.server) this.server.close(); - // Start the running - this._haProcessInProcess = false; - this.state = HighAvailabilityProcess.RUNNING; - - // Get all possible reader servers - var candidate_servers = this.replset._state.getAllReadServers(); - if(candidate_servers.length == 0) { - return; - } - - // Select a candidate server for the connection - var server = candidate_servers[this.selectedIndex % candidate_servers.length]; - this.selectedIndex = this.selectedIndex + 1; - - // Unpack connection options - var connectTimeoutMS = self.options.connectTimeoutMS || 10000; - var socketTimeoutMS = self.options.socketTimeoutMS || 30000; - - // Just ensure we don't have a full cycle dependency - var Db = require('../../db').Db - var Server = require('../server').Server; - - // Set up a new server instance - var newServer = new Server(server.host, server.port, { - auto_reconnect: false - , returnIsMasterResults: true - , poolSize: 1 - , socketOptions: { - connectTimeoutMS: connectTimeoutMS, - socketTimeoutMS: socketTimeoutMS, - keepAlive: 100 - } - , ssl: this.options.ssl - , sslValidate: this.options.sslValidate - , sslCA: this.options.sslCA - , sslCert: this.options.sslCert - , sslKey: this.options.sslKey - , sslPass: this.options.sslPass - }); - - // Create new dummy db for app - self.db = new Db('local', newServer, {w:1}); - - // Set up the event listeners - newServer.once("error", _handle(this, newServer)); - newServer.once("close", _handle(this, newServer)); - newServer.once("timeout", _handle(this, newServer)); - newServer.name = format("%s:%s", server.host, server.port); - - // Let's attempt a connection over here - newServer.connect(self.db, function(err, result, _server) { - if(self.state == HighAvailabilityProcess.STOPPED) { - _server.close(); - } - - if(err) { - // Close the server - _server.close(); - // Check if we can even do HA (is there anything running) - if(Object.keys(self.replset._state.addresses).length == 0) { - return; - } - - // Let's boot the ha timeout settings - setTimeout(function() { - self.start(); - }, self.options.haInterval); - } else { - self.server = _server; - // Let's boot the ha timeout settings - setTimeout(_timeoutHandle(self), self.options.haInterval); - } - }); -} - -HighAvailabilityProcess.prototype.stop = function() { - this.state = HighAvailabilityProcess.STOPPED; - if(this.server) this.server.close(); -} - -var _timeoutHandle = function(self) { - return function() { - if(self.state == HighAvailabilityProcess.STOPPED) { - // Stop all server instances - for(var name in self.replset._state.addresses) { - self.replset._state.addresses[name].close(); - delete self.replset._state.addresses[name]; - } - - // Finished pinging - return; - } - - // If the server is connected - if(self.server.isConnected() && !self._haProcessInProcess) { - // Start HA process - self._haProcessInProcess = true; - // Execute is master command - self.db._executeQueryCommand(DbCommand.createIsMasterCommand(self.db), - {failFast:true, connection: self.server.checkoutReader()} - , function(err, res) { - if(err) { - self.server.close(); - return setTimeout(_timeoutHandle(self), self.options.haInterval); - } - - // Master document - var master = res.documents[0]; - var hosts = master.hosts || []; - var reconnect_servers = []; - var state = self.replset._state; - - // We are in recovery mode, let's remove the current server - if(!master.ismaster - && !master.secondary - && state.addresses[master.me]) { - self.server.close(); - state.addresses[master.me].close(); - delete state.secondaries[master.me]; - return setTimeout(_timeoutHandle(self), self.options.haInterval); - } - - // For all the hosts let's check that we have connections - for(var i = 0; i < hosts.length; i++) { - var host = hosts[i]; - // Check if we need to reconnect to a server - if(state.addresses[host] == null) { - reconnect_servers.push(host); - } else if(state.addresses[host] && !state.addresses[host].isConnected()) { - state.addresses[host].close(); - delete state.secondaries[host]; - reconnect_servers.push(host); - } - - if((master.primary && state.master == null) - || (master.primary && state.master.name != master.primary)) { - - // Locate the primary and set it - if(state.addresses[master.primary]) { - if(state.master) state.master.close(); - delete state.secondaries[master.primary]; - state.master = state.addresses[master.primary]; - } - - // Set up the changes - if(state.master != null && state.master.isMasterDoc != null) { - state.master.isMasterDoc.ismaster = true; - state.master.isMasterDoc.secondary = false; - } else if(state.master != null) { - state.master.isMasterDoc = master; - state.master.isMasterDoc.ismaster = true; - state.master.isMasterDoc.secondary = false; - } - - // Execute any waiting commands (queries or writes) - self.replset._commandsStore.execute_queries(); - self.replset._commandsStore.execute_writes(); - } - } - - // Let's reconnect to any server needed - if(reconnect_servers.length > 0) { - _reconnect_servers(self, reconnect_servers); - } else { - self._haProcessInProcess = false - return setTimeout(_timeoutHandle(self), self.options.haInterval); - } - }); - } else if(!self.server.isConnected()) { - setTimeout(function() { - return self.start(); - }, self.options.haInterval); - } else { - setTimeout(_timeoutHandle(self), self.options.haInterval); - } - } -} - -var _reconnect_servers = function(self, reconnect_servers) { - if(reconnect_servers.length == 0) { - self._haProcessInProcess = false - return setTimeout(_timeoutHandle(self), self.options.haInterval); - } - - // Unpack connection options - var connectTimeoutMS = self.options.connectTimeoutMS || 10000; - var socketTimeoutMS = self.options.socketTimeoutMS || 0; - - // Server class - var Db = require('../../db').Db - var Server = require('../server').Server; - // Get the host - var host = reconnect_servers.shift(); - // Split it up - var _host = host.split(":")[0]; - var _port = parseInt(host.split(":")[1], 10); - - // Set up a new server instance - var newServer = new Server(_host, _port, { - auto_reconnect: false - , returnIsMasterResults: true - , poolSize: self.options.poolSize - , socketOptions: { - connectTimeoutMS: connectTimeoutMS, - socketTimeoutMS: socketTimeoutMS - } - , ssl: self.options.ssl - , sslValidate: self.options.sslValidate - , sslCA: self.options.sslCA - , sslCert: self.options.sslCert - , sslKey: self.options.sslKey - , sslPass: self.options.sslPass - }); - - // Create new dummy db for app - var db = new Db('local', newServer, {w:1}); - var state = self.replset._state; - - // Set up the event listeners - newServer.once("error", _repl_set_handler("error", self.replset, newServer)); - newServer.once("close", _repl_set_handler("close", self.replset, newServer)); - newServer.once("timeout", _repl_set_handler("timeout", self.replset, newServer)); - - // Set shared state - newServer.name = host; - newServer._callBackStore = self.replset._callBackStore; - newServer.replicasetInstance = self.replset; - newServer.enableRecordQueryStats(self.replset.recordQueryStats); - - // Let's attempt a connection over here - newServer.connect(db, function(err, result, _server) { - if(self.state == HighAvailabilityProcess.STOPPED) { - _server.close(); - } - - // If we connected let's check what kind of server we have - if(!err) { - _apply_auths(self, db, _server, function(err, result) { - if(err) { - _server.close(); - // Process the next server - return setTimeout(function() { - _reconnect_servers(self, reconnect_servers); - }, self.options.haInterval); - } - var doc = _server.isMasterDoc; - // Fire error on any unknown callbacks for this server - self.replset.__executeAllServerSpecificErrorCallbacks(_server.socketOptions.host, _server.socketOptions.port, err); - - if(doc.ismaster) { - // Emit primary added - self.replset.emit('joined', "primary", doc, _server); - - // If it was a secondary remove it - if(state.secondaries[doc.me]) { - delete state.secondaries[doc.me]; - } - - // Override any server in list of addresses - state.addresses[doc.me] = _server; - // Set server as master - state.master = _server; - // Execute any waiting writes - self.replset._commandsStore.execute_writes(); - } else if(doc.secondary) { - // Emit secondary added - self.replset.emit('joined', "secondary", doc, _server); - // Add the secondary to the state - state.secondaries[doc.me] = _server; - // Override any server in list of addresses - state.addresses[doc.me] = _server; - // Execute any waiting reads - self.replset._commandsStore.execute_queries(); - } else { - _server.close(); - } - - // Set any tags on the instance server - _server.name = doc.me; - _server.tags = doc.tags; - // Process the next server - setTimeout(function() { - _reconnect_servers(self, reconnect_servers); - }, self.options.haInterval); - }); - } else { - _server.close(); - self.replset.__executeAllServerSpecificErrorCallbacks(_server.socketOptions.host, _server.socketOptions.port, err); - - setTimeout(function() { - _reconnect_servers(self, reconnect_servers); - }, self.options.haInterval); - } - }); -} - -var _apply_auths = function(self, _db, _server, _callback) { - if(self.replset.auth.length() == 0) return _callback(null); - // Apply any authentication needed - if(self.replset.auth.length() > 0) { - var pending = self.replset.auth.length(); - var connections = _server.allRawConnections(); - var pendingAuthConn = connections.length; - - // Connection function - var connectionFunction = function(_auth, _connection, __callback) { - var pending = _auth.length(); - - for(var j = 0; j < pending; j++) { - // Get the auth object - var _auth = _auth.get(j); - // Unpack the parameter - var username = _auth.username; - var password = _auth.password; - var options = { - authMechanism: _auth.authMechanism - , authSource: _auth.authdb - , connection: _connection - }; - - // If we have changed the service name - if(_auth.gssapiServiceName) - options.gssapiServiceName = _auth.gssapiServiceName; - - // Hold any error - var _error = null; - - // Authenticate against the credentials - _db.authenticate(username, password, options, function(err, result) { - _error = err != null ? err : _error; - // Adjust the pending authentication - pending = pending - 1; - // Finished up - if(pending == 0) __callback(_error ? _error : null, _error ? false : true); - }); - } - } - - // Final error object - var finalError = null; - // Iterate over all the connections - for(var i = 0; i < connections.length; i++) { - connectionFunction(self.replset.auth, connections[i], function(err, result) { - // Pending authentication - pendingAuthConn = pendingAuthConn - 1 ; - - // Save error if any - finalError = err ? err : finalError; - - // If we are done let's finish up - if(pendingAuthConn == 0) { - _callback(null); - } - }); - } - } -} - -var _handle = function(self, server) { - return function(err) { - server.close(); - } -} - -var _repl_set_handler = function(event, self, server) { - var ReplSet = require('./repl_set').ReplSet; - - return function(err, doc) { - server.close(); - - // The event happened to a primary - // Remove it from play - if(self._state.isPrimary(server)) { - self._state.master == null; - self._serverState = ReplSet.REPLSET_READ_ONLY; - } else if(self._state.isSecondary(server)) { - delete self._state.secondaries[server.name]; - } - - // Unpack variables - var host = server.socketOptions.host; - var port = server.socketOptions.port; - - // Fire error on any unknown callbacks - self.__executeAllServerSpecificErrorCallbacks(host, port, err); - } -} - -exports.HighAvailabilityProcess = HighAvailabilityProcess; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1a45507/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/options.js ---------------------------------------------------------------------- diff --git a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/options.js b/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/options.js deleted file mode 100644 index a5658e3..0000000 --- a/web/demos/package/node_modules/mongodb/lib/mongodb/connection/repl_set/options.js +++ /dev/null @@ -1,126 +0,0 @@ -var PingStrategy = require('./strategies/ping_strategy').PingStrategy - , StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy - , ReadPreference = require('../read_preference').ReadPreference; - -var Options = function(options) { - options = options || {}; - this._options = options; - this.ha = options.ha || true; - this.haInterval = options.haInterval || 2000; - this.reconnectWait = options.reconnectWait || 1000; - this.retries = options.retries || 30; - this.rs_name = options.rs_name; - this.socketOptions = options.socketOptions || {}; - this.readPreference = options.readPreference; - this.readSecondary = options.read_secondary; - this.poolSize = options.poolSize == null ? 5 : options.poolSize; - this.strategy = options.strategy || 'ping'; - this.secondaryAcceptableLatencyMS = options.secondaryAcceptableLatencyMS || 15; - this.connectArbiter = options.connectArbiter || false; - this.connectWithNoPrimary = options.connectWithNoPrimary || false; - this.logger = options.logger; - this.ssl = options.ssl || false; - this.sslValidate = options.sslValidate || false; - this.sslCA = options.sslCA; - this.sslCert = options.sslCert; - this.sslKey = options.sslKey; - this.sslPass = options.sslPass; - this.emitOpen = options.emitOpen || true; -} - -Options.prototype.init = function() { - if(this.sslValidate && (!Array.isArray(this.sslCA) || this.sslCA.length == 0)) { - throw new Error("The driver expects an Array of CA certificates in the sslCA parameter when enabling sslValidate"); - } - - // Make sure strategy is one of the two allowed - if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical' && this.strategy != 'none')) - throw new Error("Only ping or statistical strategies allowed"); - - if(this.strategy == null) this.strategy = 'ping'; - - // Set logger if strategy exists - if(this.strategyInstance) this.strategyInstance.logger = this.logger; - - // Unpack read Preference - var readPreference = this.readPreference; - // Validate correctness of Read preferences - if(readPreference != null) { - if(readPreference != ReadPreference.PRIMARY && readPreference != ReadPreference.PRIMARY_PREFERRED - && readPreference != ReadPreference.SECONDARY && readPreference != ReadPreference.SECONDARY_PREFERRED - && readPreference != ReadPreference.NEAREST && typeof readPreference != 'object' && readPreference['_type'] != 'ReadPreference') { - throw new Error("Illegal readPreference mode specified, " + readPreference); - } - - this.readPreference = readPreference; - } else { - this.readPreference = null; - } - - // Ensure read_secondary is set correctly - if(this.readSecondary != null) - this.readSecondary = this.readPreference == ReadPreference.PRIMARY - || this.readPreference == false - || this.readPreference == null ? false : true; - - // Ensure correct slave set - if(this.readSecondary) this.slaveOk = true; - - // Set up logger if any set - this.logger = this.logger != null - && (typeof this.logger.debug == 'function') - && (typeof this.logger.error == 'function') - && (typeof this.logger.debug == 'function') - ? this.logger : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}}; - - // Connection timeout - this.connectTimeoutMS = this.socketOptions.connectTimeoutMS - ? this.socketOptions.connectTimeoutMS - : 1000; - - // Socket connection timeout - this.socketTimeoutMS = this.socketOptions.socketTimeoutMS - ? this.socketOptions.socketTimeoutMS - : 30000; -} - -Options.prototype.decorateAndClean = function(servers, callBackStore) { - var self = this; - - // var de duplicate list - var uniqueServers = {}; - // De-duplicate any servers in the seed list - for(var i = 0; i < servers.length; i++) { - var server = servers[i]; - // If server does not exist set it - if(uniqueServers[server.host + ":" + server.port] == null) { - uniqueServers[server.host + ":" + server.port] = server; - } - } - - // Let's set the deduplicated list of servers - var finalServers = []; - // Add the servers - for(var key in uniqueServers) { - finalServers.push(uniqueServers[key]); - } - - finalServers.forEach(function(server) { - // Ensure no server has reconnect on - server.options.auto_reconnect = false; - // Set up ssl options - server.ssl = self.ssl; - server.sslValidate = self.sslValidate; - server.sslCA = self.sslCA; - server.sslCert = self.sslCert; - server.sslKey = self.sslKey; - server.sslPass = self.sslPass; - server.poolSize = self.poolSize; - // Set callback store - server._callBackStore = callBackStore; - }); - - return finalServers; -} - -exports.Options = Options;
