Repository: qpid-dispatch Updated Branches: refs/heads/master f87a33389 -> f543ba2b5
DISPATCH-1076 Keep console source files separate Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f543ba2b Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f543ba2b Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f543ba2b Branch: refs/heads/master Commit: f543ba2b590be1afa9d1afe55654d0dcf9aa7f9f Parents: f87a333 Author: Ernest Allen <[email protected]> Authored: Wed Jul 11 06:20:40 2018 -0400 Committer: Ernest Allen <[email protected]> Committed: Wed Jul 11 06:20:40 2018 -0400 ---------------------------------------------------------------------- console/CMakeLists.txt | 7 +- console/stand-alone/gulpfile.js | 2 +- console/stand-alone/index.html | 4 +- console/stand-alone/modules/connection.js | 347 ---------------- console/stand-alone/modules/correlator.js | 50 --- console/stand-alone/modules/management.js | 63 --- console/stand-alone/modules/topology.js | 403 ------------------- console/stand-alone/modules/utilities.js | 115 ------ .../stand-alone/plugin/js/amqp/connection.js | 347 ++++++++++++++++ .../stand-alone/plugin/js/amqp/correlator.js | 50 +++ .../stand-alone/plugin/js/amqp/management.js | 63 +++ console/stand-alone/plugin/js/amqp/topology.js | 403 +++++++++++++++++++ console/stand-alone/plugin/js/amqp/utilities.js | 115 ++++++ console/stand-alone/plugin/js/qdrService.js | 4 +- tests/system_tests_console.py | 28 +- 15 files changed, 1001 insertions(+), 1000 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/console/CMakeLists.txt b/console/CMakeLists.txt index a55b572..4cc1cf8 100644 --- a/console/CMakeLists.txt +++ b/console/CMakeLists.txt @@ -39,14 +39,12 @@ if(CONSOLE_INSTALL) ## Files needed to create the ${CONSOLE_ARTIFACTS} file 
(GLOB_RECURSE CONSOLE_JS_SOURCES ${CONSOLE_SOURCE_DIR}/plugin/js/*.js) file (GLOB_RECURSE CONSOLE_TS_SOURCES ${CONSOLE_SOURCE_DIR}/plugin/js/*.ts) - file (GLOB_RECURSE CONSOLE_MODULE_SOURCES ${CONSOLE_SOURCE_DIR}/modules/*.js) set(CONSOLE_CSS_SOURCE ${CONSOLE_SOURCE_DIR}/plugin/css/dispatch.css) set(CONSOLE_MAIN ${CONSOLE_SOURCE_DIR}/main.js) set(ALL_CONSOLE_SOURCES ${CONSOLE_MAIN} ${CONSOLE_MODULE_SOURCES} ${CONSOLE_JS_SOURCES} ${CONSOLE_TS_SOURCES} ${CONSOLE_CSS_SOURCE}) ## Files created during the console build set(CONSOLE_ARTIFACTS - ${CONSOLE_BUILD_DIR}/dist/js/main.min.js ${CONSOLE_BUILD_DIR}/dist/js/vendor.min.js ${CONSOLE_BUILD_DIR}/dist/css/dispatch.min.css ${CONSOLE_BUILD_DIR}/dist/css/vendor.min.css @@ -82,6 +80,7 @@ if(CONSOLE_INSTALL) ## Files copied to the root of the console's install dir set(BASE_FILES ${CONSOLE_SOURCE_DIR}/index.html + ${CONSOLE_SOURCE_DIR}/main.js ${CONSOLE_SOURCE_DIR}/favicon-32x32.png ) ## Files copied to the css/ dir @@ -116,6 +115,10 @@ if(CONSOLE_INSTALL) DESTINATION ${CONSOLE_STAND_ALONE_INSTALL_DIR}/html FILES_MATCHING PATTERN "*.html" ) + install(DIRECTORY ${CONSOLE_SOURCE_DIR}/plugin/js/ + DESTINATION ${CONSOLE_STAND_ALONE_INSTALL_DIR}/plugin/js + FILES_MATCHING PATTERN "*.js" + ) install(FILES ${BASE_FILES} DESTINATION ${CONSOLE_STAND_ALONE_INSTALL_DIR} ) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/gulpfile.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/gulpfile.js b/console/stand-alone/gulpfile.js index 0ddaf10..d996135 100644 --- a/console/stand-alone/gulpfile.js +++ b/console/stand-alone/gulpfile.js @@ -227,7 +227,7 @@ function test () { var build = gulp.series( clean, // removes the dist/ dir lint, // lints the .js - gulp.parallel(vendor_styles, vendor_scripts, styles, scripts), // uglify and concat + gulp.parallel(vendor_styles, vendor_scripts, styles), // uglify and concat cleanup // remove .js that were converted 
from .ts ); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/index.html ---------------------------------------------------------------------- diff --git a/console/stand-alone/index.html b/console/stand-alone/index.html index 1f08594..acb2d69 100644 --- a/console/stand-alone/index.html +++ b/console/stand-alone/index.html @@ -75,9 +75,9 @@ under the License. </div> </div> -<!-- <script type="module" src="js/main.min.js"></script> --> +<script type="module" src="main.js"></script> <script type="text/javascript" src="js/vendor.min.js"></script> -<script type="text/javascript" src="js/main.min.js"></script> +<!-- <script type="text/javascript" src="js/main.min.js"></script> --> <script defer nomodule> var installError = document.getElementById('installError'); if (installError) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/modules/connection.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/modules/connection.js b/console/stand-alone/modules/connection.js deleted file mode 100644 index db21e01..0000000 --- a/console/stand-alone/modules/connection.js +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Copyright 2017 Red Hat Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/* global Promise */ - -const rhea = require('rhea'); -//import { on, websocket_connect, removeListener, once, connect } from 'rhea'; -import Correlator from './correlator.js'; - -class ConnectionManager { - constructor(protocol) { - this.sender = undefined; - this.receiver = undefined; - this.connection = undefined; - this.version = undefined; - this.errorText = undefined; - this.protocol = protocol; - this.schema = undefined; - this.connectActions = []; - this.disconnectActions = []; - this.correlator = new Correlator(); - this.on_message = (function (context) { - this.correlator.resolve(context); - }).bind(this); - this.on_disconnected = (function () { - this.errorText = 'Disconnected'; - this.executeDisconnectActions(this.errorText); - }).bind(this); - this.on_connection_open = (function () { - this.executeConnectActions(); - }).bind(this); - } - versionCheck(minVer) { - var verparts = this.version.split('.'); - var minparts = minVer.split('.'); - try { - for (var i = 0; i < minparts.length; ++i) { - if (parseInt(minVer[i] > parseInt(verparts[i]))) - return false; - } - } - catch (e) { - return false; - } - return true; - } - addConnectAction(action) { - if (typeof action === 'function') { - this.delConnectAction(action); - this.connectActions.push(action); - } - } - addDisconnectAction(action) { - if (typeof action === 'function') { - this.delDisconnectAction(action); - this.disconnectActions.push(action); - } - } - delConnectAction(action) { - if (typeof action === 'function') { - var index = this.connectActions.indexOf(action); - if (index >= 0) - this.connectActions.splice(index, 1); - } - } - delDisconnectAction(action) { - if (typeof action === 'function') { - var index = this.disconnectActions.indexOf(action); - if (index >= 0) - this.disconnectActions.splice(index, 1); - } - } - executeConnectActions() { - this.connectActions.forEach(function (action) { - try { - action(); - } - catch (e) { - // in case the page that registered the handler has 
been unloaded - } - }); - this.connectActions = []; - } - executeDisconnectActions(message) { - this.disconnectActions.forEach(function (action) { - try { - action(message); - } - catch (e) { - // in case the page that registered the handler has been unloaded - } - }); - this.disconnectActions = []; - } - on(eventType, fn) { - if (eventType === 'connected') { - this.addConnectAction(fn); - } - else if (eventType === 'disconnected') { - this.addDisconnectAction(fn); - } - else { - console.log('unknown event type ' + eventType); - } - } - setSchema(schema) { - this.schema = schema; - } - is_connected() { - return this.connection && - this.sender && - this.receiver && - this.receiver.remote && - this.receiver.remote.attach && - this.receiver.remote.attach.source && - this.receiver.remote.attach.source.address && - this.connection.is_open(); - } - disconnect() { - if (this.sender) - this.sender.close(); - if (this.receiver) - this.receiver.close(); - if (this.connection) - this.connection.close(); - } - createSenderReceiver(options) { - return new Promise((function (resolve, reject) { - var timeout = options.timeout || 10000; - // set a timer in case the setup takes too long - var giveUp = (function () { - this.connection.removeListener('receiver_open', receiver_open); - this.connection.removeListener('sendable', sendable); - this.errorText = 'timed out creating senders and receivers'; - reject(Error(this.errorText)); - }).bind(this); - var timer = setTimeout(giveUp, timeout); - // register an event hander for when the setup is complete - var sendable = (function (context) { - clearTimeout(timer); - this.version = this.connection.properties ? 
this.connection.properties.version : '0.1.0'; - // in case this connection dies - rhea.on('disconnected', this.on_disconnected); - // in case this connection dies and is then reconnected automatically - rhea.on('connection_open', this.on_connection_open); - // receive messages here - this.connection.on('message', this.on_message); - resolve(context); - }).bind(this); - this.connection.once('sendable', sendable); - // Now actually createt the sender and receiver. - // register an event handler for when the receiver opens - var receiver_open = (function () { - // once the receiver is open, create the sender - if (options.sender_address) - this.sender = this.connection.open_sender(options.sender_address); - else - this.sender = this.connection.open_sender(); - }).bind(this); - this.connection.once('receiver_open', receiver_open); - // create a dynamic receiver - this.receiver = this.connection.open_receiver({ source: { dynamic: true } }); - }).bind(this)); - } - connect(options) { - return new Promise((function (resolve, reject) { - var finishConnecting = function () { - this.createSenderReceiver(options) - .then(function (results) { - resolve(results); - }, function (error) { - reject(error); - }); - }; - if (!this.connection) { - options.test = false; // if you didn't want a connection, you should have called testConnect() and not connect() - this.testConnect(options) - .then((function () { - finishConnecting.call(this); - }).bind(this), (function () { - // connect failed or timed out - this.errorText = 'Unable to connect'; - this.executeDisconnectActions(this.errorText); - reject(Error(this.errorText)); - }).bind(this)); - } - else { - finishConnecting.call(this); - } - }).bind(this)); - } - getReceiverAddress() { - return this.receiver.remote.attach.source.address; - } - // Try to connect using the options. 
- // if options.test === true -> close the connection if it succeeded and resolve the promise - // if the connection attempt fails or times out, reject the promise regardless of options.test - testConnect(options, callback) { - return new Promise((function (resolve, reject) { - var timeout = options.timeout || 10000; - var reconnect = options.reconnect || false; // in case options.reconnect is undefined - var baseAddress = options.address + ':' + options.port; - if (options.linkRouteAddress) { - baseAddress += ('/' + options.linkRouteAddress); - } - var wsprotocol = location.protocol === 'https:' ? 'wss' : 'ws'; - if (this.connection) { - delete this.connection; - this.connection = null; - } - var ws = rhea.websocket_connect(WebSocket); - var c = { - connection_details: new ws(wsprotocol + '://' + baseAddress, ['binary']), - reconnect: reconnect, - properties: options.properties || { console_identifier: 'Dispatch console' } - }; - if (options.hostname) - c.hostname = options.hostname; - if (options.username && options.username !== '') { - c.username = options.username; - } - if (options.password && options.password !== '') { - c.password = options.password; - } - // set a timeout - var disconnected = (function () { - clearTimeout(timer); - rhea.removeListener('disconnected', disconnected); - rhea.removeListener('connection_open', connection_open); - this.connection = null; - var rej = 'failed to connect'; - if (callback) - callback({ error: rej }); - reject(Error(rej)); - }).bind(this); - var timer = setTimeout(disconnected, timeout); - // the event handler for when the connection opens - var connection_open = (function (context) { - clearTimeout(timer); - // prevent future disconnects from calling reject - rhea.removeListener('disconnected', disconnected); - // we were just checking. 
we don't really want a connection - if (options.test) { - context.connection.close(); - this.connection = null; - } - else - this.on_connection_open(); - var res = { context: context }; - if (callback) - callback(res); - resolve(res); - }).bind(this); - // register an event handler for when the connection opens - rhea.once('connection_open', connection_open); - // register an event handler for if the connection fails to open - rhea.once('disconnected', disconnected); - // attempt the connection - this.connection = rhea.connect(c); - }).bind(this)); - } - sendMgmtQuery(operation) { - return this.send([], '/$management', operation); - } - sendQuery(toAddr, entity, attrs, operation) { - operation = operation || 'QUERY'; - var fullAddr = this._fullAddr(toAddr); - var body = { attributeNames: attrs || [] }; - return this.send(body, fullAddr, operation, this.schema.entityTypes[entity].fullyQualifiedType); - } - send(body, to, operation, entityType) { - var application_properties = { - operation: operation, - type: 'org.amqp.management', - name: 'self' - }; - if (entityType) - application_properties.entityType = entityType; - return this._send(body, to, application_properties); - } - sendMethod(toAddr, entity, attrs, operation, props) { - var fullAddr = this._fullAddr(toAddr); - var application_properties = { - operation: operation, - }; - if (entity) { - application_properties.type = this.schema.entityTypes[entity].fullyQualifiedType; - } - if (attrs.name) - application_properties.name = attrs.name; - else if (attrs.identity) - application_properties.identity = attrs.identity; - if (props) { - for (var attrname in props) { - application_properties[attrname] = props[attrname]; - } - } - return this._send(attrs, fullAddr, application_properties); - } - _send(body, to, application_properties) { - var _correlationId = this.correlator.corr(); - var self = this; - return new Promise(function (resolve, reject) { - self.correlator.register(_correlationId, resolve, reject); - 
self.sender.send({ - body: body, - to: to, - reply_to: self.receiver.remote.attach.source.address, - correlation_id: _correlationId, - application_properties: application_properties - }); - }); - } - _fullAddr(toAddr) { - var toAddrParts = toAddr.split('/'); - toAddrParts.shift(); - var fullAddr = toAddrParts.join('/'); - return fullAddr; - } - availableQeueuDepth() { - return this.correlator.depth(); - } -} - -class ConnectionException { - constructor(message) { - this.message = message; - this.name = 'ConnectionException'; - } -} - -const _ConnectionManager = ConnectionManager; -export { _ConnectionManager as ConnectionManager }; -const _ConnectionException = ConnectionException; -export { _ConnectionException as ConnectionException }; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/modules/correlator.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/modules/correlator.js b/console/stand-alone/modules/correlator.js deleted file mode 100644 index bf34f93..0000000 --- a/console/stand-alone/modules/correlator.js +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017 Red Hat Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import { utils } from './utilities.js'; - -class Correlator { - constructor() { - this._objects = {}; - this._correlationID = 0; - this.maxCorrelatorDepth = 10; - } - corr() { - return ++(this._correlationID) + ''; - } - // Associate this correlation id with the promise's resolve and reject methods - register(id, resolve, reject) { - this._objects[id] = { resolver: resolve, rejector: reject }; - } - // Call the promise's resolve method. - // This is called by rhea's receiver.on('message') function - resolve(context) { - var correlationID = context.message.correlation_id; - // call the promise's resolve function with a copy of the rhea response (so we don't keep any references to internal rhea data) - this._objects[correlationID].resolver({ response: utils.copy(context.message.body), context: context }); - delete this._objects[correlationID]; - } - reject(id, error) { - this._objects[id].rejector(error); - delete this._objects[id]; - } - // Return the number of requests that can be sent before we start queuing requests - depth() { - return Math.max(1, this.maxCorrelatorDepth - Object.keys(this._objects).length); - } -} - -export default Correlator; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/modules/management.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/modules/management.js b/console/stand-alone/modules/management.js deleted file mode 100644 index 4b3bb32..0000000 --- a/console/stand-alone/modules/management.js +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2015 Red Hat Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* global Promise */ - -import { ConnectionManager } from './connection.js'; -import Topology from './topology.js'; - -export class Management { - constructor(protocol) { - this.connection = new ConnectionManager(protocol); - this.topology = new Topology(this.connection); - } - getSchema(callback) { - var self = this; - return new Promise(function (resolve, reject) { - self.connection.sendMgmtQuery('GET-SCHEMA') - .then(function (responseAndContext) { - var response = responseAndContext.response; - for (var entityName in response.entityTypes) { - var entity = response.entityTypes[entityName]; - if (entity.deprecated) { - // deprecated entity - delete response.entityTypes[entityName]; - } - else { - for (var attributeName in entity.attributes) { - var attribute = entity.attributes[attributeName]; - if (attribute.deprecated) { - // deprecated attribute - delete response.entityTypes[entityName].attributes[attributeName]; - } - } - } - } - self.connection.setSchema(response); - if (callback) - callback(response); - resolve(response); - }, function (error) { - if (callback) - callback(error); - reject(error); - }); - }); - } - schema() { - return this.connection.schema; - } -} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/modules/topology.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/modules/topology.js b/console/stand-alone/modules/topology.js deleted file mode 100644 index e208a6f..0000000 --- a/console/stand-alone/modules/topology.js +++ 
/dev/null @@ -1,403 +0,0 @@ -/* - * Copyright 2015 Red Hat Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* global Promise d3 */ - -import { utils } from './utilities.js'; - -class Topology { - constructor(connectionManager) { - this.connection = connectionManager; - this.updatedActions = {}; - this.entities = []; // which entities to request each topology update - this.entityAttribs = {}; - this._nodeInfo = {}; // info about all known nodes and entities - this.filtering = false; // filter out nodes that don't have connection info - this.timeout = 5000; - this.updateInterval = 5000; - this._getTimer = null; - this.updating = false; - } - addUpdatedAction(key, action) { - if (typeof action === 'function') { - this.updatedActions[key] = action; - } - } - delUpdatedAction(key) { - if (key in this.updatedActions) - delete this.updatedActions[key]; - } - executeUpdatedActions(error) { - for (var action in this.updatedActions) { - this.updatedActions[action].apply(this, [error]); - } - } - setUpdateEntities(entities) { - this.entities = entities; - for (var i = 0; i < entities.length; i++) { - this.entityAttribs[entities[i]] = []; - } - } - addUpdateEntities(entityAttribs) { - if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { - entityAttribs = [entityAttribs]; - } - for (var i = 0; i < entityAttribs.length; i++) { - var entity = entityAttribs[i].entity; - this.entityAttribs[entity] = entityAttribs[i].attrs || []; - } - } - 
on(eventName, fn, key) { - if (eventName === 'updated') - this.addUpdatedAction(key, fn); - } - unregister(eventName, key) { - if (eventName === 'updated') - this.delUpdatedAction(key); - } - nodeInfo() { - return this._nodeInfo; - } - get() { - return new Promise((function (resolve, reject) { - this.connection.sendMgmtQuery('GET-MGMT-NODES') - .then((function (response) { - response = response.response; - if (Object.prototype.toString.call(response) === '[object Array]') { - var workInfo = {}; - // if there is only one node, it will not be returned - if (response.length === 0) { - var parts = this.connection.getReceiverAddress().split('/'); - parts[parts.length - 1] = '$management'; - response.push(parts.join('/')); - } - for (var i = 0; i < response.length; ++i) { - workInfo[response[i]] = {}; - } - var gotResponse = function (nodeName, entity, response) { - workInfo[nodeName][entity] = response; - }; - var q = d3.queue(this.connection.availableQeueuDepth()); - for (var id in workInfo) { - for (var entity in this.entityAttribs) { - q.defer((this.q_fetchNodeInfo).bind(this), id, entity, this.entityAttribs[entity], q, gotResponse); - } - } - q.await((function () { - // filter out nodes that have no connection info - if (this.filtering) { - for (var id in workInfo) { - if (!(workInfo[id].connection)) { - this.flux = true; - delete workInfo[id]; - } - } - } - this._nodeInfo = utils.copy(workInfo); - this.onDone(this._nodeInfo); - resolve(this._nodeInfo); - }).bind(this)); - } - }).bind(this), function (error) { - reject(error); - }); - }).bind(this)); - } - onDone(result) { - clearTimeout(this._getTimer); - if (this.updating) - this._getTimer = setTimeout((this.get).bind(this), this.updateInterval); - this.executeUpdatedActions(result); - } - startUpdating(filter) { - this.stopUpdating(); - this.updating = true; - this.filtering = filter; - this.get(); - } - stopUpdating() { - this.updating = false; - if (this._getTimer) { - clearTimeout(this._getTimer); - 
this._getTimer = null; - } - } - fetchEntity(node, entity, attrs, callback) { - var results = {}; - var gotResponse = function (nodeName, dotentity, response) { - results = response; - }; - var q = d3.queue(this.connection.availableQeueuDepth()); - q.defer((this.q_fetchNodeInfo).bind(this), node, entity, attrs, q, gotResponse); - q.await(function () { - callback(node, entity, results); - }); - } - // called from d3.queue.defer so the last argument (callback) is supplied by d3 - q_fetchNodeInfo(nodeId, entity, attrs, q, heartbeat, callback) { - this.getNodeInfo(nodeId, entity, attrs, q, function (nodeName, dotentity, response) { - heartbeat(nodeName, dotentity, response); - callback(null); - }); - } - // get all the requested entities/attributes for a single router - fetchEntities(node, entityAttribs, doneCallback, resultCallback) { - var q = d3.queue(this.connection.availableQeueuDepth()); - var results = {}; - if (!resultCallback) { - resultCallback = function (nodeName, dotentity, response) { - if (!results[nodeName]) - results[nodeName] = {}; - results[nodeName][dotentity] = response; - }; - } - var gotAResponse = function (nodeName, dotentity, response) { - resultCallback(nodeName, dotentity, response); - }; - if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { - entityAttribs = [entityAttribs]; - } - for (var i = 0; i < entityAttribs.length; ++i) { - var ea = entityAttribs[i]; - q.defer((this.q_fetchNodeInfo).bind(this), node, ea.entity, ea.attrs || [], q, gotAResponse); - } - q.await(function () { - doneCallback(results); - }); - } - // get all the requested entities for all known routers - fetchAllEntities(entityAttribs, doneCallback, resultCallback) { - var q = d3.queue(this.connection.availableQeueuDepth()); - var results = {}; - if (!resultCallback) { - resultCallback = function (nodeName, dotentity, response) { - if (!results[nodeName]) - results[nodeName] = {}; - results[nodeName][dotentity] = response; - }; - } - var gotAResponse 
= function (nodeName, dotentity, response) { - resultCallback(nodeName, dotentity, response); - }; - if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { - entityAttribs = [entityAttribs]; - } - var nodes = Object.keys(this._nodeInfo); - for (var n = 0; n < nodes.length; ++n) { - for (var i = 0; i < entityAttribs.length; ++i) { - var ea = entityAttribs[i]; - q.defer((this.q_fetchNodeInfo).bind(this), nodes[n], ea.entity, ea.attrs || [], q, gotAResponse); - } - } - q.await(function () { - doneCallback(results); - }); - } - // enusre all the topology nones have all these entities - ensureAllEntities(entityAttribs, callback, extra) { - this.ensureEntities(Object.keys(this._nodeInfo), entityAttribs, callback, extra); - } - // ensure these nodes have all these entities. don't fetch unless forced to - ensureEntities(nodes, entityAttribs, callback, extra) { - if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { - entityAttribs = [entityAttribs]; - } - if (Object.prototype.toString.call(nodes) !== '[object Array]') { - nodes = [nodes]; - } - this.addUpdateEntities(entityAttribs); - var q = d3.queue(this.connection.availableQeueuDepth()); - for (var n = 0; n < nodes.length; ++n) { - for (var i = 0; i < entityAttribs.length; ++i) { - var ea = entityAttribs[i]; - // if we don'e already have the entity or we want to force a refresh - if (!this._nodeInfo[nodes[n]][ea.entity] || ea.force) - q.defer((this.q_ensureNodeInfo).bind(this), nodes[n], ea.entity, ea.attrs || [], q); - } - } - q.await(function () { - callback(extra); - }); - } - addNodeInfo(id, entity, values) { - // save the results in the nodeInfo object - if (id) { - if (!(id in this._nodeInfo)) { - this._nodeInfo[id] = {}; - } - // copy the values to allow garbage collection - this._nodeInfo[id][entity] = values; - } - } - isLargeNetwork() { - return Object.keys(this._nodeInfo).length >= 12; - } - getConnForLink(link) { - // find the connection for this link - var conns = 
this._nodeInfo[link.nodeId].connection; - var connIndex = conns.attributeNames.indexOf('identity'); - var linkCons = conns.results.filter(function (conn) { - return conn[connIndex] === link.connectionId; - }); - return utils.flatten(conns.attributeNames, linkCons[0]); - } - nodeNameList() { - var nl = []; - for (var id in this._nodeInfo) { - nl.push(utils.nameFromId(id)); - } - return nl.sort(); - } - nodeIdList() { - var nl = []; - for (var id in this._nodeInfo) { - //if (this._nodeInfo['connection']) - nl.push(id); - } - return nl.sort(); - } - nodeList() { - var nl = []; - for (var id in this._nodeInfo) { - nl.push({ - name: utils.nameFromId(id), - id: id - }); - } - return nl; - } - // d3.queue'd function to make a management query for entities/attributes - q_ensureNodeInfo(nodeId, entity, attrs, q, callback) { - this.getNodeInfo(nodeId, entity, attrs, q, (function (nodeName, dotentity, response) { - this.addNodeInfo(nodeName, dotentity, response); - callback(null); - }).bind(this)); - return { - abort: function () { - delete this._nodeInfo[nodeId]; - } - }; - } - getNodeInfo(nodeName, entity, attrs, q, callback) { - var timedOut = function (q) { - q.abort(); - }; - var atimer = setTimeout(timedOut, this.timeout, q); - this.connection.sendQuery(nodeName, entity, attrs) - .then(function (response) { - clearTimeout(atimer); - callback(nodeName, entity, response.response); - }, function () { - q.abort(); - }); - } - getMultipleNodeInfo(nodeNames, entity, attrs, callback, selectedNodeId, aggregate) { - var self = this; - if (typeof aggregate === 'undefined') - aggregate = true; - var responses = {}; - var gotNodesResult = function (nodeName, dotentity, response) { - responses[nodeName] = response; - }; - var q = d3.queue(this.connection.availableQeueuDepth()); - nodeNames.forEach(function (id) { - q.defer((self.q_fetchNodeInfo).bind(self), id, entity, attrs, q, gotNodesResult); - }); - q.await(function () { - if (aggregate) - self.aggregateNodeInfo(nodeNames, 
entity, selectedNodeId, responses, callback); - else { - callback(nodeNames, entity, responses); - } - }); - } - quiesceLink(nodeId, name) { - var attributes = { - adminStatus: 'disabled', - name: name - }; - return this.connection.sendMethod(nodeId, 'router.link', attributes, 'UPDATE'); - } - aggregateNodeInfo(nodeNames, entity, selectedNodeId, responses, callback) { - // aggregate the responses - var self = this; - var newResponse = {}; - var thisNode = responses[selectedNodeId]; - newResponse.attributeNames = thisNode.attributeNames; - newResponse.results = thisNode.results; - newResponse.aggregates = []; - // initialize the aggregates - for (var i = 0; i < thisNode.results.length; ++i) { - // there is a result for each unique entity found (ie addresses, links, etc.) - var result = thisNode.results[i]; - var vals = []; - // there is a val for each attribute in this entity - result.forEach(function (val) { - vals.push({ - sum: val, - detail: [] - }); - }); - newResponse.aggregates.push(vals); - } - var nameIndex = thisNode.attributeNames.indexOf('name'); - var ent = self.connection.schema.entityTypes[entity]; - var ids = Object.keys(responses); - ids.sort(); - ids.forEach(function (id) { - var response = responses[id]; - var results = response.results; - results.forEach(function (result) { - // find the matching result in the aggregates - var found = newResponse.aggregates.some(function (aggregate) { - if (aggregate[nameIndex].sum === result[nameIndex]) { - // result and aggregate are now the same record, add the graphable values - newResponse.attributeNames.forEach(function (key, i) { - if (ent.attributes[key] && ent.attributes[key].graph) { - if (id != selectedNodeId) - aggregate[i].sum += result[i]; - } - aggregate[i].detail.push({ - node: utils.nameFromId(id) + ':', - val: result[i] - }); - }); - return true; // stop looping - } - return false; // continute looking for the aggregate record - }); - if (!found) { - // this attribute was not found in the 
aggregates yet - // because it was not in the selectedNodeId's results - var vals = []; - result.forEach(function (val) { - vals.push({ - sum: val, - detail: [{ - node: utils.nameFromId(id), - val: val - }] - }); - }); - newResponse.aggregates.push(vals); - } - }); - }); - callback(nodeNames, entity, newResponse); - } -} - -export default Topology; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/modules/utilities.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/modules/utilities.js b/console/stand-alone/modules/utilities.js deleted file mode 100644 index 328da38..0000000 --- a/console/stand-alone/modules/utilities.js +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2015 Red Hat Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* global d3 */ -var ddd = typeof window === 'undefined' ? 
require ('d3') : d3; - -var utils = { - isAConsole: function (properties, connectionId, nodeType, key) { - return this.isConsole({ - properties: properties, - connectionId: connectionId, - nodeType: nodeType, - key: key - }); - }, - isConsole: function (d) { - return (d && d.properties && d.properties.console_identifier === 'Dispatch console'); - }, - isArtemis: function (d) { - return (d.nodeType === 'route-container' || d.nodeType === 'on-demand') && (d.properties && d.properties.product === 'apache-activemq-artemis'); - }, - - isQpid: function (d) { - return (d.nodeType === 'route-container' || d.nodeType === 'on-demand') && (d.properties && d.properties.product === 'qpid-cpp'); - }, - flatten: function (attributes, result) { - if (!attributes || !result) - return {}; - var flat = {}; - attributes.forEach(function(attr, i) { - if (result && result.length > i) - flat[attr] = result[i]; - }); - return flat; - }, - copy: function (obj) { - if (obj) - return JSON.parse(JSON.stringify(obj)); - }, - identity_clean: function (identity) { - if (!identity) - return '-'; - var pos = identity.indexOf('/'); - if (pos >= 0) - return identity.substring(pos + 1); - return identity; - }, - addr_text: function (addr) { - if (!addr) - return '-'; - if (addr[0] == 'M') - return addr.substring(2); - else - return addr.substring(1); - }, - addr_class: function (addr) { - if (!addr) return '-'; - if (addr[0] == 'M') return 'mobile'; - if (addr[0] == 'R') return 'router'; - if (addr[0] == 'A') return 'area'; - if (addr[0] == 'L') return 'local'; - if (addr[0] == 'C') return 'link-incoming'; - if (addr[0] == 'E') return 'link-incoming'; - if (addr[0] == 'D') return 'link-outgoing'; - if (addr[0] == 'F') return 'link-outgoing'; - if (addr[0] == 'T') return 'topo'; - return 'unknown: ' + addr[0]; - }, - humanify: function (s) { - if (!s || s.length === 0) - return s; - var t = s.charAt(0).toUpperCase() + s.substr(1).replace(/[A-Z]/g, ' $&'); - return t.replace('.', ' '); - }, - pretty: 
function (v) { - var formatComma = ddd.format(','); - if (!isNaN(parseFloat(v)) && isFinite(v)) - return formatComma(v); - return v; - }, - isMSIE: function () { - return (document.documentMode || /Edge/.test(navigator.userAgent)); - }, - valFor: function (aAr, vAr, key) { - var idx = aAr.indexOf(key); - if ((idx > -1) && (idx < vAr.length)) { - return vAr[idx]; - } - return null; - }, - // extract the name of the router from the router id - nameFromId: function (id) { - // the router id looks like 'amqp:/topo/0/routerName/$managemrnt' - var parts = id.split('/'); - // handle cases where the router name contains a / - parts.splice(0, 3); // remove amqp, topo, 0 - parts.pop(); // remove $management - return parts.join('/'); - } - -}; -export { utils }; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/amqp/connection.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/amqp/connection.js b/console/stand-alone/plugin/js/amqp/connection.js new file mode 100644 index 0000000..db21e01 --- /dev/null +++ b/console/stand-alone/plugin/js/amqp/connection.js @@ -0,0 +1,347 @@ +/* + * Copyright 2017 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/* global Promise */ + +const rhea = require('rhea'); +//import { on, websocket_connect, removeListener, once, connect } from 'rhea'; +import Correlator from './correlator.js'; + +class ConnectionManager { + constructor(protocol) { + this.sender = undefined; + this.receiver = undefined; + this.connection = undefined; + this.version = undefined; + this.errorText = undefined; + this.protocol = protocol; + this.schema = undefined; + this.connectActions = []; + this.disconnectActions = []; + this.correlator = new Correlator(); + this.on_message = (function (context) { + this.correlator.resolve(context); + }).bind(this); + this.on_disconnected = (function () { + this.errorText = 'Disconnected'; + this.executeDisconnectActions(this.errorText); + }).bind(this); + this.on_connection_open = (function () { + this.executeConnectActions(); + }).bind(this); + } + versionCheck(minVer) { + var verparts = this.version.split('.'); + var minparts = minVer.split('.'); + try { + for (var i = 0; i < minparts.length; ++i) { + if (parseInt(minVer[i] > parseInt(verparts[i]))) + return false; + } + } + catch (e) { + return false; + } + return true; + } + addConnectAction(action) { + if (typeof action === 'function') { + this.delConnectAction(action); + this.connectActions.push(action); + } + } + addDisconnectAction(action) { + if (typeof action === 'function') { + this.delDisconnectAction(action); + this.disconnectActions.push(action); + } + } + delConnectAction(action) { + if (typeof action === 'function') { + var index = this.connectActions.indexOf(action); + if (index >= 0) + this.connectActions.splice(index, 1); + } + } + delDisconnectAction(action) { + if (typeof action === 'function') { + var index = this.disconnectActions.indexOf(action); + if (index >= 0) + this.disconnectActions.splice(index, 1); + } + } + executeConnectActions() { + this.connectActions.forEach(function (action) { + try { + action(); + } + catch (e) { + // in case the page that registered the handler has 
been unloaded + } + }); + this.connectActions = []; + } + executeDisconnectActions(message) { + this.disconnectActions.forEach(function (action) { + try { + action(message); + } + catch (e) { + // in case the page that registered the handler has been unloaded + } + }); + this.disconnectActions = []; + } + on(eventType, fn) { + if (eventType === 'connected') { + this.addConnectAction(fn); + } + else if (eventType === 'disconnected') { + this.addDisconnectAction(fn); + } + else { + console.log('unknown event type ' + eventType); + } + } + setSchema(schema) { + this.schema = schema; + } + is_connected() { + return this.connection && + this.sender && + this.receiver && + this.receiver.remote && + this.receiver.remote.attach && + this.receiver.remote.attach.source && + this.receiver.remote.attach.source.address && + this.connection.is_open(); + } + disconnect() { + if (this.sender) + this.sender.close(); + if (this.receiver) + this.receiver.close(); + if (this.connection) + this.connection.close(); + } + createSenderReceiver(options) { + return new Promise((function (resolve, reject) { + var timeout = options.timeout || 10000; + // set a timer in case the setup takes too long + var giveUp = (function () { + this.connection.removeListener('receiver_open', receiver_open); + this.connection.removeListener('sendable', sendable); + this.errorText = 'timed out creating senders and receivers'; + reject(Error(this.errorText)); + }).bind(this); + var timer = setTimeout(giveUp, timeout); + // register an event hander for when the setup is complete + var sendable = (function (context) { + clearTimeout(timer); + this.version = this.connection.properties ? 
this.connection.properties.version : '0.1.0'; + // in case this connection dies + rhea.on('disconnected', this.on_disconnected); + // in case this connection dies and is then reconnected automatically + rhea.on('connection_open', this.on_connection_open); + // receive messages here + this.connection.on('message', this.on_message); + resolve(context); + }).bind(this); + this.connection.once('sendable', sendable); + // Now actually createt the sender and receiver. + // register an event handler for when the receiver opens + var receiver_open = (function () { + // once the receiver is open, create the sender + if (options.sender_address) + this.sender = this.connection.open_sender(options.sender_address); + else + this.sender = this.connection.open_sender(); + }).bind(this); + this.connection.once('receiver_open', receiver_open); + // create a dynamic receiver + this.receiver = this.connection.open_receiver({ source: { dynamic: true } }); + }).bind(this)); + } + connect(options) { + return new Promise((function (resolve, reject) { + var finishConnecting = function () { + this.createSenderReceiver(options) + .then(function (results) { + resolve(results); + }, function (error) { + reject(error); + }); + }; + if (!this.connection) { + options.test = false; // if you didn't want a connection, you should have called testConnect() and not connect() + this.testConnect(options) + .then((function () { + finishConnecting.call(this); + }).bind(this), (function () { + // connect failed or timed out + this.errorText = 'Unable to connect'; + this.executeDisconnectActions(this.errorText); + reject(Error(this.errorText)); + }).bind(this)); + } + else { + finishConnecting.call(this); + } + }).bind(this)); + } + getReceiverAddress() { + return this.receiver.remote.attach.source.address; + } + // Try to connect using the options. 
+ // if options.test === true -> close the connection if it succeeded and resolve the promise + // if the connection attempt fails or times out, reject the promise regardless of options.test + testConnect(options, callback) { + return new Promise((function (resolve, reject) { + var timeout = options.timeout || 10000; + var reconnect = options.reconnect || false; // in case options.reconnect is undefined + var baseAddress = options.address + ':' + options.port; + if (options.linkRouteAddress) { + baseAddress += ('/' + options.linkRouteAddress); + } + var wsprotocol = location.protocol === 'https:' ? 'wss' : 'ws'; + if (this.connection) { + delete this.connection; + this.connection = null; + } + var ws = rhea.websocket_connect(WebSocket); + var c = { + connection_details: new ws(wsprotocol + '://' + baseAddress, ['binary']), + reconnect: reconnect, + properties: options.properties || { console_identifier: 'Dispatch console' } + }; + if (options.hostname) + c.hostname = options.hostname; + if (options.username && options.username !== '') { + c.username = options.username; + } + if (options.password && options.password !== '') { + c.password = options.password; + } + // set a timeout + var disconnected = (function () { + clearTimeout(timer); + rhea.removeListener('disconnected', disconnected); + rhea.removeListener('connection_open', connection_open); + this.connection = null; + var rej = 'failed to connect'; + if (callback) + callback({ error: rej }); + reject(Error(rej)); + }).bind(this); + var timer = setTimeout(disconnected, timeout); + // the event handler for when the connection opens + var connection_open = (function (context) { + clearTimeout(timer); + // prevent future disconnects from calling reject + rhea.removeListener('disconnected', disconnected); + // we were just checking. 
we don't really want a connection + if (options.test) { + context.connection.close(); + this.connection = null; + } + else + this.on_connection_open(); + var res = { context: context }; + if (callback) + callback(res); + resolve(res); + }).bind(this); + // register an event handler for when the connection opens + rhea.once('connection_open', connection_open); + // register an event handler for if the connection fails to open + rhea.once('disconnected', disconnected); + // attempt the connection + this.connection = rhea.connect(c); + }).bind(this)); + } + sendMgmtQuery(operation) { + return this.send([], '/$management', operation); + } + sendQuery(toAddr, entity, attrs, operation) { + operation = operation || 'QUERY'; + var fullAddr = this._fullAddr(toAddr); + var body = { attributeNames: attrs || [] }; + return this.send(body, fullAddr, operation, this.schema.entityTypes[entity].fullyQualifiedType); + } + send(body, to, operation, entityType) { + var application_properties = { + operation: operation, + type: 'org.amqp.management', + name: 'self' + }; + if (entityType) + application_properties.entityType = entityType; + return this._send(body, to, application_properties); + } + sendMethod(toAddr, entity, attrs, operation, props) { + var fullAddr = this._fullAddr(toAddr); + var application_properties = { + operation: operation, + }; + if (entity) { + application_properties.type = this.schema.entityTypes[entity].fullyQualifiedType; + } + if (attrs.name) + application_properties.name = attrs.name; + else if (attrs.identity) + application_properties.identity = attrs.identity; + if (props) { + for (var attrname in props) { + application_properties[attrname] = props[attrname]; + } + } + return this._send(attrs, fullAddr, application_properties); + } + _send(body, to, application_properties) { + var _correlationId = this.correlator.corr(); + var self = this; + return new Promise(function (resolve, reject) { + self.correlator.register(_correlationId, resolve, reject); + 
self.sender.send({ + body: body, + to: to, + reply_to: self.receiver.remote.attach.source.address, + correlation_id: _correlationId, + application_properties: application_properties + }); + }); + } + _fullAddr(toAddr) { + var toAddrParts = toAddr.split('/'); + toAddrParts.shift(); + var fullAddr = toAddrParts.join('/'); + return fullAddr; + } + availableQeueuDepth() { + return this.correlator.depth(); + } +} + +class ConnectionException { + constructor(message) { + this.message = message; + this.name = 'ConnectionException'; + } +} + +const _ConnectionManager = ConnectionManager; +export { _ConnectionManager as ConnectionManager }; +const _ConnectionException = ConnectionException; +export { _ConnectionException as ConnectionException }; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/amqp/correlator.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/amqp/correlator.js b/console/stand-alone/plugin/js/amqp/correlator.js new file mode 100644 index 0000000..bf34f93 --- /dev/null +++ b/console/stand-alone/plugin/js/amqp/correlator.js @@ -0,0 +1,50 @@ +/* + * Copyright 2017 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import { utils } from './utilities.js'; + +class Correlator { + constructor() { + this._objects = {}; + this._correlationID = 0; + this.maxCorrelatorDepth = 10; + } + corr() { + return ++(this._correlationID) + ''; + } + // Associate this correlation id with the promise's resolve and reject methods + register(id, resolve, reject) { + this._objects[id] = { resolver: resolve, rejector: reject }; + } + // Call the promise's resolve method. + // This is called by rhea's receiver.on('message') function + resolve(context) { + var correlationID = context.message.correlation_id; + // call the promise's resolve function with a copy of the rhea response (so we don't keep any references to internal rhea data) + this._objects[correlationID].resolver({ response: utils.copy(context.message.body), context: context }); + delete this._objects[correlationID]; + } + reject(id, error) { + this._objects[id].rejector(error); + delete this._objects[id]; + } + // Return the number of requests that can be sent before we start queuing requests + depth() { + return Math.max(1, this.maxCorrelatorDepth - Object.keys(this._objects).length); + } +} + +export default Correlator; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/amqp/management.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/amqp/management.js b/console/stand-alone/plugin/js/amqp/management.js new file mode 100644 index 0000000..4b3bb32 --- /dev/null +++ b/console/stand-alone/plugin/js/amqp/management.js @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* global Promise */ + +import { ConnectionManager } from './connection.js'; +import Topology from './topology.js'; + +export class Management { + constructor(protocol) { + this.connection = new ConnectionManager(protocol); + this.topology = new Topology(this.connection); + } + getSchema(callback) { + var self = this; + return new Promise(function (resolve, reject) { + self.connection.sendMgmtQuery('GET-SCHEMA') + .then(function (responseAndContext) { + var response = responseAndContext.response; + for (var entityName in response.entityTypes) { + var entity = response.entityTypes[entityName]; + if (entity.deprecated) { + // deprecated entity + delete response.entityTypes[entityName]; + } + else { + for (var attributeName in entity.attributes) { + var attribute = entity.attributes[attributeName]; + if (attribute.deprecated) { + // deprecated attribute + delete response.entityTypes[entityName].attributes[attributeName]; + } + } + } + } + self.connection.setSchema(response); + if (callback) + callback(response); + resolve(response); + }, function (error) { + if (callback) + callback(error); + reject(error); + }); + }); + } + schema() { + return this.connection.schema; + } +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/amqp/topology.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/amqp/topology.js b/console/stand-alone/plugin/js/amqp/topology.js new file mode 100644 index 0000000..e208a6f --- /dev/null +++ 
b/console/stand-alone/plugin/js/amqp/topology.js @@ -0,0 +1,403 @@ +/* + * Copyright 2015 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* global Promise d3 */ + +import { utils } from './utilities.js'; + +class Topology { + constructor(connectionManager) { + this.connection = connectionManager; + this.updatedActions = {}; + this.entities = []; // which entities to request each topology update + this.entityAttribs = {}; + this._nodeInfo = {}; // info about all known nodes and entities + this.filtering = false; // filter out nodes that don't have connection info + this.timeout = 5000; + this.updateInterval = 5000; + this._getTimer = null; + this.updating = false; + } + addUpdatedAction(key, action) { + if (typeof action === 'function') { + this.updatedActions[key] = action; + } + } + delUpdatedAction(key) { + if (key in this.updatedActions) + delete this.updatedActions[key]; + } + executeUpdatedActions(error) { + for (var action in this.updatedActions) { + this.updatedActions[action].apply(this, [error]); + } + } + setUpdateEntities(entities) { + this.entities = entities; + for (var i = 0; i < entities.length; i++) { + this.entityAttribs[entities[i]] = []; + } + } + addUpdateEntities(entityAttribs) { + if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { + entityAttribs = [entityAttribs]; + } + for (var i = 0; i < entityAttribs.length; i++) { + var entity = entityAttribs[i].entity; + this.entityAttribs[entity] = 
entityAttribs[i].attrs || []; + } + } + on(eventName, fn, key) { + if (eventName === 'updated') + this.addUpdatedAction(key, fn); + } + unregister(eventName, key) { + if (eventName === 'updated') + this.delUpdatedAction(key); + } + nodeInfo() { + return this._nodeInfo; + } + get() { + return new Promise((function (resolve, reject) { + this.connection.sendMgmtQuery('GET-MGMT-NODES') + .then((function (response) { + response = response.response; + if (Object.prototype.toString.call(response) === '[object Array]') { + var workInfo = {}; + // if there is only one node, it will not be returned + if (response.length === 0) { + var parts = this.connection.getReceiverAddress().split('/'); + parts[parts.length - 1] = '$management'; + response.push(parts.join('/')); + } + for (var i = 0; i < response.length; ++i) { + workInfo[response[i]] = {}; + } + var gotResponse = function (nodeName, entity, response) { + workInfo[nodeName][entity] = response; + }; + var q = d3.queue(this.connection.availableQeueuDepth()); + for (var id in workInfo) { + for (var entity in this.entityAttribs) { + q.defer((this.q_fetchNodeInfo).bind(this), id, entity, this.entityAttribs[entity], q, gotResponse); + } + } + q.await((function () { + // filter out nodes that have no connection info + if (this.filtering) { + for (var id in workInfo) { + if (!(workInfo[id].connection)) { + this.flux = true; + delete workInfo[id]; + } + } + } + this._nodeInfo = utils.copy(workInfo); + this.onDone(this._nodeInfo); + resolve(this._nodeInfo); + }).bind(this)); + } + }).bind(this), function (error) { + reject(error); + }); + }).bind(this)); + } + onDone(result) { + clearTimeout(this._getTimer); + if (this.updating) + this._getTimer = setTimeout((this.get).bind(this), this.updateInterval); + this.executeUpdatedActions(result); + } + startUpdating(filter) { + this.stopUpdating(); + this.updating = true; + this.filtering = filter; + this.get(); + } + stopUpdating() { + this.updating = false; + if (this._getTimer) { + 
clearTimeout(this._getTimer); + this._getTimer = null; + } + } + fetchEntity(node, entity, attrs, callback) { + var results = {}; + var gotResponse = function (nodeName, dotentity, response) { + results = response; + }; + var q = d3.queue(this.connection.availableQeueuDepth()); + q.defer((this.q_fetchNodeInfo).bind(this), node, entity, attrs, q, gotResponse); + q.await(function () { + callback(node, entity, results); + }); + } + // called from d3.queue.defer so the last argument (callback) is supplied by d3 + q_fetchNodeInfo(nodeId, entity, attrs, q, heartbeat, callback) { + this.getNodeInfo(nodeId, entity, attrs, q, function (nodeName, dotentity, response) { + heartbeat(nodeName, dotentity, response); + callback(null); + }); + } + // get all the requested entities/attributes for a single router + fetchEntities(node, entityAttribs, doneCallback, resultCallback) { + var q = d3.queue(this.connection.availableQeueuDepth()); + var results = {}; + if (!resultCallback) { + resultCallback = function (nodeName, dotentity, response) { + if (!results[nodeName]) + results[nodeName] = {}; + results[nodeName][dotentity] = response; + }; + } + var gotAResponse = function (nodeName, dotentity, response) { + resultCallback(nodeName, dotentity, response); + }; + if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { + entityAttribs = [entityAttribs]; + } + for (var i = 0; i < entityAttribs.length; ++i) { + var ea = entityAttribs[i]; + q.defer((this.q_fetchNodeInfo).bind(this), node, ea.entity, ea.attrs || [], q, gotAResponse); + } + q.await(function () { + doneCallback(results); + }); + } + // get all the requested entities for all known routers + fetchAllEntities(entityAttribs, doneCallback, resultCallback) { + var q = d3.queue(this.connection.availableQeueuDepth()); + var results = {}; + if (!resultCallback) { + resultCallback = function (nodeName, dotentity, response) { + if (!results[nodeName]) + results[nodeName] = {}; + results[nodeName][dotentity] = 
response; + }; + } + var gotAResponse = function (nodeName, dotentity, response) { + resultCallback(nodeName, dotentity, response); + }; + if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { + entityAttribs = [entityAttribs]; + } + var nodes = Object.keys(this._nodeInfo); + for (var n = 0; n < nodes.length; ++n) { + for (var i = 0; i < entityAttribs.length; ++i) { + var ea = entityAttribs[i]; + q.defer((this.q_fetchNodeInfo).bind(this), nodes[n], ea.entity, ea.attrs || [], q, gotAResponse); + } + } + q.await(function () { + doneCallback(results); + }); + } + // enusre all the topology nones have all these entities + ensureAllEntities(entityAttribs, callback, extra) { + this.ensureEntities(Object.keys(this._nodeInfo), entityAttribs, callback, extra); + } + // ensure these nodes have all these entities. don't fetch unless forced to + ensureEntities(nodes, entityAttribs, callback, extra) { + if (Object.prototype.toString.call(entityAttribs) !== '[object Array]') { + entityAttribs = [entityAttribs]; + } + if (Object.prototype.toString.call(nodes) !== '[object Array]') { + nodes = [nodes]; + } + this.addUpdateEntities(entityAttribs); + var q = d3.queue(this.connection.availableQeueuDepth()); + for (var n = 0; n < nodes.length; ++n) { + for (var i = 0; i < entityAttribs.length; ++i) { + var ea = entityAttribs[i]; + // if we don'e already have the entity or we want to force a refresh + if (!this._nodeInfo[nodes[n]][ea.entity] || ea.force) + q.defer((this.q_ensureNodeInfo).bind(this), nodes[n], ea.entity, ea.attrs || [], q); + } + } + q.await(function () { + callback(extra); + }); + } + addNodeInfo(id, entity, values) { + // save the results in the nodeInfo object + if (id) { + if (!(id in this._nodeInfo)) { + this._nodeInfo[id] = {}; + } + // copy the values to allow garbage collection + this._nodeInfo[id][entity] = values; + } + } + isLargeNetwork() { + return Object.keys(this._nodeInfo).length >= 12; + } + getConnForLink(link) { + // find the 
connection for this link + var conns = this._nodeInfo[link.nodeId].connection; + var connIndex = conns.attributeNames.indexOf('identity'); + var linkCons = conns.results.filter(function (conn) { + return conn[connIndex] === link.connectionId; + }); + return utils.flatten(conns.attributeNames, linkCons[0]); + } + nodeNameList() { + var nl = []; + for (var id in this._nodeInfo) { + nl.push(utils.nameFromId(id)); + } + return nl.sort(); + } + nodeIdList() { + var nl = []; + for (var id in this._nodeInfo) { + //if (this._nodeInfo['connection']) + nl.push(id); + } + return nl.sort(); + } + nodeList() { + var nl = []; + for (var id in this._nodeInfo) { + nl.push({ + name: utils.nameFromId(id), + id: id + }); + } + return nl; + } + // d3.queue'd function to make a management query for entities/attributes + q_ensureNodeInfo(nodeId, entity, attrs, q, callback) { + this.getNodeInfo(nodeId, entity, attrs, q, (function (nodeName, dotentity, response) { + this.addNodeInfo(nodeName, dotentity, response); + callback(null); + }).bind(this)); + return { + abort: function () { + delete this._nodeInfo[nodeId]; + } + }; + } + getNodeInfo(nodeName, entity, attrs, q, callback) { + var timedOut = function (q) { + q.abort(); + }; + var atimer = setTimeout(timedOut, this.timeout, q); + this.connection.sendQuery(nodeName, entity, attrs) + .then(function (response) { + clearTimeout(atimer); + callback(nodeName, entity, response.response); + }, function () { + q.abort(); + }); + } + getMultipleNodeInfo(nodeNames, entity, attrs, callback, selectedNodeId, aggregate) { + var self = this; + if (typeof aggregate === 'undefined') + aggregate = true; + var responses = {}; + var gotNodesResult = function (nodeName, dotentity, response) { + responses[nodeName] = response; + }; + var q = d3.queue(this.connection.availableQeueuDepth()); + nodeNames.forEach(function (id) { + q.defer((self.q_fetchNodeInfo).bind(self), id, entity, attrs, q, gotNodesResult); + }); + q.await(function () { + if (aggregate) + 
self.aggregateNodeInfo(nodeNames, entity, selectedNodeId, responses, callback); + else { + callback(nodeNames, entity, responses); + } + }); + } + quiesceLink(nodeId, name) { + var attributes = { + adminStatus: 'disabled', + name: name + }; + return this.connection.sendMethod(nodeId, 'router.link', attributes, 'UPDATE'); + } + aggregateNodeInfo(nodeNames, entity, selectedNodeId, responses, callback) { + // aggregate the responses + var self = this; + var newResponse = {}; + var thisNode = responses[selectedNodeId]; + newResponse.attributeNames = thisNode.attributeNames; + newResponse.results = thisNode.results; + newResponse.aggregates = []; + // initialize the aggregates + for (var i = 0; i < thisNode.results.length; ++i) { + // there is a result for each unique entity found (ie addresses, links, etc.) + var result = thisNode.results[i]; + var vals = []; + // there is a val for each attribute in this entity + result.forEach(function (val) { + vals.push({ + sum: val, + detail: [] + }); + }); + newResponse.aggregates.push(vals); + } + var nameIndex = thisNode.attributeNames.indexOf('name'); + var ent = self.connection.schema.entityTypes[entity]; + var ids = Object.keys(responses); + ids.sort(); + ids.forEach(function (id) { + var response = responses[id]; + var results = response.results; + results.forEach(function (result) { + // find the matching result in the aggregates + var found = newResponse.aggregates.some(function (aggregate) { + if (aggregate[nameIndex].sum === result[nameIndex]) { + // result and aggregate are now the same record, add the graphable values + newResponse.attributeNames.forEach(function (key, i) { + if (ent.attributes[key] && ent.attributes[key].graph) { + if (id != selectedNodeId) + aggregate[i].sum += result[i]; + } + aggregate[i].detail.push({ + node: utils.nameFromId(id) + ':', + val: result[i] + }); + }); + return true; // stop looping + } + return false; // continute looking for the aggregate record + }); + if (!found) { + // this 
attribute was not found in the aggregates yet + // because it was not in the selectedNodeId's results + var vals = []; + result.forEach(function (val) { + vals.push({ + sum: val, + detail: [{ + node: utils.nameFromId(id), + val: val + }] + }); + }); + newResponse.aggregates.push(vals); + } + }); + }); + callback(nodeNames, entity, newResponse); + } +} + +export default Topology; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/amqp/utilities.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/amqp/utilities.js b/console/stand-alone/plugin/js/amqp/utilities.js new file mode 100644 index 0000000..328da38 --- /dev/null +++ b/console/stand-alone/plugin/js/amqp/utilities.js @@ -0,0 +1,115 @@ +/* + * Copyright 2015 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* global d3 */ +var ddd = typeof window === 'undefined' ? 
require ('d3') : d3; + +var utils = { + isAConsole: function (properties, connectionId, nodeType, key) { + return this.isConsole({ + properties: properties, + connectionId: connectionId, + nodeType: nodeType, + key: key + }); + }, + isConsole: function (d) { + return (d && d.properties && d.properties.console_identifier === 'Dispatch console'); + }, + isArtemis: function (d) { + return (d.nodeType === 'route-container' || d.nodeType === 'on-demand') && (d.properties && d.properties.product === 'apache-activemq-artemis'); + }, + + isQpid: function (d) { + return (d.nodeType === 'route-container' || d.nodeType === 'on-demand') && (d.properties && d.properties.product === 'qpid-cpp'); + }, + flatten: function (attributes, result) { + if (!attributes || !result) + return {}; + var flat = {}; + attributes.forEach(function(attr, i) { + if (result && result.length > i) + flat[attr] = result[i]; + }); + return flat; + }, + copy: function (obj) { + if (obj) + return JSON.parse(JSON.stringify(obj)); + }, + identity_clean: function (identity) { + if (!identity) + return '-'; + var pos = identity.indexOf('/'); + if (pos >= 0) + return identity.substring(pos + 1); + return identity; + }, + addr_text: function (addr) { + if (!addr) + return '-'; + if (addr[0] == 'M') + return addr.substring(2); + else + return addr.substring(1); + }, + addr_class: function (addr) { + if (!addr) return '-'; + if (addr[0] == 'M') return 'mobile'; + if (addr[0] == 'R') return 'router'; + if (addr[0] == 'A') return 'area'; + if (addr[0] == 'L') return 'local'; + if (addr[0] == 'C') return 'link-incoming'; + if (addr[0] == 'E') return 'link-incoming'; + if (addr[0] == 'D') return 'link-outgoing'; + if (addr[0] == 'F') return 'link-outgoing'; + if (addr[0] == 'T') return 'topo'; + return 'unknown: ' + addr[0]; + }, + humanify: function (s) { + if (!s || s.length === 0) + return s; + var t = s.charAt(0).toUpperCase() + s.substr(1).replace(/[A-Z]/g, ' $&'); + return t.replace('.', ' '); + }, + pretty: 
function (v) { + var formatComma = ddd.format(','); + if (!isNaN(parseFloat(v)) && isFinite(v)) + return formatComma(v); + return v; + }, + isMSIE: function () { + return (document.documentMode || /Edge/.test(navigator.userAgent)); + }, + valFor: function (aAr, vAr, key) { + var idx = aAr.indexOf(key); + if ((idx > -1) && (idx < vAr.length)) { + return vAr[idx]; + } + return null; + }, + // extract the name of the router from the router id + nameFromId: function (id) { + // the router id looks like 'amqp:/topo/0/routerName/$management' + var parts = id.split('/'); + // handle cases where the router name contains a / + parts.splice(0, 3); // remove amqp, topo, 0 + parts.pop(); // remove $management + return parts.join('/'); + } + +}; +export { utils }; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/console/stand-alone/plugin/js/qdrService.js ---------------------------------------------------------------------- diff --git a/console/stand-alone/plugin/js/qdrService.js b/console/stand-alone/plugin/js/qdrService.js index 8f13192..555b966 100644 --- a/console/stand-alone/plugin/js/qdrService.js +++ b/console/stand-alone/plugin/js/qdrService.js @@ -17,8 +17,8 @@ Licensed to the Apache Software Foundation (ASF) under one under the License. 
*/ /* global Promise */ -import { Management as dm } from '../../modules/management.js'; -import { utils } from '../../modules/utilities.js'; +import { Management as dm } from './amqp/management.js'; +import { utils } from './amqp/utilities.js'; import { QDR_LAST_LOCATION, QDRLogger} from './qdrGlobals.js'; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f543ba2b/tests/system_tests_console.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_console.py b/tests/system_tests_console.py index eb0fb1c..b0cce10 100644 --- a/tests/system_tests_console.py +++ b/tests/system_tests_console.py @@ -29,10 +29,8 @@ import unittest from subprocess import PIPE import subprocess import shutil -from proton import Url, SSLDomain, SSLUnavailable, SASL from system_test import main_module - class ConsoleTest(system_test.TestCase): """Run console tests""" @@ -48,35 +46,35 @@ class ConsoleTest(system_test.TestCase): def run_console_test(self): # expecting <base-path>/build/system_test.dir/system_tests_console/ConsoleTest/test_console - # /foo/qpid-dispatch/build/system_test.dir/system_tests_console/ConsoleTest/test_console/ - # run_console_test.out cwd = os.getcwd() - def get_base(remove): - l_base = cwd.split('/')[:-remove] # path that ends with qpid-dispatch's home dir + def get_dirs(remove): + # l_base is the path that ends with qpid-dispatch's home dir + l_base = cwd.split('/')[:-remove] l_test_cmd = '/'.join(l_base + ['build', 'console', 'node_modules', '.bin', 'mocha']) l_test_dir = '/'.join(l_base + ['console', 'stand-alone', 'test']) l_src_dir = '/'.join(l_base + ['console', 'stand-alone']) - return l_base, l_test_cmd, l_test_dir, l_src_dir + l_node_dir = '/'.join(l_base + ['console', 'stand-alone', 'node_modules']) + return l_test_cmd, l_test_dir, l_src_dir, l_node_dir - (base, test_cmd, test_dir, src_dir) = get_base(6) + (test_cmd, test_dir, src_dir, node_dir) = get_dirs(6) found_src = os.path.isdir(src_dir) # 
running the test from the command line results in a different path if not found_src: - (base, test_cmd, test_dir, src_dir) = get_base(5) + (test_cmd, test_dir, src_dir, node_dir) = get_dirs(5) found_src = os.path.isdir(src_dir) pret = 0 + # If we are unable to find the console's source directory, skip the test out = 'Skipped' - if found_src: # if we are unable to find the console's source directory. Skip the test - # The console test needs a node_modules dir in the source directory - # If the node_modules dir is not present in the source dir, create it. - # An alternative is to copy all the source files to the build/console dir. - node_dir = '/'.join(base + ['console', 'stand-alone', 'node_modules']) + if found_src: + ''' The console test needs a node_modules dir in the source directory. + If the node_modules dir is not present in the source dir, create it. + An alternative is to copy all the source files to the build/console dir. ''' node_modules = os.path.isdir(node_dir) if not node_modules: p0 = subprocess.Popen(['npm', 'install', '--loglevel=error'], stdout=PIPE, cwd=src_dir) - p0.wait(); + p0.wait() prg = [test_cmd,'--require', 'babel-core/register', test_dir, '--http_port=%s' % self.http_port, '--src=%s/' % src_dir] p = self.popen(prg, stdout=PIPE, expect=None) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
