IGNITE-6570 Web Console: Move parsing of JSON to pool of workers.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74f04001 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74f04001 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74f04001 Branch: refs/heads/ignite-3478 Commit: 74f04001a985211c499ee4bbd73de686288684a8 Parents: a38fdec Author: Alexey Kuznetsov <[email protected]> Authored: Fri Oct 6 17:00:39 2017 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Fri Oct 6 17:00:39 2017 +0700 ---------------------------------------------------------------------- modules/web-console/backend/app/agentSocket.js | 21 +--- .../web-console/backend/app/browsersHandler.js | 9 +- .../app/modules/agent/AgentManager.service.js | 18 ++- .../app/modules/agent/decompress.worker.js | 33 +++++ .../frontend/app/utils/SimpleWorkerPool.js | 119 +++++++++++++++++++ modules/web-console/frontend/package.json | 1 + 6 files changed, 176 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/agentSocket.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js index 489d145..75dcd53 100644 --- a/modules/web-console/backend/app/agentSocket.js +++ b/modules/web-console/backend/app/agentSocket.js @@ -24,7 +24,7 @@ */ module.exports = { implements: 'agent-socket', - inject: ['require(lodash)', 'require(zlib)'] + inject: ['require(lodash)'] }; /** @@ -79,10 +79,9 @@ class Command { /** * @param _ - * @param zlib * @returns {AgentSocket} */ -module.exports.factory = function(_, zlib) { +module.exports.factory = function(_) { /** * Connected agent descriptor. */ @@ -136,21 +135,7 @@ module.exports.factory = function(_, zlib) { if (resErr) return reject(resErr); - if (res.zipped) { - // TODO IGNITE-6127 Temporary solution until GZip support for socket.io-client-java. - // See: https://github.com/socketio/socket.io-client-java/issues/312 - // We can GZip manually for now. - zlib.gunzip(new Buffer(res.data, 'base64'), (unzipErr, unzipped) => { - if (unzipErr) - return reject(unzipErr); - - res.data = unzipped.toString(); - - resolve(res); - }); - } - else - resolve(res); + resolve(res); }) ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/browsersHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js index 4fb5088..f4ff23c 100644 --- a/modules/web-console/backend/app/browsersHandler.js +++ b/modules/web-console/backend/app/browsersHandler.js @@ -181,8 +181,12 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => { return agent .then((agentSock) => agentSock.emitEvent('node:rest', {uri: 'ignite', demo, params})) .then((res) => { - if (res.status === 0) + if (res.status === 0) { + if (res.zipped) + return res; + return JSON.parse(res.data); + } throw new Error(res.error); }); @@ -250,6 +254,9 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => { this.executeOnNode(agent, demo, params) .then((data) => { + if (data.zipped) + return cb(null, data); + if (data.finished) return cb(null, data.result); http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/AgentManager.service.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js index 288ec94..752b4f0 100644 --- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js +++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js @@ -17,6 +17,8 @@ import { BehaviorSubject } from 'rxjs/BehaviorSubject'; +import Worker from 'worker!./decompress.worker'; +import SimpleWorkerPool from '../../utils/SimpleWorkerPool'; import maskNull from 'app/core/utils/maskNull'; const State = { @@ -82,11 +84,9 @@ export default class IgniteAgentManager { this.promises = new Set(); - /** - * Connection to backend. - * @type {Socket} - */ - this.socket = null; + this.pool = new SimpleWorkerPool('decompressor', Worker, 4); + + this.socket = null; // Connection to backend. let cluster; @@ -364,7 +364,13 @@ export default class IgniteAgentManager { * @private */ _rest(event, ...args) { - return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args); + return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args) + .then((data) => { + if (data.zipped) + return this.pool.postMessage(data.data); + + return data; + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/decompress.worker.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/agent/decompress.worker.js b/modules/web-console/frontend/app/modules/agent/decompress.worker.js new file mode 100644 index 0000000..d8e176d --- /dev/null +++ b/modules/web-console/frontend/app/modules/agent/decompress.worker.js @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import _ from 'lodash'; +import pako from 'pako'; + +/** This worker decode & decompress BASE64/Zipped data and parse to JSON. */ +// eslint-disable-next-line no-undef +onmessage = function(e) { + const data = e.data; + + const binaryString = atob(data); // Decode from BASE64 + + const unzipped = pako.inflate(binaryString, {to: 'string'}); + + const res = JSON.parse(unzipped); + + postMessage(_.get(res, 'result', res)); +}; http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/utils/SimpleWorkerPool.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/utils/SimpleWorkerPool.js b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js new file mode 100644 index 0000000..d8ed28b --- /dev/null +++ b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {Observable} from 'rxjs/Observable'; +import {Subject} from 'rxjs/Subject'; +import 'rxjs/add/observable/race'; +import 'rxjs/add/operator/filter'; +import 'rxjs/add/operator/pluck'; +import 'rxjs/add/operator/take'; +import 'rxjs/add/operator/toPromise'; + +/** + * Simple implementation of workers pool. + */ +export default class SimpleWorkerPool { + constructor(name, WorkerClass, poolSize = (navigator.hardwareConcurrency || 4), dbg = false) { + this._name = name; + this._WorkerClass = WorkerClass; + this._tasks = []; + this._msgId = 0; + this.messages$ = new Subject(); + this.errors$ = new Subject(); + this.__dbg = dbg; + + this._workers = _.range(poolSize).map(() => { + const worker = new this._WorkerClass(); + + worker.onmessage = (m) => { + this.messages$.next({tid: worker.tid, m}); + + worker.tid = null; + + this._run(); + }; + + worker.onerror = (e) => { + this.errors$.next({tid: worker.tid, e}); + + worker.tid = null; + + this._run(); + }; + + return worker; + }); + } + + _makeTaskID() { + return this._msgId++; + } + + _getNextWorker() { + return this._workers.find((w) => !w.tid); + } + + _getNextTask() { + return this._tasks.shift(); + } + + _run() { + const worker = this._getNextWorker(); + + if (!worker || !this._tasks.length) + return; + + const task = this._getNextTask(); + + worker.tid = task.tid; + + if (this.__dbg) + console.time(`Post message[pool=${this._name}]`); + + worker.postMessage(task.data); + + if (this.__dbg) + console.timeEnd('Post message'); + } + + terminate() { + this._workers.forEach((w) => w.terminate()); + + this.messages$.complete(); + this.errors$.complete(); + + this._workers = null; + } + + postMessage(data) { + const tid = this._makeTaskID(); + + this._tasks.push({tid, data}); + + if (this.__dbg) + console.log(`Pool: [name=${this._name}, queue=${this._tasks.length}]`); + + this._run(); + + return Observable.race( + this.messages$.filter((e) => e.tid === tid).take(1).pluck('m', 'data'), + this.errors$.filter((e) => e.tid === tid).take(1).map((e) => { + throw e.e; + })) + .take(1).toPromise(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/package.json ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/package.json b/modules/web-console/frontend/package.json index d828e17..2083640 100644 --- a/modules/web-console/frontend/package.json +++ b/modules/web-console/frontend/package.json @@ -80,6 +80,7 @@ "lodash": "4.17.4", "node-sass": "4.5.3", "nvd3": "1.8.4", + "pako": "1.0.6", "progress-bar-webpack-plugin": "1.10.0", "pug-html-loader": "1.1.0", "pug-loader": "2.3.0",
