csantanapr closed pull request #161: Add health monitoring support
URL: https://github.com/apache/incubator-openwhisk-package-cloudant/pull/161
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/provider/app.js b/provider/app.js
index 2deee1d..0bfd33d 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -25,7 +25,6 @@ app.set('port', process.env.PORT || 8080);
// Allow invoking servers with self-signed certificates.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
-
// If it does not already exist, create the triggers database. This is the
database that will
// store the managed triggers.
var dbUsername = process.env.DB_USERNAME;
@@ -35,6 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
var dbPrefix = process.env.DB_PREFIX;
var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
var redisUrl = process.env.REDIS_URL;
+var monitoringAuth = process.env.MONITORING_AUTH;
+var monitoringInterval = process.env.MONITORING_INTERVAL;
var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
@@ -74,7 +75,7 @@ function createDatabase() {
};
createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
- .then((db) => {
+ .then(db => {
var filterDD = {
filters: {
triggers_by_worker:
@@ -86,6 +87,22 @@ function createDatabase() {
};
return createDesignDoc(db, filterDDName, filterDD);
})
+ .then(db => {
+ if (monitoringAuth) {
+ var filterDD = {
+ filters: {
+ canary_docs:
+ function (doc, req) {
+ return doc.isCanaryDoc && doc.host ===
req.query.host;
+ }.toString()
+ }
+ };
+ return createDesignDoc(db, '_design/' +
constants.MONITOR_DESIGN_DOC, filterDD);
+ }
+ else {
+ return Promise.resolve(db);
+ }
+ })
.then((db) => {
resolve(db);
})
@@ -146,7 +163,6 @@ function createRedisClient() {
client = redis.createClient(redisUrl);
}
-
client.on('connect', function () {
resolve(client);
});
@@ -187,7 +203,7 @@ function init(server) {
})
.then(() => {
var providerRAS = new ProviderRAS();
- var providerHealth = new ProviderHealth(providerUtils);
+ var providerHealth = new ProviderHealth(logger, providerUtils);
var providerActivation = new ProviderActivation(logger, providerUtils);
// RAS Endpoint
@@ -200,6 +216,12 @@ function init(server) {
app.get(providerActivation.endPoint, providerUtils.authorize,
providerActivation.active);
providerUtils.initAllTriggers();
+
+ if (monitoringAuth) {
+ setInterval(function () {
+ providerHealth.monitor(monitoringAuth);
+ }, monitoringInterval || constants.MONITOR_INTERVAL);
+ }
})
.catch(err => {
logger.error(method, 'an error occurred creating database:', err);
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 66efdcd..9f8a0fc 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -3,13 +3,15 @@ module.exports = function(logger, utils) {
// Active Endpoint
this.endPoint = '/active';
+ var hostMachine = process.env.HOST_MACHINE;
+
this.active = function(req, res) {
var method = 'active';
var response = {
worker: utils.worker,
host: utils.host,
- hostMachine: utils.hostMachine,
+ hostMachine: hostMachine,
active: utils.host === utils.activeHost
};
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 0203027..539fd99 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -5,7 +5,10 @@ const RETRY_DELAY = 1000; //in milliseconds
const REDIS_KEY = 'active';
const FILTERS_DESIGN_DOC = 'triggerFilters';
const VIEWS_DESIGN_DOC = 'triggerViews';
+const MONITOR_DESIGN_DOC = 'monitorFilters';
const TRIGGERS_BY_WORKER = 'triggers_by_worker';
+const DOCS_FOR_MONITOR = 'canary_docs';
+const MONITOR_INTERVAL = 5 * 1000 * 60; //in milliseconds
module.exports = {
@@ -16,5 +19,8 @@ module.exports = {
REDIS_KEY: REDIS_KEY,
FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
- TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
+ TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER,
+ MONITOR_INTERVAL: MONITOR_INTERVAL,
+ MONITOR_DESIGN_DOC: MONITOR_DESIGN_DOC,
+ DOCS_FOR_MONITOR: DOCS_FOR_MONITOR
};
diff --git a/provider/lib/health.js b/provider/lib/health.js
index bbdc01d..eb2832d 100644
--- a/provider/lib/health.js
+++ b/provider/lib/health.js
@@ -1,11 +1,21 @@
var si = require('systeminformation');
var v8 = require('v8');
+var request = require('request');
+var _ = require('lodash');
+var URL = require('url').URL;
+var constants = require('./constants.js');
-module.exports = function(utils) {
+module.exports = function(logger, utils) {
// Health Endpoint
this.endPoint = '/health';
+ var triggerName;
+ var canaryDocID;
+ var monitorStatus;
+ var monitorStages = ['triggerStarted', 'triggerFired', 'triggerStopped'];
+ var healthMonitor = this;
+
// Health Logic
this.health = function (req, res) {
@@ -20,13 +30,13 @@ module.exports = function(utils) {
si.inetLatency(utils.routerHost)
])
.then(results => {
+ stats.triggerMonitor = monitorStatus;
stats.memory = results[0];
- stats.cpu = results[1];
+ stats.cpu = _.omit(results[1], 'cpus');
stats.disk = results[2];
stats.network = results[3];
stats.apiHostLatency = results[4];
stats.heapStatistics = v8.getHeapStatistics();
- stats.heapSpaceStatistics =v8.getHeapSpaceStatistics();
res.send(stats);
})
.catch(error => {
@@ -35,4 +45,182 @@ module.exports = function(utils) {
});
};
+ this.monitor = function(apikey) {
+ var method = 'monitor';
+
+ var auth = apikey.split(':');
+
+ if (triggerName) {
+ monitorStatus = Object.assign({}, utils.monitorStatus);
+ utils.monitorStatus = {};
+
+ var monitorStatusSize = Object.keys(monitorStatus).length;
+ if (monitorStatusSize < 5) {
+ //we have a failure in one of the stages
+ var stageFailed = monitorStages[monitorStatusSize - 2];
+ monitorStatus[stageFailed] = 'failed';
+ }
+ var existingTriggerID = `:_:${triggerName}`;
+ var existingCanaryID = canaryDocID;
+
+ //delete trigger feed from database
+ healthMonitor.deleteDocFromDB(existingTriggerID, 0);
+
+ //delete the trigger
+ var uri = utils.uriHost + '/api/v1/namespaces/_/triggers/' +
triggerName;
+ healthMonitor.deleteTrigger(existingTriggerID, uri, auth, 0);
+
+ //delete the canary doc
+ healthMonitor.deleteDocFromDB(existingCanaryID, 0);
+ }
+
+ //create new cloudant trigger and canary doc
+ var docSuffix = utils.worker + utils.host + '_' + Date.now();
+ triggerName = 'cloudant_' + docSuffix;
+ canaryDocID = 'canary_' + docSuffix;
+
+ //update status monitor object
+ utils.monitorStatus.triggerName = triggerName;
+ utils.monitorStatus.triggerType = 'changes';
+
+ var triggerURL = utils.uriHost + '/api/v1/namespaces/_/triggers/' +
triggerName;
+ var triggerID = `:_:${triggerName}`;
+ healthMonitor.createTrigger(triggerURL, auth)
+ .then(info => {
+ logger.info(method, triggerID, info);
+ var newTrigger = healthMonitor.createCloudantTrigger(triggerID,
apikey);
+ healthMonitor.createDocInDB(triggerID, newTrigger);
+ })
+ .catch(err => {
+ logger.error(method, triggerID, err);
+ });
+ };
+
+ this.createCloudantTrigger = function(triggerID, apikey) {
+ var method = 'createCloudantTrigger';
+
+ var dbURL = new URL(utils.db.config.url);
+ var dbName = utils.db.config.db;
+
+ var newTrigger = {
+ apikey: apikey,
+ id: triggerID,
+ host: dbURL.hostname,
+ port: dbURL.port,
+ protocol: dbURL.protocol.replace(':', ''),
+ dbname: dbName,
+ user: dbURL.username,
+ pass: dbURL.password,
+ filter: constants.MONITOR_DESIGN_DOC + '/' +
constants.DOCS_FOR_MONITOR,
+ query_params: {host: utils.host},
+ maxTriggers: 1,
+ worker: utils.worker,
+ monitor: utils.host
+ };
+
+ return newTrigger;
+ };
+
+ this.createTrigger = function(triggerURL, auth) {
+ var method = 'createTrigger';
+
+ return new Promise(function(resolve, reject) {
+ request({
+ method: 'put',
+ uri: triggerURL,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ json: true,
+ body: {}
+ }, function (error, response) {
+ if (error || response.statusCode >= 400) {
+ reject('monitoring trigger create request failed');
+ }
+ else {
+ resolve('monitoring trigger create request was
successful');
+ }
+ });
+ });
+ };
+
+ this.createDocInDB = function(docID, doc) {
+ var method = 'createDocInDB';
+
+ utils.db.insert(doc, docID, function (err) {
+ if (!err) {
+ logger.info(method, docID, 'was successfully inserted');
+ if (doc.monitor) {
+ setTimeout(function () {
+ var canaryDoc = {
+ isCanaryDoc: true,
+ host: utils.host
+ };
+ healthMonitor.createDocInDB(canaryDocID, canaryDoc);
+ }, 1000 * 60);
+ }
+ }
+ else {
+ logger.error(method, docID, err);
+ }
+ });
+ };
+
+ this.deleteTrigger = function(triggerID, uri, auth, retryCount) {
+ var method = 'deleteTrigger';
+
+ request({
+ method: 'delete',
+ uri: uri,
+ auth: {
+ user: auth[0],
+ pass: auth[1]
+ },
+ }, function (error, response) {
+ logger.info(method, triggerID, 'http delete request, STATUS:',
response ? response.statusCode : undefined);
+ if (error || response.statusCode >= 400) {
+ if (!error && response.statusCode === 409 && retryCount < 5) {
+ logger.info(method, 'attempting to delete trigger again',
triggerID, 'Retry Count:', (retryCount + 1));
+ setTimeout(function () {
+ healthMonitor.deleteTrigger(triggerID, uri, auth,
(retryCount + 1));
+ }, 1000);
+ } else {
+ logger.error(method, triggerID, 'trigger delete request
failed');
+ }
+ }
+ else {
+ logger.info(method, triggerID, 'trigger delete request was
successful');
+ }
+ });
+ };
+
+ this.deleteDocFromDB = function(docID, retryCount) {
+ var method = 'deleteDocFromDB';
+
+ //delete from database
+ utils.db.get(docID, function (err, existing) {
+ if (!err) {
+ utils.db.destroy(existing._id, existing._rev, function (err) {
+ if (err) {
+ if (err.statusCode === 409 && retryCount < 5) {
+ setTimeout(function () {
+ healthMonitor.deleteDocFromDB(docID,
(retryCount + 1));
+ }, 1000);
+ }
+ else {
+ logger.error(method, docID, 'could not be deleted
from the database');
+ }
+ }
+ else {
+ logger.info(method, docID, 'was successfully deleted
from the database');
+ }
+ });
+ }
+ else {
+ logger.error(method, docID, 'could not be found in the
database');
+ }
+ });
+ };
+
};
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 8446117..7906c36 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,25 +1,21 @@
-var _ = require('lodash');
var request = require('request');
var HttpStatus = require('http-status-codes');
var constants = require('./constants.js');
+module.exports = function(logger, triggerDB, redisClient) {
-module.exports = function(
- logger,
- triggerDB,
- redisClient
-) {
- this.module = 'utils';
this.triggers = {};
this.endpointAuth = process.env.ENDPOINT_AUTH;
this.routerHost = process.env.ROUTER_HOST || 'localhost';
this.worker = process.env.WORKER || 'worker0';
this.host = process.env.HOST_INDEX || 'host0';
- this.hostMachine = process.env.HOST_MACHINE;
this.activeHost = 'host0'; //default value on init (will be updated for
existing redis)
+ this.db = triggerDB;
this.redisClient = redisClient;
- this.redisHash = triggerDB.config.db + '_' + this.worker;
+ this.redisHash = this.db.config.db + '_' + this.worker;
this.redisKey = constants.REDIS_KEY;
+ this.uriHost ='https://' + this.routerHost + ':443';
+ this.monitorStatus = {};
var retryAttempts = constants.RETRY_ATTEMPTS;
var filterDDName = constants.FILTERS_DESIGN_DOC;
@@ -56,16 +52,13 @@ module.exports = function(
utils.triggers[dataTrigger.id] = dataTrigger;
feed.on('change', function (change) {
- if (utils.activeHost === utils.host) {
+ var triggerHandle = utils.triggers[dataTrigger.id];
+ if (triggerHandle && utils.shouldFireTrigger(triggerHandle) &&
utils.hasTriggersRemaining(triggerHandle)) {
logger.info(method, 'Trigger', dataTrigger.id, 'got change
from', dataTrigger.dbname);
-
- var triggerHandle = utils.triggers[dataTrigger.id];
- if (triggerHandle && (triggerHandle.maxTriggers === -1 ||
triggerHandle.triggersLeft > 0)) {
- try {
- utils.fireTrigger(dataTrigger.id, change);
- } catch (e) {
- logger.error(method, 'Exception occurred while
firing trigger', dataTrigger.id, e);
- }
+ try {
+ utils.fireTrigger(dataTrigger.id, change);
+ } catch (e) {
+ logger.error(method, 'Exception occurred while firing
trigger', dataTrigger.id, e);
}
}
});
@@ -73,17 +66,15 @@ module.exports = function(
feed.follow();
return new Promise(function(resolve, reject) {
-
feed.on('error', function (err) {
logger.error(method,'Error occurred for trigger',
dataTrigger.id, '(db ' + dataTrigger.dbname + '):', err);
reject(err);
});
- feed.on('confirm', function (dbObj) {
+ feed.on('confirm', function () {
logger.info(method, 'Added cloudant data trigger',
dataTrigger.id, 'listening for changes in database', dataTrigger.dbname);
resolve(dataTrigger.id);
});
-
});
} catch (err) {
@@ -94,10 +85,6 @@ module.exports = function(
};
this.initTrigger = function(newTrigger) {
- var method = 'initTrigger';
-
- logger.info(method, 'create trigger', newTrigger.id, 'with the
following args', newTrigger);
-
var maxTriggers = newTrigger.maxTriggers ||
constants.DEFAULT_MAX_TRIGGERS;
var trigger = {
@@ -113,7 +100,8 @@ module.exports = function(
maxTriggers: maxTriggers,
triggersLeft: maxTriggers,
filter: newTrigger.filter,
- query_params: newTrigger.query_params
+ query_params: newTrigger.query_params,
+ monitor: newTrigger.monitor
};
return trigger;
@@ -124,6 +112,18 @@ module.exports = function(
[HttpStatus.REQUEST_TIMEOUT,
HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
};
+ this.shouldFireTrigger = function(trigger) {
+ return trigger.monitor || utils.activeHost === utils.host;
+ };
+
+ this.hasTriggersRemaining = function(trigger) {
+ return !trigger.maxTriggers || trigger.maxTriggers === -1 ||
trigger.triggersLeft > 0;
+ };
+
+ this.isMonitoringTrigger = function(monitor, triggerIdentifier) {
+ return monitor && utils.monitorStatus.triggerName ===
utils.parseQName(triggerIdentifier).name;
+ };
+
this.disableTrigger = function(id, statusCode, message) {
var method = 'disableTrigger';
@@ -188,9 +188,12 @@ module.exports = function(
utils.postTrigger(dataTrigger, form, uri, auth, 0)
.then(triggerId => {
logger.info(method, 'Trigger', triggerId, 'was successfully
fired');
+ if (utils.isMonitoringTrigger(dataTrigger.monitor, triggerId)) {
+ utils.monitorStatus.triggerFired = "success";
+ }
if (dataTrigger.triggersLeft === 0) {
- utils.disableTrigger(dataTrigger.id, undefined, 'Automatically
disabled after reaching max triggers');
- logger.warn(method, 'no more triggers left, disabled',
dataTrigger.id);
+ utils.disableTrigger(triggerId, undefined, 'Automatically
disabled after reaching max triggers');
+ logger.warn(method, 'no more triggers left, disabled',
triggerId);
}
})
.catch(err => {
@@ -273,7 +276,7 @@ module.exports = function(
var triggerIdentifier = trigger.id;
var doc = trigger.doc;
- if (!(triggerIdentifier in utils.triggers)) {
+ if (!(triggerIdentifier in utils.triggers) &&
!doc.monitor) {
//check if trigger still exists in whisk db
var triggerObj = utils.parseQName(triggerIdentifier);
var host = 'https://' + utils.routerHost + ':' + 443;
@@ -330,19 +333,23 @@ module.exports = function(
var triggerIdentifier = change.id;
var doc = change.doc;
- logger.info(method, 'got change for trigger',
triggerIdentifier);
-
if (utils.triggers[triggerIdentifier]) {
if (doc.status && doc.status.active === false) {
utils.deleteTrigger(triggerIdentifier);
+ if (utils.isMonitoringTrigger(doc.monitor,
triggerIdentifier)) {
+ utils.monitorStatus.triggerStopped = "success";
+ }
}
}
else {
//ignore changes to disabled triggers
- if (!doc.status || doc.status.active === true) {
+ if ((!doc.status || doc.status.active === true) &&
(!doc.monitor || doc.monitor === utils.host)) {
utils.createTrigger(utils.initTrigger(doc))
.then(triggerIdentifier => {
logger.info(method, triggerIdentifier, 'created
successfully');
+ if (utils.isMonitoringTrigger(doc.monitor,
triggerIdentifier)) {
+ utils.monitorStatus.triggerStarted = "success";
+ }
})
.catch(err => {
var message = 'Automatically disabled after
receiving exception on create trigger: ' + err;
@@ -368,7 +375,6 @@ module.exports = function(
var method = 'authorize';
if (utils.endpointAuth) {
-
if (!req.headers.authorization) {
res.set('www-authenticate', 'Basic realm="Private"');
res.status(HttpStatus.UNAUTHORIZED);
@@ -388,9 +394,7 @@ module.exports = function(
var uuid = auth[1];
var key = auth[2];
-
var endpointAuth = utils.endpointAuth.split(':');
-
if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
next();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services