jorgebay closed pull request #1012: TINKERPOP-1889 - Upgrade ws, adding ping
and reconnection javascript
URL: https://github.com/apache/tinkerpop/pull/1012
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
index cc0344974e..76d3cf0aea 100644
---
a/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
+++
b/gremlin-javascript/src/main/javascript/gremlin-javascript/lib/driver/connection.js
@@ -22,6 +22,7 @@
*/
'use strict';
+const EventEmitter = require('events');
const WebSocket = require('ws');
const util = require('util');
const utils = require('../utils');
@@ -37,10 +38,13 @@ const responseStatusCode = {
const defaultMimeType = 'application/vnd.gremlin-v3.0+json';
+const pingIntervalDelay = 60 * 1000;
+const pongTimeoutDelay = 30 * 1000;
+
/**
* Represents a single connection to a Gremlin Server.
*/
-class Connection {
+class Connection extends EventEmitter {
/**
* Creates a new instance of {@link Connection}.
@@ -56,27 +60,17 @@ class Connection {
* @param {GraphSONWriter} [options.writer] The writer to use.
* @param {Authenticator} [options.authenticator] The authentication handler
to use.
* @param {Object} [options.headers] An associative array containing the
additional header key/values for the initial request.
+ * @param {Boolean} [options.pingEnabled] Setup ping interval. Defaults to:
true.
+ * @param {Number} [options.pingInterval] Ping request interval in ms if
ping enabled. Defaults to: 60000.
+ * @param {Number} [options.pongTimeout] Timeout of pong response in ms
after sending a ping. Defaults to: 30000.
+ * @param {Boolean} [options.connectOnStartup] Open websocket on startup.
Defaults to: true.
* @constructor
*/
constructor(url, options) {
- this.url = url;
- options = options || {};
- this._ws = new WebSocket(url, {
- headers: options.headers,
- ca: options.ca,
- cert: options.cert,
- pfx: options.pfx,
- rejectUnauthorized: options.rejectUnauthorized
- });
+ super();
- this._ws.on('open', () => {
- this.isOpen = true;
- if (this._openCallback) {
- this._openCallback();
- }
- });
-
- this._ws.on('message', data => this._handleMessage(data));
+ this.url = url;
+ this.options = options || {};
// A map containing the request id and the handler
this._responseHandlers = {};
@@ -85,6 +79,9 @@ class Connection {
this._openPromise = null;
this._openCallback = null;
this._closePromise = null;
+ this._closeCallback = null;
+ this._pingInterval = null;
+ this._pongTimeout = null;
/**
* Gets the MIME type.
@@ -96,6 +93,14 @@ class Connection {
this.isOpen = false;
this.traversalSource = options.traversalSource || 'g';
this._authenticator = options.authenticator;
+
+ this._pingEnabled = this.options.pingEnabled === false ? false : true;
+ this._pingIntervalDelay = this.options.pingInterval || pingIntervalDelay;
+ this._pongTimeoutDelay = this.options.pongTimeout || pongTimeoutDelay;
+
+ if (this.options.connectOnStartup !== false) {
+ this.open();
+ }
}
/**
@@ -103,18 +108,47 @@ class Connection {
* @returns {Promise}
*/
open() {
- if (this._closePromise) {
- return this._openPromise = Promise.reject(new Error('Connection has been
closed'));
- }
if (this.isOpen) {
return Promise.resolve();
}
if (this._openPromise) {
return this._openPromise;
}
+
+ this.emit('log', `ws open`);
+
+ this._ws = new WebSocket(this.url, {
+ headers: this.options.headers,
+ ca: this.options.ca,
+ cert: this.options.cert,
+ pfx: this.options.pfx,
+ rejectUnauthorized: this.options.rejectUnauthorized
+ });
+
+ this._ws.on('message', (data) => this._handleMessage(data));
+ this._ws.on('error', (err) => this._handleError(err));
+ this._ws.on('close', (code, message) => this._handleClose(code, message));
+
+ this._ws.on('pong', () => {
+ this.emit('log', 'ws pong received');
+ if (this._pongTimeout) {
+ clearTimeout(this._pongTimeout);
+ this._pongTimeout = null;
+ }
+ });
+ this._ws.on('ping', () => {
+ this.emit('log', 'ws ping received');
+ this._ws.pong();
+ });
+
return this._openPromise = new Promise((resolve, reject) => {
- // Set the callback that will be invoked once the WS is opened
- this._openCallback = err => err ? reject(err) : resolve();
+ this._ws.on('open', () => {
+ this.isOpen = true;
+ if (this._pingEnabled) {
+ this._pingHeartbeat();
+ }
+ resolve();
+ });
});
}
@@ -151,6 +185,46 @@ class Connection {
});
}
+ _pingHeartbeat() {
+
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ this._pingInterval = null;
+ }
+
+ this._pingInterval = setInterval(() => {
+ if (this.isOpen === false) {
+ // in case of if not open..
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ this._pingInterval = null;
+ }
+ }
+
+ this._pongTimeout = setTimeout(() => {
+ this._ws.terminate();
+ }, this._pongTimeoutDelay);
+
+ this._ws.ping();
+
+ }, this._pingIntervalDelay);
+ }
+
+ _handleError(err) {
+ this.emit('log', `ws error ${err}`);
+ this._cleanupWebsocket();
+ this.emit('error', err);
+ }
+
+ _handleClose(code, message) {
+ this.emit('log', `ws close code=${code} message=${message}`);
+ this._cleanupWebsocket();
+ if (this._closeCallback) {
+ this._closeCallback();
+ }
+ this.emit('close', code, message);
+ }
+
_handleMessage(data) {
const response = this._reader.read(JSON.parse(data.toString()));
if (response.requestId === null || response.requestId === undefined) {
@@ -210,6 +284,25 @@ class Connection {
}
}
+ /**
+ * clean websocket context
+ */
+ _cleanupWebsocket() {
+ if (this._pingInterval) {
+ clearInterval(this._pingInterval);
+ }
+ this._pingInterval = null;
+ if (this._pongTimeout) {
+ clearTimeout(this._pongTimeout);
+ }
+ this._pongTimeout = null;
+
+ this._ws.removeAllListeners();
+ this._openPromise = null;
+ this._closePromise = null;
+ this.isOpen = false;
+ }
+
/**
* Clears the internal state containing the callback and result buffer of a
given request.
* @param requestId
@@ -250,12 +343,12 @@ class Connection {
* @return {Promise}
*/
close() {
+ if (this.isOpen === false) {
+ return Promise.resolve();
+ }
if (!this._closePromise) {
this._closePromise = new Promise(resolve => {
- this._ws.on('close', function () {
- this.isOpen = false;
- resolve();
- });
+ this._closeCallback = resolve;
this._ws.close();
});
}
diff --git
a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
index d4a1fb6927..18de8cebc0 100644
--- a/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
+++ b/gremlin-javascript/src/main/javascript/gremlin-javascript/package.json
@@ -14,7 +14,7 @@
],
"license": "Apache-2.0",
"dependencies": {
- "ws": "^3.0.0"
+ "ws": "^6.1.2"
},
"devDependencies": {
"mocha": "~4.0.1",
@@ -40,4 +40,4 @@
"engines": {
"node": ">=6"
}
-}
\ No newline at end of file
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services