Repository: ambari Updated Branches: refs/heads/branch-3.0-perf d986503ca -> 11f16c83a
AMBARI-21283 Integrate cluster-env configs update with websocket events. (atkach) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11f16c83 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11f16c83 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11f16c83 Branch: refs/heads/branch-3.0-perf Commit: 11f16c83ac8f2ebbf4609240a895a4a3d078b946 Parents: d986503 Author: Andrii Tkach <[email protected]> Authored: Tue Jun 20 13:08:27 2017 +0300 Committer: Andrii Tkach <[email protected]> Committed: Tue Jun 20 13:08:27 2017 +0300 ---------------------------------------------------------------------- .../app/controllers/global/update_controller.js | 10 ++- .../main/dashboard/config_history_controller.js | 4 +- ambari-web/app/utils/stomp_client.js | 71 ++++++++++++++++--- .../global/update_controller_test.js | 18 ++++- .../dashboard/config_history_controller_test.js | 12 ++-- ambari-web/test/controllers/main_test.js | 6 ++ ambari-web/test/utils/stomp_client_test.js | 73 +++++++++++++++++--- 7 files changed, 163 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/app/controllers/global/update_controller.js ---------------------------------------------------------------------- diff --git a/ambari-web/app/controllers/global/update_controller.js b/ambari-web/app/controllers/global/update_controller.js index d9c882f..5063db3 100644 --- a/ambari-web/app/controllers/global/update_controller.js +++ b/ambari-web/app/controllers/global/update_controller.js @@ -188,10 +188,10 @@ App.UpdateController = Em.Controller.extend({ updateAll: function () { var socket = App.socketEventsMapper; if (this.get('isWorking') && !App.get('isOnlyViewUser')) { - //TODO limit updates by location App.StompClient.subscribe('/events/hostcomponents', socket.applyHostComponentStatusEvents.bind(socket)); App.StompClient.subscribe('/events/alerts', socket.applyAlertDefinitionSummaryEvents.bind(socket)); App.StompClient.subscribe('/events/topologies', App.topologyMapper.map.bind(App.topologyMapper)); + App.StompClient.subscribe('/events/configs', this.makeCallForClusterEnv.bind(this)); App.updater.run(this, 'updateServices', 'isWorking'); App.updater.run(this, 'updateHost', 'isWorking'); @@ -205,13 +205,13 @@ App.UpdateController = Em.Controller.extend({ if (!App.get('router.mainAlertInstancesController.isUpdating')) { App.updater.run(this, 'updateUnhealthyAlertInstances', 'updateAlertInstances', App.alertInstancesUpdateInterval, '\/main\/alerts.*'); } - App.updater.run(this, 'updateClusterEnv', 'isWorking', App.clusterEnvUpdateInterval); App.updater.run(this, 'updateUpgradeState', 'isWorking', App.bgOperationsUpdateInterval); App.updater.run(this, 'updateWizardWatcher', 'isWorking', App.bgOperationsUpdateInterval); } else { App.StompClient.unsubscribe('/events/hostcomponents'); App.StompClient.unsubscribe('/events/alerts'); App.StompClient.unsubscribe('/events/topologies'); + App.StompClient.unsubscribe('/events/configs'); } }.observes('isWorking', 'App.router.mainAlertInstancesController.isUpdating'), @@ -621,6 +621,12 @@ App.UpdateController = Em.Controller.extend({ } }, + makeCallForClusterEnv: function(event) { + if (event.configs.someProperty('type', 'cluster-env')) { + this.updateClusterEnv(); + } + }, + //TODO - update service auto-start to use this updateClusterEnv: function (callback) { this.loadClusterConfig(callback).done(function (data) { http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/app/controllers/main/dashboard/config_history_controller.js ---------------------------------------------------------------------- diff --git a/ambari-web/app/controllers/main/dashboard/config_history_controller.js b/ambari-web/app/controllers/main/dashboard/config_history_controller.js index 197a88b..70e608c 100644 --- a/ambari-web/app/controllers/main/dashboard/config_history_controller.js +++ b/ambari-web/app/controllers/main/dashboard/config_history_controller.js @@ -233,11 +233,11 @@ App.MainConfigHistoryController = Em.ArrayController.extend(App.TableServerMixin }, subscribeToUpdates: function() { - App.StompClient.subscribe('/events/configs', this.load.bind(this)); + App.StompClient.addHandler('/events/configs', 'history', this.load.bind(this)); }, unsubscribeOfUpdates: function() { - App.StompClient.unsubscribe('/events/configs'); + App.StompClient.removeHandler('/events/configs', 'history'); }, /** http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/app/utils/stomp_client.js ---------------------------------------------------------------------- diff --git a/ambari-web/app/utils/stomp_client.js b/ambari-web/app/utils/stomp_client.js index 1e63164..11a62b7 100644 --- a/ambari-web/app/utils/stomp_client.js +++ b/ambari-web/app/utils/stomp_client.js @@ -25,12 +25,13 @@ module.exports = Em.Object.extend({ client: null, /** - * TODO set actual url - * @type string + * @type {string} */ webSocketUrl: 'ws://{hostname}:8080/api/stomp/v1', - //TODO set actual url + /** + * @type {string} + */ sockJsUrl: 'http://{hostname}:8080/api/stomp/v1', /** @@ -43,6 +44,10 @@ module.exports = Em.Object.extend({ */ isWebSocketSupported: true, + /** + * @type {number} + * @const + */ RECONNECT_TIMEOUT: 6000, /** @@ -56,6 +61,11 @@ module.exports = Em.Object.extend({ */ headers: {}, + /** + * + * @param {boolean} useSockJS + * @returns {$.Deferred} + */ connect: function(useSockJS) { const dfd = $.Deferred(); const socket = this.getSocket(useSockJS); @@ -98,8 +108,7 @@ module.exports = Em.Object.extend({ this.reconnect(); } else { //if webSocket failed on initial connect then switch to SockJS - //TODO enable when SockJS API provided - //this.connect(true); + this.connect(true); } }, @@ -110,7 +119,10 @@ module.exports = Em.Object.extend({ this.connect().done(() => { for (var i in subscriptions) { subscriptions[i].unsubscribe(); - this.subscribe(subscriptions[i].destination, subscriptions[i].callback); + this.subscribe(subscriptions[i].destination, subscriptions[i].handlers['default']); + for (var key in subscriptions[i].handlers) { + key !== 'default' && this.addHandler(subscriptions[i].destination, key, subscriptions[i].handlers[key]); + } } }); }, this.RECONNECT_TIMEOUT); @@ -137,23 +149,60 @@ module.exports = Em.Object.extend({ /** * * @param destination - * @param callback + * @param {function} handler * @returns {*} */ - subscribe: function(destination, callback = Em.K) { - if (!this.get('client.connected')) { + subscribe: function(destination, handler = Em.K) { + const handlers = { + default: handler + }; + if (!this.get('client.connected') || this.get('subscriptions')[destination]) { return null; } const subscription = this.get('client').subscribe(destination, (message) => { - callback(JSON.parse(message.body)); + for (var i in handlers) { + handlers[i](JSON.parse(message.body)); + } }); subscription.destination = destination; - subscription.callback = callback; + subscription.handlers = handlers; this.get('subscriptions')[destination] = subscription; return subscription; }, /** + * If trying to add handler to not existing subscription then it will be created and handler added as default + * @param {string} destination + * @param {string} key + * @param {function} handler + */ + addHandler: function(destination, key, handler) { + const subscription = this.get('subscriptions')[destination]; + if (!subscription) { + this.subscribe(destination); + return this.addHandler(destination, key, handler); + } + if (subscription.handlers[key]) { + console.error('You can\'t override subscription handler'); + return; + } + subscription.handlers[key] = handler; + }, + + /** + * If removed handler is last and subscription have zero handlers then topic will be unsubscribed + * @param {string} destination + * @param {string} key + */ + removeHandler: function(destination, key) { + const subscription = this.get('subscriptions')[destination]; + delete subscription.handlers[key]; + if (Em.keys(subscription.handlers).length === 0) { + this.unsubscribe(destination); + } + }, + + /** * * @param {string} destination * @returns {boolean} http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/test/controllers/global/update_controller_test.js ---------------------------------------------------------------------- diff --git a/ambari-web/test/controllers/global/update_controller_test.js b/ambari-web/test/controllers/global/update_controller_test.js index 0725944..efacf3a 100644 --- a/ambari-web/test/controllers/global/update_controller_test.js +++ b/ambari-web/test/controllers/global/update_controller_test.js @@ -72,14 +72,16 @@ describe('App.UpdateController', function () { expect(App.StompClient.unsubscribe.calledWith('/events/hostcomponents')).to.be.true; expect(App.StompClient.unsubscribe.calledWith('/events/alerts')).to.be.true; expect(App.StompClient.unsubscribe.calledWith('/events/topologies')).to.be.true; + expect(App.StompClient.unsubscribe.calledWith('/events/configs')).to.be.true; }); it('isWorking = true', function () { controller.set('isWorking', true); - expect(App.updater.run.callCount).to.equal(12); + expect(App.updater.run.callCount).to.equal(11); expect(App.StompClient.subscribe.calledWith('/events/hostcomponents')).to.be.true; expect(App.StompClient.subscribe.calledWith('/events/alerts')).to.be.true; expect(App.StompClient.subscribe.calledWith('/events/topologies')).to.be.true; + expect(App.StompClient.subscribe.calledWith('/events/configs')).to.be.true; }); }); @@ -676,4 +678,18 @@ describe('App.UpdateController', function () { expect(args).to.exists; }); }); + + describe('#makeCallForClusterEnv', function() { + beforeEach(function() { + sinon.stub(c, 'updateClusterEnv'); + }); + afterEach(function() { + c.updateClusterEnv.restore(); + }); + + it('updateClusterEnv should be called', function() { + c.makeCallForClusterEnv({configs: [{type: 'cluster-env'}]}); + expect(c.updateClusterEnv.calledOnce).to.be.true; + }); + }); }); http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/test/controllers/main/dashboard/config_history_controller_test.js ---------------------------------------------------------------------- diff --git a/ambari-web/test/controllers/main/dashboard/config_history_controller_test.js b/ambari-web/test/controllers/main/dashboard/config_history_controller_test.js index 771fd16..b2b0442 100644 --- a/ambari-web/test/controllers/main/dashboard/config_history_controller_test.js +++ b/ambari-web/test/controllers/main/dashboard/config_history_controller_test.js @@ -133,29 +133,29 @@ describe('MainConfigHistoryController', function () { describe('#subscribeToUpdates', function() { beforeEach(function() { - sinon.stub(App.StompClient, 'subscribe'); + sinon.stub(App.StompClient, 'addHandler'); }); afterEach(function() { - App.StompClient.subscribe.restore(); + App.StompClient.addHandler.restore(); }); it('App.StompClient.subscribe should be called', function() { controller.subscribeToUpdates(); - expect(App.StompClient.subscribe.calledWith('/events/configs')).to.be.true; + expect(App.StompClient.addHandler.calledWith('/events/configs', 'history')).to.be.true; }); }); describe('#unsubscribeOfUpdates', function() { beforeEach(function() { - sinon.stub(App.StompClient, 'unsubscribe'); + sinon.stub(App.StompClient, 'removeHandler'); }); afterEach(function() { - App.StompClient.unsubscribe.restore(); + App.StompClient.removeHandler.restore(); }); it('App.StompClient.subscribe should be called', function() { controller.unsubscribeOfUpdates(); - expect(App.StompClient.unsubscribe.calledWith('/events/configs')).to.be.true; + expect(App.StompClient.removeHandler.calledWith('/events/configs', 'history')).to.be.true; }); }); }); http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/test/controllers/main_test.js ---------------------------------------------------------------------- diff --git a/ambari-web/test/controllers/main_test.js b/ambari-web/test/controllers/main_test.js index 573b1e9..4ab423e 100644 --- a/ambari-web/test/controllers/main_test.js +++ b/ambari-web/test/controllers/main_test.js @@ -33,14 +33,20 @@ describe('App.MainController', function () { initialize = true; } }); + sinon.stub(App.StompClient, 'connect'); }); afterEach(function () { App.router.get.restore(); + App.StompClient.connect.restore(); }); it ('Should return true', function() { mainController.initialize(); expect(initialize).to.be.true; }); + it ('App.StompClient.connect should be called', function() { + mainController.initialize(); + expect(App.StompClient.connect.calledOnce).to.be.true; + }); }); describe('#dataLoading', function() { http://git-wip-us.apache.org/repos/asf/ambari/blob/11f16c83/ambari-web/test/utils/stomp_client_test.js ---------------------------------------------------------------------- diff --git a/ambari-web/test/utils/stomp_client_test.js b/ambari-web/test/utils/stomp_client_test.js index a85e6cc..5ee7a5e 100644 --- a/ambari-web/test/utils/stomp_client_test.js +++ b/ambari-web/test/utils/stomp_client_test.js @@ -20,10 +20,14 @@ var App = require('app'); var stompClientClass = require('utils/stomp_client'); describe('App.StompClient', function () { - var stomp; + var stomp, mockStomp; beforeEach(function() { stomp = stompClientClass.create(); + mockStomp = sinon.stub(Stomp, 'over').returns({connect: Em.K}); + }); + afterEach(function() { + Stomp.over.restore(); }); describe('#connect', function() { @@ -31,31 +35,28 @@ describe('App.StompClient', function () { sinon.stub(stomp, 'onConnectionSuccess'); sinon.stub(stomp, 'onConnectionError'); sinon.stub(stomp, 'getSocket'); - this.mockStomp = sinon.stub(Stomp, 'over'); }); afterEach(function() { stomp.onConnectionSuccess.restore(); stomp.onConnectionError.restore(); stomp.getSocket.restore(); - this.mockStomp.restore(); }); it('onConnectionSuccess should be called', function() { - this.mockStomp.returns({connect: function(headers, success, error) { + mockStomp.returns({connect: function(headers, success, error) { success(); }}); stomp.connect(); expect(stomp.onConnectionSuccess.calledOnce).to.be.true; }); it('onConnectionError should be called', function() { - this.mockStomp.returns({connect: function(headers, success, error) { + mockStomp.returns({connect: function(headers, success, error) { error(); }}); stomp.connect(); expect(stomp.onConnectionError.calledOnce).to.be.true; }); it('should set client', function() { - this.mockStomp.returns({connect: Em.K}); stomp.connect(); expect(stomp.get('client')).to.be.eql({ connect: Em.K, @@ -83,9 +84,11 @@ describe('App.StompClient', function () { describe('#onConnectionError', function() { beforeEach(function() { sinon.stub(stomp, 'reconnect'); + sinon.stub(stomp, 'connect'); }); afterEach(function() { stomp.reconnect.restore(); + stomp.connect.restore(); }); it('reconnect should be called when isConnected true', function() { @@ -93,6 +96,12 @@ describe('App.StompClient', function () { stomp.onConnectionError(); expect(stomp.reconnect.calledOnce).to.be.true; }); + + it('connect should be called when isConnected false', function() { + stomp.set('isConnected', false); + stomp.onConnectionError(); + expect(stomp.connect.calledOnce).to.be.true; + }); }); describe('#reconnect', function() { @@ -113,7 +122,7 @@ describe('App.StompClient', function () { var subscriptions = { 'foo': { destination: 'foo', - callback: Em.K, + handlers: { default: Em.K }, unsubscribe: sinon.spy() } }; @@ -166,18 +175,64 @@ describe('App.StompClient', function () { }; stomp.set('client', client); expect(stomp.subscribe('foo')).to.be.eql({ - callback: Em.K, + handlers: { default: Em.K }, destination: 'foo', id: 1 }); expect(stomp.get('subscriptions')['foo']).to.be.eql({ - callback: Em.K, + handlers: { default: Em.K }, destination: 'foo', id: 1 }); }); }); + describe('#addHandler', function() { + beforeEach(function() { + sinon.stub(stomp, 'subscribe', function(dest) { + stomp.get('subscriptions')[dest] = { + handlers: {} + }; + }); + }); + afterEach(function() { + stomp.subscribe.restore(); + }); + + it('should add handler and subscribe because there is no subscription', function() { + stomp.addHandler('dest1', 'handler1', Em.K); + expect(stomp.subscribe.calledWith('dest1')).to.be.true; + expect(stomp.get('subscriptions')['dest1'].handlers).to.be.eql({handler1: Em.K}); + }); + it('should add handler', function() { + stomp.get('subscriptions')['dest2'] = { + handlers: {} + }; + stomp.addHandler('dest2', 'handler2', Em.K); + expect(stomp.get('subscriptions')['dest2'].handlers).to.be.eql({handler2: Em.K}); + }); + }); + + describe('#removeHandler', function() { + beforeEach(function() { + sinon.stub(stomp, 'unsubscribe'); + }); + afterEach(function() { + stomp.unsubscribe.restore(); + }); + + it('should remove handler', function() { + stomp.get('subscriptions')['dest1'] = { + handlers: { + handler1: Em.K + } + }; + stomp.removeHandler('dest1', 'handler1'); + expect(stomp.get('subscriptions')['dest1'].handlers).to.be.empty; + expect(stomp.unsubscribe.calledOnce).to.be.true; + }); + }); + describe('#unsubscribe', function() { it('should not unsubscribe when no subscription found', function() { stomp.set('subscriptions', {});
