This is an automated email from the ASF dual-hosted git repository.
jorgebg pushed a commit to branch tp33
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp33 by this push:
new d58712c TINKERPOP-1889 - Upgrade ws, adding ping and reconnection
javascript
d58712c is described below
commit d58712c504f41a7953cb004a51f301c6fe99726f
Author: Tieu Philippe KHIM <[email protected]>
AuthorDate: Mon Dec 24 11:53:01 2018 +0100
TINKERPOP-1889 - Upgrade ws, adding ping and reconnection javascript
---
.../gremlin-javascript/lib/driver/connection.js | 147 +++++++++++++++++----
.../javascript/gremlin-javascript/package.json | 4 +-
2 files changed, 122 insertions(+), 29 deletions(-)
diff --git
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
index cc03449..76d3cf0 100644
---
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
+++
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
@@ -22,6 +22,7 @@
*/
'use strict';
+const EventEmitter = require('events');
const WebSocket = require('ws');
const util = require('util');
const utils = require('../utils');
@@ -37,10 +38,13 @@ const responseStatusCode = {
const defaultMimeType = 'application/vnd.gremlin-v3.0+json';
+const pingIntervalDelay = 60 * 1000;
+const pongTimeoutDelay = 30 * 1000;
+
/**
* Represents a single connection to a Gremlin Server.
*/
-class Connection {
+class Connection extends EventEmitter {
/**
* Creates a new instance of {@link Connection}.
@@ -56,27 +60,17 @@ class Connection {
* @param {GraphSONWriter} [options.writer] The writer to use.
* @param {Authenticator} [options.authenticator] The authentication handler
to use.
* @param {Object} [options.headers] An associative array containing the
additional header key/values for the initial request.
+ * @param {Boolean} [options.pingEnabled] Setup ping interval. Defaults to:
true.
+ * @param {Number} [options.pingInterval] Ping request interval in ms if
ping enabled. Defaults to: 60000.
+ * @param {Number} [options.pongTimeout] Timeout of pong response in ms
after sending a ping. Defaults to: 30000.
+ * @param {Boolean} [options.connectOnStartup] Open websocket on startup.
Defaults to: true.
* @constructor
*/
constructor(url, options) {
- this.url = url;
- options = options || {};
- this._ws = new WebSocket(url, {
- headers: options.headers,
- ca: options.ca,
- cert: options.cert,
- pfx: options.pfx,
- rejectUnauthorized: options.rejectUnauthorized
- });
+ super();
- this._ws.on('open', () => {
- this.isOpen = true;
- if (this._openCallback) {
- this._openCallback();
- }
- });
-
- this._ws.on('message', data => this._handleMessage(data));
+ this.url = url;
+ this.options = options || {};
// A map containing the request id and the handler
this._responseHandlers = {};
@@ -85,6 +79,9 @@ class Connection {
this._openPromise = null;
this._openCallback = null;
this._closePromise = null;
+ this._closeCallback = null;
+ this._pingInterval = null;
+ this._pongTimeout = null;
/**
* Gets the MIME type.
@@ -96,6 +93,14 @@ class Connection {
this.isOpen = false;
this.traversalSource = options.traversalSource || 'g';
this._authenticator = options.authenticator;
+
+ this._pingEnabled = this.options.pingEnabled === false ? false : true;
+ this._pingIntervalDelay = this.options.pingInterval || pingIntervalDelay;
+ this._pongTimeoutDelay = this.options.pongTimeout || pongTimeoutDelay;
+
+ if (this.options.connectOnStartup !== false) {
+ this.open();
+ }
}
/**
@@ -103,18 +108,47 @@ class Connection {
* @returns {Promise}
*/
open() {
- if (this._closePromise) {
- return this._openPromise = Promise.reject(new Error('Connection has been
closed'));
- }
if (this.isOpen) {
return Promise.resolve();
}
if (this._openPromise) {
return this._openPromise;
}
+
+ this.emit('log', `ws open`);
+
+ this._ws = new WebSocket(this.url, {
+ headers: this.options.headers,
+ ca: this.options.ca,
+ cert: this.options.cert,
+ pfx: this.options.pfx,
+ rejectUnauthorized: this.options.rejectUnauthorized
+ });
+
+ this._ws.on('message', (data) => this._handleMessage(data));
+ this._ws.on('error', (err) => this._handleError(err));
+ this._ws.on('close', (code, message) => this._handleClose(code, message));
+
+ this._ws.on('pong', () => {
+ this.emit('log', 'ws pong received');
+ if (this._pongTimeout) {
+ clearTimeout(this._pongTimeout);
+ this._pongTimeout = null;
+ }
+ });
+ this._ws.on('ping', () => {
+ this.emit('log', 'ws ping received');
+ this._ws.pong();
+ });
+
return this._openPromise = new Promise((resolve, reject) => {
- // Set the callback that will be invoked once the WS is opened
- this._openCallback = err => err ? reject(err) : resolve();
+ this._ws.on('open', () => {
+ this.isOpen = true;
+ if (this._pingEnabled) {
+ this._pingHeartbeat();
+ }
+ resolve();
+ });
});
}
@@ -151,6 +185,46 @@ class Connection {
});
}
+ _pingHeartbeat() {
+
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ this._pingInterval = null;
+ }
+
+ this._pingInterval = setInterval(() => {
+ if (this.isOpen === false) {
+ // in case of if not open..
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ this._pingInterval = null;
+ }
+ }
+
+ this._pongTimeout = setTimeout(() => {
+ this._ws.terminate();
+ }, this._pongTimeoutDelay);
+
+ this._ws.ping();
+
+ }, this._pingIntervalDelay);
+ }
+
+ _handleError(err) {
+ this.emit('log', `ws error ${err}`);
+ this._cleanupWebsocket();
+ this.emit('error', err);
+ }
+
+ _handleClose(code, message) {
+ this.emit('log', `ws close code=${code} message=${message}`);
+ this._cleanupWebsocket();
+ if (this._closeCallback) {
+ this._closeCallback();
+ }
+ this.emit('close', code, message);
+ }
+
_handleMessage(data) {
const response = this._reader.read(JSON.parse(data.toString()));
if (response.requestId === null || response.requestId === undefined) {
@@ -211,6 +285,25 @@ class Connection {
}
/**
+ * clean websocket context
+ */
+ _cleanupWebsocket() {
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ }
+ this._pingInterval = null;
+ if (this._pongTimeout) {
+ clearTimeout(this._pongTimeout);
+ }
+ this._pongTimeout = null;
+
+ this._ws.removeAllListeners();
+ this._openPromise = null;
+ this._closePromise = null;
+ this.isOpen = false;
+ }
+
+ /**
* Clears the internal state containing the callback and result buffer of a
given request.
* @param requestId
* @private
@@ -250,12 +343,12 @@ class Connection {
* @return {Promise}
*/
close() {
+ if (this.isOpen === false) {
+ return Promise.resolve();
+ }
if (!this._closePromise) {
this._closePromise = new Promise(resolve => {
- this._ws.on('close', function () {
- this.isOpen = false;
- resolve();
- });
+ this._closeCallback = resolve;
this._ws.close();
});
}
diff --git
a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
index d4a1fb6..18de8ce 100644
--- a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
+++ b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
@@ -14,7 +14,7 @@
],
"license": "Apache-2.0",
"dependencies": {
- "ws": "^3.0.0"
+ "ws": "^6.1.2"
},
"devDependencies": {
"mocha": "~4.0.1",
@@ -40,4 +40,4 @@
"engines": {
"node": ">=6"
}
-}
\ No newline at end of file
+}