This is an automated email from the ASF dual-hosted git repository.

jorgebg pushed a commit to branch tp33
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp33 by this push:
     new d58712c  TINKERPOP-1889 - Upgrade ws, adding ping and reconnection 
javascript
d58712c is described below

commit d58712c504f41a7953cb004a51f301c6fe99726f
Author: Tieu Philippe KHIM <[email protected]>
AuthorDate: Mon Dec 24 11:53:01 2018 +0100

    TINKERPOP-1889 - Upgrade ws, adding ping and reconnection javascript
---
 .../gremlin-javascript/lib/driver/connection.js    | 147 +++++++++++++++++----
 .../javascript/gremlin-javascript/package.json     |   4 +-
 2 files changed, 122 insertions(+), 29 deletions(-)

diff --git 
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
 
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
index cc03449..76d3cf0 100644
--- 
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
+++ 
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
@@ -22,6 +22,7 @@
  */
 'use strict';
 
+const EventEmitter = require('events');
 const WebSocket = require('ws');
 const util = require('util');
 const utils = require('../utils');
@@ -37,10 +38,13 @@ const responseStatusCode = {
 
 const defaultMimeType = 'application/vnd.gremlin-v3.0+json';
 
+const pingIntervalDelay = 60 * 1000;
+const pongTimeoutDelay = 30 * 1000;
+
 /**
  * Represents a single connection to a Gremlin Server.
  */
-class Connection {
+class Connection extends EventEmitter {
 
   /**
    * Creates a new instance of {@link Connection}.
@@ -56,27 +60,17 @@ class Connection {
    * @param {GraphSONWriter} [options.writer] The writer to use.
    * @param {Authenticator} [options.authenticator] The authentication handler 
to use.
    * @param {Object} [options.headers] An associative array containing the 
additional header key/values for the initial request.
+   * @param {Boolean} [options.pingEnabled] Setup ping interval. Defaults to: 
true.
+   * @param {Number} [options.pingInterval] Ping request interval in ms if 
ping enabled. Defaults to: 60000.
+   * @param {Number} [options.pongTimeout] Timeout of pong response in ms 
after sending a ping. Defaults to: 30000.
+   * @param {Boolean} [options.connectOnStartup] Open websocket on startup. 
Defaults to: true.
    * @constructor
    */
   constructor(url, options) {
-    this.url = url;
-    options = options || {};
-    this._ws = new WebSocket(url, {
-      headers: options.headers,
-      ca: options.ca,
-      cert: options.cert,
-      pfx: options.pfx,
-      rejectUnauthorized: options.rejectUnauthorized
-    });
+    super();
 
-    this._ws.on('open', () => {
-      this.isOpen = true;
-      if (this._openCallback) {
-        this._openCallback();
-      }
-    });
-
-    this._ws.on('message', data => this._handleMessage(data));
+    this.url = url;
+    this.options = options || {};
 
     // A map containing the request id and the handler
     this._responseHandlers = {};
@@ -85,6 +79,9 @@ class Connection {
     this._openPromise = null;
     this._openCallback = null;
     this._closePromise = null;
+    this._closeCallback = null;
+    this._pingInterval = null;
+    this._pongTimeout = null;
 
     /**
      * Gets the MIME type.
@@ -96,6 +93,14 @@ class Connection {
     this.isOpen = false;
     this.traversalSource = options.traversalSource || 'g';
     this._authenticator = options.authenticator;
+
+    this._pingEnabled = this.options.pingEnabled === false ? false : true;
+    this._pingIntervalDelay = this.options.pingInterval || pingIntervalDelay;
+    this._pongTimeoutDelay = this.options.pongTimeout || pongTimeoutDelay;
+
+    if (this.options.connectOnStartup !== false) {
+      this.open();
+    }
   }
 
   /**
@@ -103,18 +108,47 @@ class Connection {
    * @returns {Promise}
    */
   open() {
-    if (this._closePromise) {
-      return this._openPromise = Promise.reject(new Error('Connection has been 
closed'));
-    }
     if (this.isOpen) {
       return Promise.resolve();
     }
     if (this._openPromise) {
       return this._openPromise;
     }
+
+    this.emit('log', `ws open`);
+
+    this._ws = new WebSocket(this.url, {
+      headers: this.options.headers,
+      ca: this.options.ca,
+      cert: this.options.cert,
+      pfx: this.options.pfx,
+      rejectUnauthorized: this.options.rejectUnauthorized
+    });
+
+    this._ws.on('message', (data) => this._handleMessage(data));
+    this._ws.on('error', (err) => this._handleError(err));
+    this._ws.on('close', (code, message) => this._handleClose(code, message));
+
+    this._ws.on('pong', () => {
+      this.emit('log', 'ws pong received');
+      if (this._pongTimeout) {
+        clearTimeout(this._pongTimeout);
+        this._pongTimeout = null;
+      }
+    });
+    this._ws.on('ping', () => {
+      this.emit('log', 'ws ping received');
+      this._ws.pong();
+    });
+
     return this._openPromise = new Promise((resolve, reject) => {
-      // Set the callback that will be invoked once the WS is opened
-      this._openCallback = err => err ? reject(err) : resolve();
+      this._ws.on('open', () => {
+        this.isOpen = true;
+        if (this._pingEnabled) {
+          this._pingHeartbeat();
+        }
+        resolve();
+      });
     });
   }
 
@@ -151,6 +185,46 @@ class Connection {
     });
   }
 
+  _pingHeartbeat() {
+
+    if (this._pingInterval) {
+      clearInterval(this._pingInterval);
+      this._pingInterval = null;
+    }
+
+    this._pingInterval = setInterval(() => {
+      if (this.isOpen === false) {
+        // in case of if not open..
+        if (this._pingInterval) {
+          clearInterval(this._pingInterval);
+          this._pingInterval = null;
+        }
+      }
+
+      this._pongTimeout = setTimeout(() => {
+        this._ws.terminate();
+      }, this._pongTimeoutDelay);
+
+      this._ws.ping();
+
+    }, this._pingIntervalDelay);
+  }
+
+  _handleError(err) {
+    this.emit('log', `ws error ${err}`);
+    this._cleanupWebsocket();
+    this.emit('error', err);
+  }
+
+  _handleClose(code, message) {
+    this.emit('log', `ws close code=${code} message=${message}`);
+    this._cleanupWebsocket();
+    if (this._closeCallback) {
+      this._closeCallback();
+    }
+    this.emit('close', code, message);
+  }
+
   _handleMessage(data) {
     const response = this._reader.read(JSON.parse(data.toString()));
     if (response.requestId === null || response.requestId === undefined) {
@@ -211,6 +285,25 @@ class Connection {
   }
 
   /**
+   * clean websocket context
+   */
+  _cleanupWebsocket() {
+    if (this._pingInterval) {
+      clearInterval(this._pingInterval);
+    }
+    this._pingInterval = null;
+    if (this._pongTimeout) {
+      clearTimeout(this._pongTimeout);
+    }
+    this._pongTimeout = null;
+
+    this._ws.removeAllListeners();
+    this._openPromise = null;
+    this._closePromise = null;
+    this.isOpen = false;
+  }
+
+  /**
    * Clears the internal state containing the callback and result buffer of a 
given request.
    * @param requestId
    * @private
@@ -250,12 +343,12 @@ class Connection {
    * @return {Promise}
    */
   close() {
+    if (this.isOpen === false) {
+      return Promise.resolve();
+    }
     if (!this._closePromise) {
       this._closePromise = new Promise(resolve => {
-        this._ws.on('close', function () {
-          this.isOpen = false;
-          resolve();
-        });
+        this._closeCallback = resolve;
         this._ws.close();
       });
     }
diff --git 
a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json 
b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
index d4a1fb6..18de8ce 100644
--- a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
+++ b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
@@ -14,7 +14,7 @@
   ],
   "license": "Apache-2.0",
   "dependencies": {
-    "ws": "^3.0.0"
+    "ws": "^6.1.2"
   },
   "devDependencies": {
     "mocha": "~4.0.1",
@@ -40,4 +40,4 @@
   "engines": {
     "node": ">=6"
   }
-}
\ No newline at end of file
+}

Reply via email to