csantanapr closed pull request #161: Add health monitoring support
URL: https://github.com/apache/incubator-openwhisk-package-cloudant/pull/161
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays its
diff once it has been merged, so the full diff is supplied below:

diff --git a/provider/app.js b/provider/app.js
index 2deee1d..0bfd33d 100644
--- a/provider/app.js
+++ b/provider/app.js
@@ -25,7 +25,6 @@ app.set('port', process.env.PORT || 8080);
 // Allow invoking servers with self-signed certificates.
 process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
 
-
 // If it does not already exist, create the triggers database.  This is the database that will
 // store the managed triggers.
 var dbUsername = process.env.DB_USERNAME;
@@ -35,6 +34,8 @@ var dbProtocol = process.env.DB_PROTOCOL;
 var dbPrefix = process.env.DB_PREFIX;
 var databaseName = dbPrefix + constants.TRIGGER_DB_SUFFIX;
 var redisUrl = process.env.REDIS_URL;
+var monitoringAuth = process.env.MONITORING_AUTH;
+var monitoringInterval = process.env.MONITORING_INTERVAL;
 var filterDDName = '_design/' + constants.FILTERS_DESIGN_DOC;
 var viewDDName = '_design/' + constants.VIEWS_DESIGN_DOC;
 
@@ -74,7 +75,7 @@ function createDatabase() {
                 };
 
                 createDesignDoc(nano.db.use(databaseName), viewDDName, viewDD)
-                .then((db) => {
+                .then(db => {
                     var filterDD = {
                         filters: {
                             triggers_by_worker:
@@ -86,6 +87,22 @@ function createDatabase() {
                     };
                     return createDesignDoc(db, filterDDName, filterDD);
                 })
+                .then(db => {
+                    if (monitoringAuth) {
+                        var filterDD = {
+                            filters: {
+                                canary_docs:
+                                    function (doc, req) {
+                                        return doc.isCanaryDoc && doc.host === req.query.host;
+                                    }.toString()
+                            }
+                        };
+                        return createDesignDoc(db, '_design/' + constants.MONITOR_DESIGN_DOC, filterDD);
+                    }
+                    else {
+                        return Promise.resolve(db);
+                    }
+                })
                 .then((db) => {
                     resolve(db);
                 })
@@ -146,7 +163,6 @@ function createRedisClient() {
                 client = redis.createClient(redisUrl);
             }
 
-
             client.on('connect', function () {
                 resolve(client);
             });
@@ -187,7 +203,7 @@ function init(server) {
     })
     .then(() => {
         var providerRAS = new ProviderRAS();
-        var providerHealth = new ProviderHealth(providerUtils);
+        var providerHealth = new ProviderHealth(logger, providerUtils);
         var providerActivation = new ProviderActivation(logger, providerUtils);
 
         // RAS Endpoint
@@ -200,6 +216,12 @@ function init(server) {
         app.get(providerActivation.endPoint, providerUtils.authorize, providerActivation.active);
 
         providerUtils.initAllTriggers();
+
+        if (monitoringAuth) {
+            setInterval(function () {
+                providerHealth.monitor(monitoringAuth);
+            }, monitoringInterval || constants.MONITOR_INTERVAL);
+        }
     })
     .catch(err => {
         logger.error(method, 'an error occurred creating database:', err);
diff --git a/provider/lib/active.js b/provider/lib/active.js
index 66efdcd..9f8a0fc 100644
--- a/provider/lib/active.js
+++ b/provider/lib/active.js
@@ -3,13 +3,15 @@ module.exports = function(logger, utils) {
     // Active Endpoint
     this.endPoint = '/active';
 
+    var hostMachine = process.env.HOST_MACHINE;
+
     this.active = function(req, res) {
         var method = 'active';
 
         var response = {
             worker: utils.worker,
             host: utils.host,
-            hostMachine: utils.hostMachine,
+            hostMachine: hostMachine,
             active: utils.host === utils.activeHost
         };
 
diff --git a/provider/lib/constants.js b/provider/lib/constants.js
index 0203027..539fd99 100644
--- a/provider/lib/constants.js
+++ b/provider/lib/constants.js
@@ -5,7 +5,10 @@ const RETRY_DELAY = 1000; //in milliseconds
 const REDIS_KEY = 'active';
 const FILTERS_DESIGN_DOC = 'triggerFilters';
 const VIEWS_DESIGN_DOC = 'triggerViews';
+const MONITOR_DESIGN_DOC = 'monitorFilters';
 const TRIGGERS_BY_WORKER = 'triggers_by_worker';
+const DOCS_FOR_MONITOR = 'canary_docs';
+const MONITOR_INTERVAL = 5 * 1000 * 60; //in milliseconds
 
 
 module.exports = {
@@ -16,5 +19,8 @@ module.exports = {
     REDIS_KEY: REDIS_KEY,
     FILTERS_DESIGN_DOC: FILTERS_DESIGN_DOC,
     VIEWS_DESIGN_DOC: VIEWS_DESIGN_DOC,
-    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER
+    TRIGGERS_BY_WORKER: TRIGGERS_BY_WORKER,
+    MONITOR_INTERVAL: MONITOR_INTERVAL,
+    MONITOR_DESIGN_DOC: MONITOR_DESIGN_DOC,
+    DOCS_FOR_MONITOR: DOCS_FOR_MONITOR
 };
diff --git a/provider/lib/health.js b/provider/lib/health.js
index bbdc01d..eb2832d 100644
--- a/provider/lib/health.js
+++ b/provider/lib/health.js
@@ -1,11 +1,21 @@
 var si = require('systeminformation');
 var v8 = require('v8');
+var request = require('request');
+var _ = require('lodash');
+var URL = require('url').URL;
+var constants = require('./constants.js');
 
-module.exports = function(utils) {
+module.exports = function(logger, utils) {
 
     // Health Endpoint
     this.endPoint = '/health';
 
+    var triggerName;
+    var canaryDocID;
+    var monitorStatus;
+    var monitorStages = ['triggerStarted', 'triggerFired', 'triggerStopped'];
+    var healthMonitor = this;
+
     // Health Logic
     this.health = function (req, res) {
 
@@ -20,13 +30,13 @@ module.exports = function(utils) {
             si.inetLatency(utils.routerHost)
         ])
         .then(results => {
+            stats.triggerMonitor = monitorStatus;
             stats.memory = results[0];
-            stats.cpu = results[1];
+            stats.cpu = _.omit(results[1], 'cpus');
             stats.disk = results[2];
             stats.network = results[3];
             stats.apiHostLatency = results[4];
             stats.heapStatistics = v8.getHeapStatistics();
-            stats.heapSpaceStatistics =v8.getHeapSpaceStatistics();
             res.send(stats);
         })
         .catch(error => {
@@ -35,4 +45,182 @@ module.exports = function(utils) {
         });
     };
 
+    this.monitor = function(apikey) {
+        var method = 'monitor';
+
+        var auth = apikey.split(':');
+
+        if (triggerName) {
+            monitorStatus = Object.assign({}, utils.monitorStatus);
+            utils.monitorStatus = {};
+
+            var monitorStatusSize = Object.keys(monitorStatus).length;
+            if (monitorStatusSize < 5) {
+                //we have a failure in one of the stages
+                var stageFailed = monitorStages[monitorStatusSize - 2];
+                monitorStatus[stageFailed] = 'failed';
+            }
+            var existingTriggerID = `:_:${triggerName}`;
+            var existingCanaryID = canaryDocID;
+
+            //delete trigger feed from database
+            healthMonitor.deleteDocFromDB(existingTriggerID, 0);
+
+            //delete the trigger
+            var uri = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+            healthMonitor.deleteTrigger(existingTriggerID, uri, auth, 0);
+
+            //delete the canary doc
+            healthMonitor.deleteDocFromDB(existingCanaryID, 0);
+        }
+
+        //create new cloudant trigger and canary doc
+        var docSuffix = utils.worker + utils.host + '_' + Date.now();
+        triggerName = 'cloudant_' + docSuffix;
+        canaryDocID = 'canary_' + docSuffix;
+
+        //update status monitor object
+        utils.monitorStatus.triggerName = triggerName;
+        utils.monitorStatus.triggerType = 'changes';
+
+        var triggerURL = utils.uriHost + '/api/v1/namespaces/_/triggers/' + triggerName;
+        var triggerID = `:_:${triggerName}`;
+        healthMonitor.createTrigger(triggerURL, auth)
+        .then(info => {
+            logger.info(method, triggerID, info);
+            var newTrigger = healthMonitor.createCloudantTrigger(triggerID, apikey);
+            healthMonitor.createDocInDB(triggerID, newTrigger);
+        })
+        .catch(err => {
+            logger.error(method, triggerID, err);
+        });
+    };
+
+    this.createCloudantTrigger = function(triggerID, apikey) {
+        var method = 'createCloudantTrigger';
+
+        var dbURL = new URL(utils.db.config.url);
+        var dbName = utils.db.config.db;
+
+        var newTrigger = {
+            apikey: apikey,
+            id: triggerID,
+            host: dbURL.hostname,
+            port: dbURL.port,
+            protocol: dbURL.protocol.replace(':', ''),
+            dbname: dbName,
+            user: dbURL.username,
+            pass: dbURL.password,
+            filter: constants.MONITOR_DESIGN_DOC + '/' + constants.DOCS_FOR_MONITOR,
+            query_params: {host: utils.host},
+            maxTriggers: 1,
+            worker: utils.worker,
+            monitor: utils.host
+        };
+
+        return newTrigger;
+    };
+
+    this.createTrigger = function(triggerURL, auth) {
+        var method = 'createTrigger';
+
+        return new Promise(function(resolve, reject) {
+            request({
+                method: 'put',
+                uri: triggerURL,
+                auth: {
+                    user: auth[0],
+                    pass: auth[1]
+                },
+                json: true,
+                body: {}
+            }, function (error, response) {
+                if (error || response.statusCode >= 400) {
+                    reject('monitoring trigger create request failed');
+                }
+                else {
+                    resolve('monitoring trigger create request was successful');
+                }
+            });
+        });
+    };
+
+    this.createDocInDB = function(docID, doc) {
+        var method = 'createDocInDB';
+
+        utils.db.insert(doc, docID, function (err) {
+            if (!err) {
+                logger.info(method, docID, 'was successfully inserted');
+                if (doc.monitor) {
+                    setTimeout(function () {
+                        var canaryDoc = {
+                            isCanaryDoc: true,
+                            host: utils.host
+                        };
+                        healthMonitor.createDocInDB(canaryDocID, canaryDoc);
+                    }, 1000 * 60);
+                }
+            }
+            else {
+                logger.error(method, docID, err);
+            }
+        });
+    };
+
+    this.deleteTrigger = function(triggerID, uri, auth, retryCount) {
+        var method = 'deleteTrigger';
+
+        request({
+            method: 'delete',
+            uri: uri,
+            auth: {
+                user: auth[0],
+                pass: auth[1]
+            },
+        }, function (error, response) {
+            logger.info(method, triggerID, 'http delete request, STATUS:', response ? response.statusCode : undefined);
+            if (error || response.statusCode >= 400) {
+                if (!error && response.statusCode === 409 && retryCount < 5) {
+                    logger.info(method, 'attempting to delete trigger again', triggerID, 'Retry Count:', (retryCount + 1));
+                    setTimeout(function () {
+                        healthMonitor.deleteTrigger(triggerID, uri, auth, (retryCount + 1));
+                    }, 1000);
+                } else {
+                    logger.error(method, triggerID, 'trigger delete request failed');
+                }
+            }
+            else {
+                logger.info(method, triggerID, 'trigger delete request was successful');
+            }
+        });
+    };
+
+    this.deleteDocFromDB = function(docID, retryCount) {
+        var method = 'deleteDocFromDB';
+
+        //delete from database
+        utils.db.get(docID, function (err, existing) {
+            if (!err) {
+                utils.db.destroy(existing._id, existing._rev, function (err) {
+                    if (err) {
+                        if (err.statusCode === 409 && retryCount < 5) {
+                            setTimeout(function () {
+                                healthMonitor.deleteDocFromDB(docID, (retryCount + 1));
+                            }, 1000);
+                        }
+                        else {
+                            logger.error(method, docID, 'could not be deleted from the database');
+                        }
+                    }
+                    else {
+                        logger.info(method, docID, 'was successfully deleted from the database');
+                    }
+                });
+            }
+            else {
+                logger.error(method, docID, 'could not be found in the database');
+            }
+        });
+    };
+
 };
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 8446117..7906c36 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -1,25 +1,21 @@
-var _ = require('lodash');
 var request = require('request');
 var HttpStatus = require('http-status-codes');
 var constants = require('./constants.js');
 
+module.exports = function(logger, triggerDB, redisClient) {
 
-module.exports = function(
-  logger,
-  triggerDB,
-  redisClient
-) {
-    this.module = 'utils';
     this.triggers = {};
     this.endpointAuth = process.env.ENDPOINT_AUTH;
     this.routerHost = process.env.ROUTER_HOST || 'localhost';
     this.worker = process.env.WORKER || 'worker0';
     this.host = process.env.HOST_INDEX || 'host0';
-    this.hostMachine = process.env.HOST_MACHINE;
     this.activeHost = 'host0'; //default value on init (will be updated for existing redis)
+    this.db = triggerDB;
     this.redisClient = redisClient;
-    this.redisHash = triggerDB.config.db + '_' + this.worker;
+    this.redisHash = this.db.config.db + '_' + this.worker;
     this.redisKey = constants.REDIS_KEY;
+    this.uriHost ='https://' + this.routerHost + ':443';
+    this.monitorStatus = {};
 
     var retryAttempts = constants.RETRY_ATTEMPTS;
     var filterDDName = constants.FILTERS_DESIGN_DOC;
@@ -56,16 +52,13 @@ module.exports = function(
             utils.triggers[dataTrigger.id] = dataTrigger;
 
             feed.on('change', function (change) {
-                if (utils.activeHost === utils.host) {
+                var triggerHandle = utils.triggers[dataTrigger.id];
+                if (triggerHandle && utils.shouldFireTrigger(triggerHandle) && utils.hasTriggersRemaining(triggerHandle)) {
                     logger.info(method, 'Trigger', dataTrigger.id, 'got change from', dataTrigger.dbname);
-
-                    var triggerHandle = utils.triggers[dataTrigger.id];
-                    if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) {
-                        try {
-                            utils.fireTrigger(dataTrigger.id, change);
-                        } catch (e) {
-                            logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
-                        }
+                    try {
+                        utils.fireTrigger(dataTrigger.id, change);
+                    } catch (e) {
+                        logger.error(method, 'Exception occurred while firing trigger', dataTrigger.id, e);
                     }
                 }
             });
@@ -73,17 +66,15 @@ module.exports = function(
             feed.follow();
 
             return new Promise(function(resolve, reject) {
-
                 feed.on('error', function (err) {
                     logger.error(method,'Error occurred for trigger', dataTrigger.id, '(db ' + dataTrigger.dbname + '):', err);
                     reject(err);
                 });
 
-                feed.on('confirm', function (dbObj) {
+                feed.on('confirm', function () {
                     logger.info(method, 'Added cloudant data trigger', dataTrigger.id, 'listening for changes in database', dataTrigger.dbname);
                     resolve(dataTrigger.id);
                 });
-
             });
 
         } catch (err) {
@@ -94,10 +85,6 @@ module.exports = function(
     };
 
     this.initTrigger = function(newTrigger) {
-        var method = 'initTrigger';
-
-        logger.info(method, 'create trigger', newTrigger.id, 'with the following args', newTrigger);
-
         var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS;
 
         var trigger = {
@@ -113,7 +100,8 @@ module.exports = function(
             maxTriggers: maxTriggers,
             triggersLeft: maxTriggers,
             filter: newTrigger.filter,
-            query_params: newTrigger.query_params
+            query_params: newTrigger.query_params,
+            monitor: newTrigger.monitor
         };
 
         return trigger;
@@ -124,6 +112,18 @@ module.exports = function(
             [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1);
     };
 
+    this.shouldFireTrigger = function(trigger) {
+        return trigger.monitor || utils.activeHost === utils.host;
+    };
+
+    this.hasTriggersRemaining = function(trigger) {
+        return !trigger.maxTriggers || trigger.maxTriggers === -1 || trigger.triggersLeft > 0;
+    };
+
+    this.isMonitoringTrigger = function(monitor, triggerIdentifier) {
+        return monitor && utils.monitorStatus.triggerName === utils.parseQName(triggerIdentifier).name;
+    };
+
     this.disableTrigger = function(id, statusCode, message) {
         var method = 'disableTrigger';
 
@@ -188,9 +188,12 @@ module.exports = function(
         utils.postTrigger(dataTrigger, form, uri, auth, 0)
         .then(triggerId => {
             logger.info(method, 'Trigger', triggerId, 'was successfully fired');
+            if (utils.isMonitoringTrigger(dataTrigger.monitor, triggerId)) {
+                utils.monitorStatus.triggerFired = "success";
+            }
             if (dataTrigger.triggersLeft === 0) {
-                utils.disableTrigger(dataTrigger.id, undefined, 'Automatically disabled after reaching max triggers');
-                logger.warn(method, 'no more triggers left, disabled', dataTrigger.id);
+                utils.disableTrigger(triggerId, undefined, 'Automatically disabled after reaching max triggers');
+                logger.warn(method, 'no more triggers left, disabled', triggerId);
             }
         })
         .catch(err => {
@@ -273,7 +276,7 @@ module.exports = function(
                     var triggerIdentifier = trigger.id;
                     var doc = trigger.doc;
 
-                    if (!(triggerIdentifier in utils.triggers)) {
+                    if (!(triggerIdentifier in utils.triggers) && !doc.monitor) {
                         //check if trigger still exists in whisk db
                         var triggerObj = utils.parseQName(triggerIdentifier);
                         var host = 'https://' + utils.routerHost + ':' + 443;
@@ -330,19 +333,23 @@ module.exports = function(
                 var triggerIdentifier = change.id;
                 var doc = change.doc;
 
-                logger.info(method, 'got change for trigger', triggerIdentifier);
-
                 if (utils.triggers[triggerIdentifier]) {
                     if (doc.status && doc.status.active === false) {
                         utils.deleteTrigger(triggerIdentifier);
+                        if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+                            utils.monitorStatus.triggerStopped = "success";
+                        }
                     }
                 }
                 else {
                     //ignore changes to disabled triggers
-                    if (!doc.status || doc.status.active === true) {
+                    if ((!doc.status || doc.status.active === true) && (!doc.monitor || doc.monitor === utils.host)) {
                         utils.createTrigger(utils.initTrigger(doc))
                         .then(triggerIdentifier => {
                             logger.info(method, triggerIdentifier, 'created successfully');
+                            if (utils.isMonitoringTrigger(doc.monitor, triggerIdentifier)) {
+                                utils.monitorStatus.triggerStarted = "success";
+                            }
                         })
                         .catch(err => {
                             var message = 'Automatically disabled after receiving exception on create trigger: ' + err;
@@ -368,7 +375,6 @@ module.exports = function(
         var method = 'authorize';
 
         if (utils.endpointAuth) {
-
             if (!req.headers.authorization) {
                 res.set('www-authenticate', 'Basic realm="Private"');
                 res.status(HttpStatus.UNAUTHORIZED);
@@ -388,9 +394,7 @@ module.exports = function(
 
             var uuid = auth[1];
             var key = auth[2];
-
             var endpointAuth = utils.endpointAuth.split(':');
-
             if (endpointAuth[0] === uuid && endpointAuth[1] === key) {
                 next();
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to