bjustin-ibm commented on a change in pull request #160: HA - redundancy
URL: 
https://github.com/apache/incubator-openwhisk-package-kafka/pull/160#discussion_r116280624
 
 

 ##########
 File path: action/messageHubFeedWeb.js
 ##########
 @@ -0,0 +1,192 @@
+var common = require('./lib/common');
+var Database = require('./lib/Database');
+
+/**
+ *   Feed to listen to MessageHub messages
+ *  @param {string} brokers - array of Message Hub brokers
+ *  @param {string} username - Kafka username
+ *  @param {string} password - Kafka password
+ *  @param {string} topic - topic to subscribe to
+ *  @param {bool}   isJSONData - attempt to parse messages as JSON
+ *  @param {bool}   isBinaryKey - encode key as Base64
+ *  @param {bool}   isBinaryValue - encode message as Base64
+ *  @param {string} endpoint - address to OpenWhisk deployment (expected to be 
bound at deployment)
+ *  @param {string} DB_URL - URL for the DB, must include authentication 
(expected to be bound at deployment)
+ *  @param {string} DB_NAME - DB name (expected to be bound at deployment)
+ */
+function main(params) {
+    var promise = new Promise(function(resolve, reject) {
+        console.log(`${JSON.stringify(params, null, 2)}`);
+
+        // hold off initializing this until definitely needed
+        var db;
+
+        if (params.__ow_method === "put") {
+            return validateParameters(params)
+                .then(validatedParams => {
+                    console.log(`VALIDATED: ${JSON.stringify(validatedParams, 
null, 2)}`);
+                    db = new Database(params.DB_URL, params.DB_NAME);
+
+                    var promises = [];
+
+                    // do these in parallel!
+                    
promises.push(db.ensureTriggerIsUnique(validatedParams.triggerName));
+                    promises.push(checkMessageHubCredentials(validatedParams));
+
+                    // TODO is this necessary?
+                    
promises.push(common.verifyTriggerAuth(validatedParams.triggerURL));
+
+                    return Promise.all(promises)
+                        .then(result => validatedParams);
+                })
+                .then(validatedParams => db.recordTrigger(validatedParams))
+                .then(result => {
+                    console.log('successfully wrote the trigger');
+                    resolve();
+                })
+                .catch(error => {
+                    console.log(`Failed to write the trigger ${error}`);
+                    resolve({
+                        statusCode: 500,
+                        headers: {'Content-Type': 'text/plain'},
+                        body: error
+                    });
+                });
+        } else if (params.__ow_method === "delete") {
+            db = new Database(params.DB_URL, params.DB_NAME);
+
+            return common.resolveTriggerDetails(params.authKey, 
common.massageEndpointParam(params.endpoint), params.triggerName)
+                .then(triggerDetails => {
+                    const triggerFQN = triggerDetails.triggerName;
+                    return db.getTrigger(triggerFQN)
+                })
+                .then(doc => {
+                    return db.deleteTrigger(doc);
+                })
+                .then(resolve)
+                .catch(reject);
+        } else {
+            reject({ error: 'unsupported lifecycleEvent' });
+        }
+    });
+
+    return promise;
+}
+
+function checkMessageHubCredentials(params) {
+    // listing topics seems to be the simplest way to check auth
+    var topicURL = params.kafka_admin_url + '/admin/topics';
+
+    var options = {
+        method: 'GET',
+        url: topicURL,
+        json: true,
+        headers: {
+            'X-Auth-Token': params.username + params.password
+        }
+    };
+
+    const request = require('request-promise');
+
+    return request(options)
+        .then(function (body) {
+            console.log("Successfully authenticated with Message Hub");
+
+            var topicNames = body.map(function (topic) {
+                return topic.name
+            });
+
+            if (topicNames.indexOf(params.topic) < 0) {
+                return Promise.reject( 'Topic does not exist. You must create 
the topic first: ' + params.topic );
+            }
+        }, function (authError) {
+            console.log(`authError: ${JSON.stringify(authError)}`);
+            return Promise.reject( 'Could not authenticate with Message Hub. 
Please check your credentials.' );
+        });
+}
+
+function validateParameters(rawParams) {
+    var promise = new Promise(function (resolve, reject) {
+        var validatedParams = {};
+
+        // topic
+        if (rawParams.topic && rawParams.topic.length > 0) {
+            validatedParams.topic = rawParams.topic;
+        } else {
+            reject( "You must supply a 'topic' parameter." );
+            return;
+        }
+
+        // kafka_brokers_sasl
+        if (rawParams.kafka_brokers_sasl) {
+            validatedParams.brokers = 
common.validateBrokerParam(rawParams.kafka_brokers_sasl);
+            if (!validatedParams.brokers) {
+                reject( "You must supply a 'kafka_brokers_sasl' parameter as 
an array of Message Hub brokers." );
+                return;
+            }
+        } else {
+            reject( "You must supply a 'kafka_brokers_sasl' parameter as an 
array of Message Hub brokers." );
+            return;
+        }
+
+        // user
+        if (rawParams.user) {
+            validatedParams.username = rawParams.user;
+        } else {
+            reject( "You must supply a 'user' parameter to authenticate with 
Message Hub." );
+            return;
+        }
+
+        // password
+        if (rawParams.password) {
+            validatedParams.password = rawParams.password;
+        } else {
+            reject( "You must supply a 'password' parameter to authenticate 
with Message Hub." );
+            return;
+        }
+
+        // kafka_admin_url
+        if (rawParams.kafka_admin_url) {
+            validatedParams.kafka_admin_url = rawParams.kafka_admin_url;
+        } else {
+            reject( "You must supply a 'kafka_admin_url' parameter." );
+            return;
+        }
+
+        validatedParams.isJSONData = common.getBooleanFromArgs(rawParams, 
'isJSONData');
+        validatedParams.isBinaryValue = common.getBooleanFromArgs(rawParams, 
'isBinaryValue');
+
+        if (validatedParams.isJSONData && validatedParams.isBinaryValue) {
+            reject( 'isJSONData and isBinaryValue cannot both be enabled.' );
+            return;
+        }
+
+        // now that everything else is valid, let's add these
+        validatedParams.isMessageHub = true;
+        validatedParams.isBinaryKey = common.getBooleanFromArgs(rawParams, 
'isBinaryKey');
+        validatedParams.endpoint = 
common.massageEndpointParam(rawParams.endpoint);
+        validatedParams.authKey = rawParams.authKey;
+
+        const uuid = require('uuid');
+        validatedParams.uuid = uuid.v4();
+
+        console.log(`I made it this far! ${JSON.stringify(validatedParams, 
null, 2)}`);
+
+        common.resolveTriggerDetails(validatedParams.authKey, 
validatedParams.endpoint, rawParams.triggerName)
 
 Review comment:
   The whole point of this is to make sure we have the correct namespace. In 
this action, the namespace env var will be the action owner's (e.g. 
whisk.system), not the namespace for the trigger being created. However, 
@jasonpet points out that the correct namespace _is_ available in the non-web 
feed action, and could be simply passed into the web action as a parameter 
value. Brilliant!
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to