This is an automated email from the ASF dual-hosted git repository.
glynnbird pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb-nano.git
The following commit(s) were added to refs/heads/main by this push:
new d6c6c3a Abort HTTP connection when stopping changesReader (#310)
d6c6c3a is described below
commit d6c6c3ad0c130b515d641248e9b0f7dca0944fbd
Author: insidewhy <[email protected]>
AuthorDate: Thu Nov 3 19:11:00 2022 +0800
Abort HTTP connection when stopping changesReader (#310)
---
README.md | 2 +-
lib/changesreader.js | 8 ++++++++
lib/nano.js | 14 ++++++++++++++
package-lock.json | 11 +++++++++++
package.json | 3 ++-
test/document.changesreader.test.js | 26 ++++++++++++++++++++++++++
6 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index e019a5e..c0ef5f2 100644
--- a/README.md
+++ b/README.md
@@ -650,7 +650,7 @@ reliable, resumable changes feed follower, then you need
the `changesReader`.
There are three ways to start listening to the changes feed:
-1. `changesReader.start()` - to listen to changes indefinitely by repeated
"long poll" requests. This mode continues to poll for changes forever.
+1. `changesReader.start()` - to listen to changes indefinitely by repeated
"long poll" requests. This mode continues to poll for changes until
`changesReader.stop()` is called, at which point any active long poll will be
canceled.
2. `changesReader.get()` - to listen to changes until the end of the changes
feed is reached, by repeated "long poll" requests. Once a response with zero
changes is received, the 'end' event will indicate the end of the changes and
polling will stop.
3. `changesReader.spool()` - listen to changes in one long HTTP request. (as
opposed to repeated round trips) - spool is faster but less reliable.
diff --git a/lib/changesreader.js b/lib/changesreader.js
index cf2a7c6..a79c656 100644
--- a/lib/changesreader.js
+++ b/lib/changesreader.js
@@ -1,4 +1,5 @@
const EventEmitter = require('events').EventEmitter
+const AbortController = global.AbortController ||
require('node-abort-controller').AbortController
const stream = require('stream')
const EVENT_BATCH = 'batch'
const EVENT_CHANGE = 'change'
@@ -103,6 +104,7 @@ class ChangesReader {
this.qs = {} // extra querystring parameters
this.selector = null
this.paused = false
+ this.abortController = null
}
pause () {
@@ -116,6 +118,9 @@ class ChangesReader {
// prevent another poll happening
stop () {
this.continue = false
+ if (this.abortController) {
+ this.abortController.abort()
+ }
}
// sleep, promise style
@@ -148,11 +153,13 @@ class ChangesReader {
const work = async () => {
do {
if (!self.paused) {
+ self.abortController = new AbortController()
// formulate changes feed longpoll HTTP request
const req = {
method: 'post',
db: self.db,
path: '_changes',
+ signal: self.abortController.signal,
qs: {
feed: 'longpoll',
timeout: self.timeout,
@@ -174,6 +181,7 @@ class ChangesReader {
// make HTTP request to get up to batchSize changes from the feed
try {
const data = await self.request(req)
+ self.abortController = null
delay = 0
// update the since state
diff --git a/lib/nano.js b/lib/nano.js
index be0199f..703946c 100644
--- a/lib/nano.js
+++ b/lib/nano.js
@@ -129,6 +129,16 @@ module.exports = exports = function dbScope (cfg) {
statusCode: statusCode
}, response.headers)
if (!response.status) {
+ if (axios.isCancel(response)) {
+ if (resolve) {
+ resolve('canceled')
+ }
+ if (callback) {
+ callback(null, 'canceled', responseHeaders)
+ }
+ return
+ }
+
log({ err: 'socket', body: body, headers: responseHeaders })
if (reject) {
reject(new Error(`error happened in your connection. Reason:
${response.message}`))
@@ -267,6 +277,10 @@ module.exports = exports = function dbScope (cfg) {
// https://github.com/mikeal/request#requestjar
const isJar = opts.jar || cfg.jar || (cfg.requestDefaults &&
cfg.requestDefaults.jar)
+ if (opts.signal) {
+ req.signal = opts.signal
+ }
+
if (isJar) {
req.jar = cookieJar
req.withCredentials = true
diff --git a/package-lock.json b/package-lock.json
index 69437bd..68b3ad6 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -12,6 +12,7 @@
"@types/tough-cookie": "^4.0.0",
"axios": "^0.26.1",
"http-cookie-agent": "^1.0.5",
+ "node-abort-controller": "^3.0.1",
"qs": "^6.10.3",
"tough-cookie": "^4.0.0"
},
@@ -4487,6 +4488,11 @@
"node": ">= 10.13"
}
},
+ "node_modules/node-abort-controller": {
+ "version": "3.0.1",
+ "resolved":
"https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz",
+ "integrity":
"sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw=="
+ },
"node_modules/node-int64": {
"version": "0.4.0",
"resolved":
"https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz",
@@ -9668,6 +9674,11 @@
"propagate": "^2.0.0"
}
},
+ "node-abort-controller": {
+ "version": "3.0.1",
+ "resolved":
"https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz",
+ "integrity":
"sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw=="
+ },
"node-int64": {
"version": "0.4.0",
"resolved":
"https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz",
diff --git a/package.json b/package.json
index 7e549a8..6edfae1 100644
--- a/package.json
+++ b/package.json
@@ -20,6 +20,7 @@
"http-cookie-agent": "^1.0.5",
"@types/tough-cookie": "^4.0.0",
"axios": "^0.26.1",
+ "node-abort-controller": "^3.0.1",
"qs": "^6.10.3",
"tough-cookie": "^4.0.0"
},
@@ -48,4 +49,4 @@
"jest"
]
}
-}
\ No newline at end of file
+}
diff --git a/test/document.changesreader.test.js
b/test/document.changesreader.test.js
index 292b51a..028afb6 100644
--- a/test/document.changesreader.test.js
+++ b/test/document.changesreader.test.js
@@ -505,3 +505,29 @@ test('should survive malformed JSON -
db.changesReader.start', async () => {
})
})
}, 10000)
+
+test('should cancel HTTP connection as soon as stop is called', async () => {
+ const changeURL = `/${DBNAME}/_changes`
+ nock(COUCH_URL)
+ .post(changeURL)
+ .query({ feed: 'longpoll', timeout: 60000, since: 'now', limit: 100,
include_docs: false })
+ .reply(200, { results: [], last_seq: '1-0', pending: 0 })
+ .post(changeURL)
+ .query({ feed: 'longpoll', timeout: 60000, since: '1-0', limit: 100,
include_docs: false })
+ .delay(60000)
+ .reply(500)
+ const db = nano.db.use(DBNAME)
+ const cr = db.changesReader.start()
+ await new Promise((resolve, reject) => {
+ cr.on('seq', function (seq) {
+ setTimeout(function () {
+ // give the next http connection a chance to be established
+ db.changesReader.stop()
+ }, 200)
+ })
+
+ cr.on('end', function () {
+ resolve()
+ })
+ })
+})