This is an automated email from the ASF dual-hosted git repository. alexkli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk-wskdebug.git
commit 0097ab1b6a297179921c9b4d0d543aba5ab52c3c Author: Alexander Klimetschek <[email protected]> AuthorDate: Sat Mar 28 23:49:00 2020 -0700 refactor huge debugger.js into separate files & classes --- .eslintignore | 1 + src/agentmgr.js | 505 +++++++++++++++++++++++++++++++++++++++++ src/debugger.js | 692 +++----------------------------------------------------- src/ngrok.js | 139 ++++++++++++ src/watcher.js | 123 ++++++++++ 5 files changed, 800 insertions(+), 660 deletions(-) diff --git a/.eslintignore b/.eslintignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/.eslintignore @@ -0,0 +1 @@ +build \ No newline at end of file diff --git a/src/agentmgr.js b/src/agentmgr.js new file mode 100644 index 0000000..cb0e5a9 --- /dev/null +++ b/src/agentmgr.js @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const NgrokAgent = require('./ngrok'); +const fs = require('fs-extra'); +const sleep = require('util').promisify(setTimeout); + +function getAnnotation(action, key) { + const a = action.annotations.find(a => a.key === key); + if (a) { + return a.value; + } +} + +function getActionCopyName(name) { + return `${name}_wskdebug_original`; +} + +function isAgent(action) { + return getAnnotation(action, "wskdebug") || + (getAnnotation(action, "description") || "").startsWith("wskdebug agent."); +} + +function getActivationError(e) { + if (e.error && e.error.response && e.error.response.result && e.error.response.result.error) { + return e.error.response.result.error; + } + return {}; +} + +async function getWskActionWithoutCode(wsk, actionName) { + try { + return await wsk.actions.get({name: actionName, code:false}); + } catch (e) { + if (e.statusCode === 404) { + return null; + } else { + throw e; + } + } +} + +async function actionExists(wsk, name) { + try { + await wsk.actions.get({name: name, code: false}); + return true; + } catch (e) { + return false; + } +} + +async function deleteActionIfExists(wsk, name) { + if (await actionExists(wsk, name)) { + await wsk.actions.delete(name); + } +} + + +// TODO: test wskdebug manually +// TODO: openwhiskSupports() into separate shared class +class AgentMgr { + + constructor(argv, wsk, actionName) { + this.argv = argv; + this.wsk = wsk; + this.actionName = actionName; + this.polling = true; + } + + async readAction() { + if (this.argv.verbose) { + console.log(`Getting action metadata from OpenWhisk: ${this.actionName}`); + } + let action = await getWskActionWithoutCode(this.wsk, this.actionName); + if (action === null) { + throw new Error(`Action not found: ${this.actionName}`); + } + + let agentAlreadyInstalled = false; + + // check if this actoin needs to + if (isAgent(action)) { + // ups, action is our agent, not the original + // happens if a previous wskdebug was killed and could not restore before it exited + const backupName = getActionCopyName(this.actionName); + + // check the backup action + try { + const backup = await this.wsk.actions.get(backupName); + + if (isAgent(backup)) { + // backup is also an agent (should not happen) + // backup is useless, delete it + // await this.wsk.actions.delete(backupName); + throw new Error(`Dang! Agent is already installed and action backup is broken (${backupName}).\n\nPlease redeploy your action first before running wskdebug again.`); + + } else { + console.warn("Agent was already installed, but backup is still present. All good."); + + // need to look at the original action + action = backup; + agentAlreadyInstalled = true; + this.agentInstalled = true; + } + + } catch (e) { + if (e.statusCode === 404) { + // backup missing + throw new Error(`Dang! Agent is already installed and action backup is gone (${backupName}).\n\nPlease redeploy your action first before running wskdebug again.`); + + } else { + // other error + throw e; + } + } + } + return {action, agentAlreadyInstalled }; + } + + async installAgent(action) { + this.agentInstalled = true; + + let agentName; + + // choose the right agent implementation + let agentCode; + if (this.argv.ngrok) { + // user manually requested ngrok + + this.ngrokAgent = new NgrokAgent(this.argv); + + // agent using ngrok for forwarding + agentName = "ngrok"; + agentCode = await this.ngrokAgent.getAgent(action); + + } else { + this.concurrency = await this.openwhiskSupports("concurrency"); + if (this.concurrency) { + // normal fast agent using concurrent node.js actions + agentName = "concurrency"; + agentCode = await this.getConcurrencyAgent(); + + } else { + console.log("This OpenWhisk does not support action concurrency. Debugging will be a bit slower. Consider using '--ngrok' which might be a faster option."); + + agentName = "polling activation db"; + agentCode = await this.getPollingActivationRecordAgent(); + } + } + + const backupName = getActionCopyName(this.actionName); + + if (this.argv.verbose) { + console.log(`Installing agent in OpenWhisk (${agentName})...`); + } + + // create copy + await this.wsk.actions.update({ + name: backupName, + action: action + }); + + if (this.argv.verbose) { + console.log(`Original action backed up at ${backupName}.`); + } + + if (this.argv.condition) { + action.parameters.push({ + key: "$condition", + value: this.argv.condition + }); + } + + await this.pushAgent(action, agentCode, backupName); + + if (this.argv.verbose) { + console.log(`Agent installed.`); + } + } + + stop() { + this.polling = false; + } + + async shutdown() { + try { + await this.restoreAction(); + } finally { + if (this.ngrokAgent) { + await this.ngrokAgent.stop(); + } + } + } + + // --------------------------------------< polling >------------------- + + async waitForActivations() { + this.activationsSeen = this.activationsSeen || {}; + + // secondary loop to get next activation + // the $waitForActivation agent activation will block, but only until + // it times out, hence we need to retry when it fails + while (this.polling) { + if (this.argv.verbose) { + process.stdout.write("."); + } + try { + let activation; + if (this.concurrency) { + // invoke - blocking for up to 1 minute + activation = await this.wsk.actions.invoke({ + name: this.actionName, + params: { + $waitForActivation: true + }, + blocking: true + }); + + } else { + // poll for the newest activation + const since = Date.now(); + + // older openwhisk only allows the name of an action when filtering activations + // newer openwhisk versions want package/name + let name = this.actionName; + if (await this.openwhiskSupports("activationListFilterOnlyBasename")) { + if (this.actionName.includes("/")) { + name = this.actionName.substring(this.actionName.lastIndexOf("/") + 1); + } + } + + while (true) { + if (this.argv.verbose) { + process.stdout.write("."); + } + + const activations = await this.wsk.activations.list({ + name: `${name}_wskdebug_invoked`, + since: since, + limit: 1, // get the most recent one only + docs: true // include results + }); + + if (activations && activations.length >= 1) { + const a = activations[0]; + if (a.response && a.response.result && !this.activationsSeen[a.activationId]) { + activation = a; + break; + } + } + + // need to limit load on openwhisk (activation list) + await sleep(1000); + } + } + + // check for successful response with a new activation + if (activation && activation.response) { + const params = activation.response.result; + + // mark this as seen so we don't reinvoke it + this.activationsSeen[activation.activationId] = true; + + if (this.argv.verbose) { + console.log(); + console.info(`Activation: ${params.$activationId}`); + console.log(params); + } else { + console.info(`Activation: ${params.$activationId}`); + } + return params; + + } else if (activation && activation.activationId) { + // ignore this and retry. + // usually means the action did not respond within one second, + // which in turn is unlikely for the agent who should exit itself + // after 50 seconds, so can only happen if there was some delay + // outside the action itself + + } else { + // unexpected, just log and retry + console.log("Unexpected empty response while waiting for new activations:", activation); + } + + } catch (e) { + // look for special error codes from agent + const errorCode = getActivationError(e).code; + // 42 => retry + if (errorCode === 42) { + // do nothing + } else if (errorCode === 43) { + // 43 => graceful shutdown (for unit tests) + console.log("Graceful shutdown requested by agent (only for unit tests)"); + return null; + } else { + // otherwise log error and abort + console.error(); + console.error("Unexpected error while polling agent for activation:"); + console.dir(e, { depth: null }); + throw new Error("Unexpected error while polling agent for activation."); + } + } + + // some small wait to avoid too many requests in case things run amok + await sleep(100); + } + } + + async completeActivation(activationId, result, duration) { + console.info(`Completed activation ${activationId} in ${duration/1000.0} sec`); + if (this.argv.verbose) { + console.log(result); + } + + try { + result.$activationId = activationId; + await this.wsk.actions.invoke({ + name: this.concurrency ? this.actionName : `${this.actionName}_wskdebug_completed`, + params: result, + blocking: true + }); + } catch (e) { + // look for special error codes from agent + const errorCode = getActivationError(e).code; + // 42 => retry + if (errorCode === 42) { + // do nothing + } else if (errorCode === 43) { + // 43 => graceful shutdown (for unit tests) + console.log("Graceful shutdown requested by agent (only for unit tests)"); + return false; + } else { + console.error("Unexpected error while completing activation:", e); + } + } + return true; + } + + // --------------------------------------< restoring >------------------ + + async restoreAction() { + if (this.agentInstalled) { + if (this.argv.verbose) { + console.log(); + console.log(`Restoring action`); + } + + const copy = getActionCopyName(this.actionName); + + try { + const original = await this.wsk.actions.get(copy); + + // copy the backup (copy) to the regular action + await this.wsk.actions.update({ + name: this.actionName, + action: original + }); + + // remove the backup + await this.wsk.actions.delete(copy); + + // remove any helpers if they exist + await deleteActionIfExists(this.wsk, `${this.actionName}_wskdebug_invoked`); + await deleteActionIfExists(this.wsk, `${this.actionName}_wskdebug_completed`); + + } catch (e) { + console.error("Error while restoring original action:", e); + } + } + } + + // --------------------------------------< agent types >------------------ + + async getConcurrencyAgent() { + return fs.readFileSync(`${__dirname}/../agent/agent-concurrency.js`, {encoding: 'utf8'}); + } + + async getPollingActivationRecordAgent() { + // this needs 2 helper actions in addition to the agent in place of the action + await this.createHelperAction(`${this.actionName}_wskdebug_invoked`, `${__dirname}/../agent/echo.js`); + await this.createHelperAction(`${this.actionName}_wskdebug_completed`, `${__dirname}/../agent/echo.js`); + + let agentCode = fs.readFileSync(`${__dirname}/../agent/agent-activationdb.js`, {encoding: 'utf8'}); + // rewrite the code to pass config (we want to avoid fiddling with default params of the action) + if (await this.openwhiskSupports("activationListFilterOnlyBasename")) { + agentCode = agentCode.replace("const activationListFilterOnlyBasename = false;", "const activationListFilterOnlyBasename = true;"); + } + return agentCode; + } + + async pushAgent(action, agentCode, backupName) { + // overwrite action with agent + + // this is to support older openwhisks for which nodejs:default is less than version 8 + const nodejs8 = await this.openwhiskSupports("nodejs8"); + + await this.wsk.actions.update({ + name: this.actionName, + action: { + exec: { + kind: nodejs8 ? "nodejs:default" : "blackbox", + image: nodejs8 ? undefined : "openwhisk/action-nodejs-v8", + code: agentCode + }, + limits: { + timeout: (this.argv.agentTimeout || 300) * 1000, + concurrency: this.concurrency ? 200: 1 + }, + annotations: [ + ...action.annotations, + { key: "provide-api-key", value: true }, + { key: "wskdebug", value: true }, + { key: "description", value: `wskdebug agent. temporarily installed over original action. original action backup at ${backupName}.` } + ], + parameters: action.parameters + } + }); + } + + async createHelperAction(actionName, file) { + const nodejs8 = await this.openwhiskSupports("nodejs8"); + + await this.wsk.actions.update({ + name: actionName, + action: { + exec: { + kind: nodejs8 ? "nodejs:default" : "blackbox", + image: nodejs8 ? undefined : "openwhisk/action-nodejs-v8", + code: fs.readFileSync(file, {encoding: 'utf8'}) + }, + limits: { + timeout: (this.argv.agentTimeout || 300) * 1000 + }, + annotations: [ + { key: "description", value: `wskdebug agent helper. temporarily installed.` } + ] + } + }); + } + + // ----------------------------------------< openwhisk feature detection >----------------- + + async getOpenWhiskVersion() { + if (this.openwhiskVersion === undefined) { + try { + const json = await this.wsk.actions.client.request("GET", "/api/v1"); + if (json && typeof json.build === "string") { + this.openwhiskVersion = json.build; + } else { + this.openwhiskVersion = null; + } + } catch (e) { + console.warn("Could not retrieve OpenWhisk version:", e.message); + this.openwhiskVersion = null; + } + } + return this.openwhiskVersion; + } + + async openwhiskSupports(feature) { + const FEATURES = { + // guesstimated + activationListFilterOnlyBasename: v => v.startsWith("2018") || v.startsWith("2017"), + // hack + nodejs8: v => !v.startsWith("2018") && !v.startsWith("2017"), + concurrency: async (_, wsk) => { + // check swagger api docs instead of version to see if concurrency is supported + try { + const swagger = await wsk.actions.client.request("GET", "/api/v1/api-docs"); + + if (swagger && swagger.definitions && swagger.definitions.ActionLimits && swagger.definitions.ActionLimits.properties) { + return swagger.definitions.ActionLimits.properties.concurrency; + } + } catch (e) { + console.warn('Could not read /api/v1/api-docs, setting max action concurrency to 1') + return false; + } + } + }; + const checker = FEATURES[feature]; + if (checker) { + return checker(await this.getOpenWhiskVersion(), this.wsk); + } else { + throw new Error("Unknown feature " + feature); + } + } +} + +module.exports = AgentMgr; \ No newline at end of file diff --git a/src/debugger.js b/src/debugger.js index 86bc8ec..e43c8e8 100644 --- a/src/debugger.js +++ b/src/debugger.js @@ -17,33 +17,21 @@ 'use strict'; -const openwhisk = require("openwhisk"); const wskprops = require('./wskprops'); -const fs = require('fs-extra'); const OpenWhiskInvoker = require('./invoker'); +const AgentMgr = require('./agentmgr'); +const Watcher = require('./watcher'); +const openwhisk = require('openwhisk'); const { spawnSync } = require('child_process'); -const livereload = require('livereload'); -const http = require('http'); -const ngrok = require('ngrok'); -const url = require('url'); -const util = require('util'); -const crypto = require("crypto"); - -async function sleep(millis) { - return new Promise(resolve => setTimeout(resolve, millis)); -} - -function getAnnotation(action, key) { - const a = action.annotations.find(a => a.key === key); - if (a) { - return a.value; - } -} +const sleep = require('util').promisify(setTimeout); +/** + * Central component of wskdebug. + */ class Debugger { constructor(argv) { this.argv = argv; - this.action = argv.action; + this.actionName = argv.action; this.wskProps = wskprops.get(); if (argv.ignoreCerts) { @@ -54,16 +42,19 @@ class Debugger { async start() { await this.setupWsk(); + this.agentMgr = new AgentMgr(this.argv, this.wsk, this.actionName); + this.watcher = new Watcher(this.argv, this.wsk); + // quick fail for missing requirements such as docker not running await OpenWhiskInvoker.checkIfAvailable(); - console.info(`Starting debugger for /${this.wskProps.namespace}/${this.action}`); + console.info(`Starting debugger for /${this.wskProps.namespace}/${this.actionName}`); // get the action - const { action, agentAlreadyInstalled } = await this.getAction(this.action); + const { action, agentAlreadyInstalled } = await this.agentMgr.readAction(); // local debug container - this.invoker = new OpenWhiskInvoker(this.action, action, this.argv, this.wskProps, this.wsk); + this.invoker = new OpenWhiskInvoker(this.actionName, action, this.argv, this.wskProps, this.wsk); try { // run build initially (would be required by starting container) @@ -77,9 +68,9 @@ class Debugger { // get code and /init local container if (this.argv.verbose) { - console.log(`Fetching action code from OpenWhisk: ${this.action}`); + console.log(`Fetching action code from OpenWhisk: ${this.actionName}`); } - const actionWithCode = await this.wsk.actions.get(this.action); + const actionWithCode = await this.wsk.actions.get(this.actionName); action.exec = actionWithCode.exec; await this.invoker.init(actionWithCode); @@ -88,21 +79,21 @@ class Debugger { // user can switch between agents (ngrok or not), hence we need to restore // (better would be to track the agent + its version and avoid a restore, but that's TBD) if (agentAlreadyInstalled) { - await this.restoreAction(this.action); + await this.agentMgr.restoreAction(); } - await this.installAgent(this.action, action); + await this.agentMgr.installAgent(action); if (this.argv.onStart) { console.log("On start:", this.argv.onStart); spawnSync(this.argv.onStart, {shell: true, stdio: "inherit"}); } - // start live reload (if requested) - await this.startSourceWatching(); + // start source watching (live reload) if requested + await this.watcher.start(); console.log(); - console.info(`Action : ${this.action}`); + console.info(`Action : ${this.actionName}`); this.invoker.logInfo(); if (this.argv.condition) { console.info(`Condition : ${this.argv.condition}`); @@ -139,9 +130,9 @@ class Debugger { // agent: concurrent // agent: non-concurrent // wait for activation, run it, complete, repeat - const activation = await this.waitForActivations(this.action); + const activation = await this.agentMgr.waitForActivations(); if (!activation) { - this.running = false; + // this.running = false; return; } @@ -157,7 +148,10 @@ class Debugger { const duration = Date.now() - startTime; // pass on the local result to the agent in openwhisk - await this.completeActivation(this.action, id, result, duration); + if (!await this.agentMgr.completeActivation(id, result, duration)) { + // this.running = false; + return; + } } } } finally { @@ -167,6 +161,8 @@ class Debugger { async stop() { this.running = false; + this.agentMgr.stop(); + if (this.runPromise) { // wait for the main loop to gracefully end, which will call shutdown() await this.runPromise; @@ -178,6 +174,7 @@ class Debugger { async kill() { this.running = false; + this.agentMgr.stop(); await this.shutdown(); } @@ -192,29 +189,9 @@ class Debugger { // need to shutdown everything even if some fail, hence tryCatch() for each - if (this.action) { - await this.tryCatch(this.restoreAction(this.action)); - } + await this.tryCatch(this.agentMgr.shutdown()); await this.tryCatch(this.invoker.stop()); - - if (this.liveReloadServer) { - await this.tryCatch(() => { - if (this.liveReloadServer.server) { - this.liveReloadServer.close(); - } else { - this.liveReloadServer.watcher.close(); - } - this.liveReloadServer = null; - }); - } - - if (this.ngrokServer) { - await this.tryCatch(() => { - this.ngrokServer.close(); - this.ngrokServer = null; - }); - } - await this.tryCatch(ngrok.kill()); + await this.tryCatch(this.watcher.stop()); // only log this if we started properly if (this.ready) { @@ -246,611 +223,6 @@ class Debugger { } } - async getWskActionWithoutCode(actionName) { - if (this.argv.verbose) { - console.log(`Getting action metadata from OpenWhisk: ${actionName}`); - } - try { - return await this.wsk.actions.get({name: actionName, code:false}); - } catch (e) { - if (e.statusCode === 404) { - return null; - } else { - throw e; - } - } - } - - async actionExists(name) { - try { - await this.wsk.actions.get({name: name, code: false}); - return true; - } catch (e) { - return false; - } - } - - async deleteActionIfExists(name) { - if (await this.actionExists(name)) { - await this.wsk.actions.delete(name); - } - } - - // ------------------------------------------------< agent >------------------ - - getActionCopyName(name) { - return `${name}_wskdebug_original`; - } - - isAgent(action) { - return getAnnotation(action, "wskdebug") || - (getAnnotation(action, "description") || "").startsWith("wskdebug agent."); - } - - async getAction(actionName) { - let action = await this.getWskActionWithoutCode(actionName); - if (action === null) { - throw new Error(`Action not found: ${actionName}`); - } - - let agentAlreadyInstalled = false; - - // check if this actoin needs to - if (this.isAgent(action)) { - // ups, action is our agent, not the original - // happens if a previous wskdebug was killed and could not restore before it exited - const backupName = this.getActionCopyName(actionName); - - // check the backup action - try { - const backup = await this.wsk.actions.get(backupName); - - if (this.isAgent(backup)) { - // backup is also an agent (should not happen) - // backup is useless, delete it - // await this.wsk.actions.delete(backupName); - throw new Error(`Dang! Agent is already installed and action backup is broken (${backupName}).\n\nPlease redeploy your action first before running wskdebug again.`); - - } else { - console.warn("Agent was already installed, but backup is still present. All good."); - - // need to look at the original action - action = backup; - agentAlreadyInstalled = true; - this.agentInstalled = true; - } - - } catch (e) { - if (e.statusCode === 404) { - // backup missing - throw new Error(`Dang! Agent is already installed and action backup is gone (${backupName}).\n\nPlease redeploy your action first before running wskdebug again.`); - - } else { - // other error - throw e; - } - } - } - return {action, agentAlreadyInstalled }; - } - - async createHelperAction(actionName, file) { - const nodejs8 = await this.openwhiskSupports("nodejs8"); - - await this.wsk.actions.update({ - name: actionName, - action: { - exec: { - kind: nodejs8 ? "nodejs:default" : "blackbox", - image: nodejs8 ? undefined : "openwhisk/action-nodejs-v8", - code: fs.readFileSync(file, {encoding: 'utf8'}) - }, - limits: { - timeout: (this.argv.agentTimeout || 300) * 1000 - }, - annotations: [ - { key: "description", value: `wskdebug agent helper. temporarily installed.` } - ] - } - }); - } - - async installAgent(actionName, action) { - this.agentInstalled = true; - - const agentDir = `${__dirname}/../agent`; - let agentName; - - // choose the right agent implementation - let code; - if (this.argv.ngrok) { - // user manually requested ngrok - if (this.argv.verbose) { - console.log("Setting up ngrok", this.argv.ngrokRegion ? `(region: ${this.argv.ngrokRegion})` : ""); - } - - // 1. start local server on random port - this.ngrokServer = http.createServer(this.ngrokHandler.bind(this)); - // turn server.listen() into promise so we can await - const listen = util.promisify( this.ngrokServer.listen.bind(this.ngrokServer) ); - await listen(0, '127.0.0.1'); - - // 2. start ngrok tunnel connected to that port - this.ngrokServerPort = this.ngrokServer.address().port; - - // create a unique authorization token that we check on our local instance later - // this adds extra protection on top of the uniquely generated ngrok subdomain (e.g. a01ae275.ngrok.io) - this.ngrokAuth = crypto.randomBytes(32).toString("hex"); - const ngrokUrl = await ngrok.connect({ - addr: this.ngrokServerPort, - region: this.argv.ngrokRegion - }); - - // 3. pass on public ngrok url to agent - action.parameters.push({ - key: "$ngrokUrl", - value: url.parse(ngrokUrl).host - }); - action.parameters.push({ - key: "$ngrokAuth", - value: this.ngrokAuth - }); - - console.log(`Ngrok forwarding: ${ngrokUrl} => http://localhost:${this.ngrokServerPort} (auth: ${this.ngrokAuth})`); - - // agent using ngrok for forwarding - agentName = "ngrok"; - code = fs.readFileSync(`${agentDir}/agent-ngrok.js`, {encoding: 'utf8'}); - - } else { - this.concurrency = await this.openwhiskSupports("concurrency"); - if (this.concurrency) { - // normal fast agent using concurrent node.js actions - agentName = "concurrency"; - code = fs.readFileSync(`${agentDir}/agent-concurrency.js`, {encoding: 'utf8'}); - - } else { - console.log("This OpenWhisk does not support action concurrency. Debugging will be a bit slower. Consider using '--ngrok' which might be a faster option."); - - agentName = "polling activation db"; - - // this needs 2 helper actions in addition to the agent in place of the action - await this.createHelperAction(`${actionName}_wskdebug_invoked`, `${agentDir}/echo.js`); - await this.createHelperAction(`${actionName}_wskdebug_completed`, `${agentDir}/echo.js`); - - code = fs.readFileSync(`${agentDir}/agent-activationdb.js`, {encoding: 'utf8'}); - // rewrite the code to pass config (we want to avoid fiddling with default params of the action) - if (await this.openwhiskSupports("activationListFilterOnlyBasename")) { - code = code.replace("const activationListFilterOnlyBasename = false;", "const activationListFilterOnlyBasename = true;"); - } - } - } - - const backupName = this.getActionCopyName(actionName); - - if (this.argv.verbose) { - console.log(`Installing agent in OpenWhisk (${agentName})...`); - } - - // create copy - await this.wsk.actions.update({ - name: backupName, - action: action - }); - - if (this.argv.verbose) { - console.log(`Original action backed up at ${backupName}.`); - } - - // this is to support older openwhisks for which nodejs:default is less than version 8 - const nodejs8 = await this.openwhiskSupports("nodejs8"); - - if (this.argv.condition) { - action.parameters.push({ - key: "$condition", - value: this.argv.condition - }); - } - - // overwrite action with agent - await this.wsk.actions.update({ - name: actionName, - action: { - exec: { - kind: nodejs8 ? "nodejs:default" : "blackbox", - image: nodejs8 ? undefined : "openwhisk/action-nodejs-v8", - code: code - }, - limits: { - timeout: (this.argv.agentTimeout || 300) * 1000, - concurrency: this.concurrency ? 200: 1 - }, - annotations: [ - ...action.annotations, - { key: "provide-api-key", value: true }, - { key: "wskdebug", value: true }, - { key: "description", value: `wskdebug agent. temporarily installed over original action. original action backup at ${backupName}.` } - ], - parameters: action.parameters - } - }); - - if (this.argv.verbose) { - console.log(`Agent installed.`); - } - } - - async restoreAction(actionName) { - if (this.agentInstalled) { - if (this.argv.verbose) { - console.log(); - console.log(`Restoring action`); - } - - const copy = this.getActionCopyName(actionName); - - try { - const original = await this.wsk.actions.get(copy); - - // copy the backup (copy) to the regular action - await this.wsk.actions.update({ - name: actionName, - action: original - }); - - // remove the backup - await this.wsk.actions.delete(copy); - - // remove any helpers if they exist - await this.deleteActionIfExists(`${actionName}_wskdebug_invoked`); - await this.deleteActionIfExists(`${actionName}_wskdebug_completed`); - - } catch (e) { - console.error("Error while restoring original action:", e); - } - } - } - - // ------------------------------------------------< ngrok >------------------ - - // local http server retrieving forwards from the ngrok agent, running them - // as a blocking local invocation and then returning the activation result back - ngrokHandler(req, res) { - // check authorization against our unique token - const authHeader = req.headers.authorization; - if (authHeader !== this.ngrokAuth) { - res.statusCode = 401; - res.end(); - return; - } - - if (req.method === 'POST') { - // agent POSTs arguments as json body - let body = ''; - // collect full request body first - req.on('data', chunk => { - body += chunk.toString(); - }); - req.on('end', async () => { - try { - const params = JSON.parse(body); - const id = params.$activationId; - delete params.$activationId; - - if (this.argv.verbose) { - console.log(); - console.info(`Activation: ${id}`); - console.log(params); - } else { - console.info(`Activation: ${id}`); - } - - const startTime = Date.now(); - - const result = await this.invoker.run(params, id); - - const duration = Date.now() - startTime; - console.info(`Completed activation ${id} in ${duration/1000.0} sec`); - if (this.argv.verbose) { - console.log(result); - } - - res.statusCode = 200; - res.setHeader("Content-Type", "application/json"); - res.end(JSON.stringify(result)); - - } catch (e) { - console.error(e); - res.statusCode = 400; - res.end(); - } - }); - } else { - res.statusCode = 404; - res.end(); - } - } - - // ------------------------------------------------< polling >------------------ - - async waitForActivations(actionName) { - this.activationsSeen = this.activationsSeen || {}; - - // secondary loop to get next activation - // the $waitForActivation agent activation will block, but only until - // it times out, hence we need to retry when it fails - while (this.running) { - if (this.argv.verbose) { - process.stdout.write("."); - } - try { - let activation; - if (this.concurrency) { - // invoke - blocking for up to 1 minute - activation = await this.wsk.actions.invoke({ - name: actionName, - params: { - $waitForActivation: true - }, - blocking: true - }); - - } else { - // poll for the newest activation - const since = Date.now(); - - // older openwhisk only allows the name of an action when filtering activations - // newer openwhisk versions want package/name - let name = actionName; - if (await this.openwhiskSupports("activationListFilterOnlyBasename")) { - if (actionName.includes("/")) { - name = actionName.substring(actionName.lastIndexOf("/") + 1); - } - } - - while (true) { - if (this.argv.verbose) { - process.stdout.write("."); - } - - const activations = await this.wsk.activations.list({ - name: `${name}_wskdebug_invoked`, - since: since, - limit: 1, // get the most recent one only - docs: true // include results - }); - - if (activations && activations.length >= 1) { - const a = activations[0]; - if (a.response && a.response.result && !this.activationsSeen[a.activationId]) { - activation = a; - break; - } - } - - // need to limit load on openwhisk (activation list) - await sleep(1000); - } - } - - // check for successful response with a new activation - if (activation && activation.response) { - const params = activation.response.result; - - // mark this as seen so we don't reinvoke it - this.activationsSeen[activation.activationId] = true; - - if (this.argv.verbose) { - console.log(); - console.info(`Activation: ${params.$activationId}`); - console.log(params); - } else { - console.info(`Activation: ${params.$activationId}`); - } - return params; - - } else if (activation && activation.activationId) { - // ignore this and retry. - // usually means the action did not respond within one second, - // which in turn is unlikely for the agent who should exit itself - // after 50 seconds, so can only happen if there was some delay - // outside the action itself - - } else { - // unexpected, just log and retry - console.log("Unexpected empty response while waiting for new activations:", activation); - } - - } catch (e) { - // look for special error codes from agent - const errorCode = this.getActivationError(e).code; - // 42 => retry - if (errorCode === 42) { - // do nothing - } else if (errorCode === 43) { - // 43 => graceful shutdown (for unit tests) - console.log("Graceful shutdown requested by agent (only for unit tests)"); - return null; - } else { - // otherwise log error and abort - console.error(); - console.error("Unexpected error while polling agent for activation:"); - console.dir(e, { depth: null }); - throw new Error("Unexpected error while polling agent for activation."); - } - } - - // some small wait to avoid too many requests in case things run amok - await sleep(100); - } - } - - getActivationError(e) { - if (e.error && e.error.response && e.error.response.result && e.error.response.result.error) { - return e.error.response.result.error; - } - return {}; - } - - async completeActivation(actionName, activationId, result, duration) { - console.info(`Completed activation ${activationId} in ${duration/1000.0} sec`); - if (this.argv.verbose) { - console.log(result); - } - - try { - result.$activationId = activationId; - await this.wsk.actions.invoke({ - name: this.concurrency ? actionName : `${actionName}_wskdebug_completed`, - params: result, - blocking: true - }); - } catch (e) { - // look for special error codes from agent - const errorCode = this.getActivationError(e).code; - // 42 => retry - if (errorCode === 42) { - // do nothing - } else if (errorCode === 43) { - // 43 => graceful shutdown (for unit tests) - console.log("Graceful shutdown requested by agent (only for unit tests)"); - this.running = false; - } else { - console.error("Unexpected error while completing activation:", e); - } - } - } - - // ----------------------------------------< openwhisk feature detection >----------------- - - async getOpenWhiskVersion() { - if (this.openwhiskVersion === undefined) { - try { - const json = await this.wsk.actions.client.request("GET", "/api/v1"); - if (json && typeof json.build === "string") { - this.openwhiskVersion = json.build; - } else { - this.openwhiskVersion = null; - } - } catch (e) { - console.warn("Could not retrieve OpenWhisk version:", e.message); - this.openwhiskVersion = null; - } - } - return this.openwhiskVersion; - } - - async openwhiskSupports(feature) { - const FEATURES = { - // guesstimated - activationListFilterOnlyBasename: v => v.startsWith("2018") || v.startsWith("2017"), - // hack - nodejs8: v => !v.startsWith("2018") && !v.startsWith("2017"), - concurrency: async (_, wsk) => { - // check swagger api docs instead of version to see if concurrency is supported - try { - const swagger = await wsk.actions.client.request("GET", "/api/v1/api-docs"); - - if (swagger && swagger.definitions && swagger.definitions.ActionLimits && swagger.definitions.ActionLimits.properties) { - return swagger.definitions.ActionLimits.properties.concurrency; - } - } catch (e) { - console.warn('Could not read /api/v1/api-docs, setting max action concurrency to 1') - return false; - } - } - }; - const checker = FEATURES[feature]; - if (checker) { - return checker(await this.getOpenWhiskVersion(), this.wsk); - } else { - throw new Error("Unknown feature " + feature); - } - } - - // ------------------------------------------------< source watching >----------------- - - async startSourceWatching() { - const watch = this.argv.watch || process.cwd(); - if (watch && - // each of these triggers listening - ( this.argv.livereload - || this.argv.onBuild - || this.argv.onChange - || this.argv.invokeParams - || this.argv.invokeAction ) - ) { - this.liveReloadServer = livereload.createServer({ - port: this.argv.livereloadPort, - noListen: !this.argv.livereload, - exclusions: [this.argv.buildPath, "node_modules/**"], - exts: this.argv.watchExts || ["json", "js", "ts", "coffee", "py", "rb", "erb", "go", "java", "scala", "php", "swift", "rs", "cs", "bal", "php", "php5"], - extraExts: [] - }); - this.liveReloadServer.watch(watch); - - // overwrite function to get notified on changes - const refresh = this.liveReloadServer.refresh; - const argv = this.argv; - const wsk = this.wsk; - this.liveReloadServer.refresh = function(filepath) { - try { - let result = []; - - if (argv.verbose) { - console.log("File modified:", filepath); - } - - // call original function if we are listening - if (argv.livereload) { - result = refresh.call(this, filepath); - } - - // run build command before invoke triggers below - if (argv.onBuild) { - console.info("=> Build:", argv.onBuild); - spawnSync(argv.onBuild, {shell: true, stdio: "inherit"}); - } - - // run shell command - if (argv.onChange) { - console.info("=> Run:", argv.onChange); - spawnSync(argv.onChange, {shell: true, stdio: "inherit"}); - } - - // action invoke - if (argv.invokeParams || argv.invokeAction) { - let json = {}; - if (argv.invokeParams) { - if (argv.invokeParams.trim().startsWith("{")) { - json = JSON.parse(argv.invokeParams); - } else { - json = JSON.parse(fs.readFileSync(argv.invokeParams, {encoding: 'utf8'})); - } - } - const action = argv.invokeAction || argv.action; - wsk.actions.invoke({ - name: action, - params: json - }).then(response => { - console.info(`=> Invoked action ${action} with params ${argv.invokeParams}: ${response.activationId}`); - }).catch(err => { - console.error("Error invoking action:", err); - }); - } - - return result; - } catch (e) { - console.error(e); - } - }; - - if (this.argv.livereload) { - console.info(`LiveReload enabled for ${watch} on port ${this.liveReloadServer.config.port}`); - } - } - } - // ------------------------------------------------< utils >----------------- async tryCatch(task, message="Error during shutdown:") { diff --git a/src/ngrok.js b/src/ngrok.js new file mode 100644 index 0000000..afa01d7 --- /dev/null +++ b/src/ngrok.js @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const fs = require('fs-extra'); +const http = require('http'); +const ngrok = require('ngrok'); +const url = require('url'); +const util = require('util'); +const crypto = require("crypto"); + +class NgrokAgent { + constructor(argv) { + this.argv = argv; + } + + async getAgent(action) { + if (this.argv.verbose) { + console.log("Setting up ngrok", this.argv.ngrokRegion ? `(region: ${this.argv.ngrokRegion})` : ""); + } + + // 1. start local server on random port + this.ngrokServer = http.createServer(this.ngrokHandler.bind(this)); + // turn server.listen() into promise so we can await + const listen = util.promisify( this.ngrokServer.listen.bind(this.ngrokServer) ); + await listen(0, '127.0.0.1'); + + // 2. start ngrok tunnel connected to that port + this.ngrokServerPort = this.ngrokServer.address().port; + + // create a unique authorization token that we check on our local instance later + // this adds extra protection on top of the uniquely generated ngrok subdomain (e.g. a01ae275.ngrok.io) + this.ngrokAuth = crypto.randomBytes(32).toString("hex"); + const ngrokUrl = await ngrok.connect({ + addr: this.ngrokServerPort, + region: this.argv.ngrokRegion + }); + + // 3. pass on public ngrok url to agent + action.parameters.push({ + key: "$ngrokUrl", + value: url.parse(ngrokUrl).host + }); + action.parameters.push({ + key: "$ngrokAuth", + value: this.ngrokAuth + }); + + console.log(`Ngrok forwarding: ${ngrokUrl} => http://localhost:${this.ngrokServerPort} (auth: ${this.ngrokAuth})`); + + return fs.readFileSync(`${__dirname}/../agent/agent-ngrok.js`, {encoding: 'utf8'}); + } + + async stop() { + try { + if (this.ngrokServer) { + this.ngrokServer.close(); + this.ngrokServer = null; + } + } finally { + await ngrok.kill(); + } + } + + // local http server retrieving forwards from the ngrok agent, running them + // as a blocking local invocation and then returning the activation result back + ngrokHandler(req, res) { + // check authorization against our unique token + const authHeader = req.headers.authorization; + if (authHeader !== this.ngrokAuth) { + res.statusCode = 401; + res.end(); + return; + } + + if (req.method === 'POST') { + // agent POSTs arguments as json body + let body = ''; + // collect full request body first + req.on('data', chunk => { + body += chunk.toString(); + }); + req.on('end', async () => { + try { + const params = JSON.parse(body); + const id = params.$activationId; + delete params.$activationId; + + if (this.argv.verbose) { + console.log(); + console.info(`Activation: ${id}`); + console.log(params); + } else { + console.info(`Activation: ${id}`); + } + + const startTime = Date.now(); + + const result = await this.invoker.run(params, id); + + const duration = Date.now() - startTime; + console.info(`Completed activation ${id} in ${duration/1000.0} sec`); + if (this.argv.verbose) { + console.log(result); + } + + res.statusCode = 200; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify(result)); + + } catch (e) { + console.error(e); + res.statusCode = 400; + res.end(); + } + }); + } else { + res.statusCode = 404; + res.end(); + } + } +} + +module.exports = NgrokAgent; \ No newline at end of file diff --git a/src/watcher.js b/src/watcher.js new file mode 100644 index 0000000..d69b554 --- /dev/null +++ b/src/watcher.js @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const fs = require('fs-extra'); +const livereload = require('livereload'); +const { spawnSync } = require('child_process'); + +class Watcher { + constructor(argv, wsk) { + this.argv = argv; + this.wsk = wsk; + } + + async start() { + const watch = this.argv.watch || process.cwd(); + if (watch && + // each of these triggers listening + ( this.argv.livereload + || this.argv.onBuild + || this.argv.onChange + || this.argv.invokeParams + || this.argv.invokeAction ) + ) { + this.liveReloadServer = livereload.createServer({ + port: this.argv.livereloadPort, + noListen: !this.argv.livereload, + exclusions: [this.argv.buildPath, "node_modules/**"], + exts: this.argv.watchExts || ["json", "js", "ts", "coffee", "py", "rb", "erb", "go", "java", "scala", "php", "swift", "rs", "cs", "bal", "php", "php5"], + extraExts: [] + }); + this.liveReloadServer.watch(watch); + + // overwrite function to get notified on changes + const refresh = this.liveReloadServer.refresh; + const argv = this.argv; + const wsk = this.wsk; + this.liveReloadServer.refresh = function(filepath) { + try { + let result = []; + + if (argv.verbose) { + console.log("File modified:", filepath); + } + + // call original function if we are listening + if (argv.livereload) { + result = refresh.call(this, filepath); + } + + // run build command before invoke triggers below + if (argv.onBuild) { + console.info("=> Build:", argv.onBuild); + spawnSync(argv.onBuild, {shell: true, stdio: "inherit"}); + } + + // run shell command + if (argv.onChange) { + console.info("=> Run:", argv.onChange); + spawnSync(argv.onChange, {shell: true, stdio: "inherit"}); + } + + // action invoke + if (argv.invokeParams || argv.invokeAction) { + let json = {}; + if (argv.invokeParams) { + if (argv.invokeParams.trim().startsWith("{")) { + json = JSON.parse(argv.invokeParams); + } else { + json = JSON.parse(fs.readFileSync(argv.invokeParams, {encoding: 'utf8'})); + } + } + const action = argv.invokeAction || argv.action; + wsk.actions.invoke({ + name: action, + params: json + }).then(response => { + console.info(`=> Invoked action ${action} with params ${argv.invokeParams}: ${response.activationId}`); + }).catch(err => { + console.error("Error invoking action:", err); + }); + } + + return result; + } catch (e) { + console.error(e); + } + }; + + if (this.argv.livereload) { + console.info(`LiveReload enabled for ${watch} on port ${this.liveReloadServer.config.port}`); + } + } + } + + async stop() { + if (this.liveReloadServer) { + if (this.liveReloadServer.server) { + this.liveReloadServer.close(); + } else { + this.liveReloadServer.watcher.close(); + } + this.liveReloadServer = null; + } + } +} + +module.exports = Watcher; \ No newline at end of file
