This is an automated email from the ASF dual-hosted git repository.

glynnbird pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb-nano.git


The following commit(s) were added to refs/heads/main by this push:
     new d6c6c3a  Abort HTTP connection when stopping changesReader (#310)
d6c6c3a is described below

commit d6c6c3ad0c130b515d641248e9b0f7dca0944fbd
Author: insidewhy <[email protected]>
AuthorDate: Thu Nov 3 19:11:00 2022 +0800

    Abort HTTP connection when stopping changesReader (#310)
---
 README.md                           |  2 +-
 lib/changesreader.js                |  8 ++++++++
 lib/nano.js                         | 14 ++++++++++++++
 package-lock.json                   | 11 +++++++++++
 package.json                        |  3 ++-
 test/document.changesreader.test.js | 26 ++++++++++++++++++++++++++
 6 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index e019a5e..c0ef5f2 100644
--- a/README.md
+++ b/README.md
@@ -650,7 +650,7 @@ reliable, resumable changes feed follower, then you need 
the `changesReader`.
 
 There are three ways to start listening to the changes feed:
 
-1. `changesReader.start()` - to listen to changes indefinitely by repeated 
"long poll" requests. This mode continues to poll for changes forever.
+1. `changesReader.start()` - to listen to changes indefinitely by repeated 
"long poll" requests. This mode continues to poll for changes until 
`changesReader.stop()` is called, at which point any active long poll will be 
canceled.
 2. `changesReader.get()` - to listen to changes until the end of the changes 
feed is reached, by repeated "long poll" requests. Once a response with zero 
changes is received, the 'end' event will indicate the end of the changes and 
polling will stop.
 3. `changesReader.spool()` - listen to changes in one long HTTP request. (as 
opposed to repeated round trips) - spool is faster but less reliable.
 
diff --git a/lib/changesreader.js b/lib/changesreader.js
index cf2a7c6..a79c656 100644
--- a/lib/changesreader.js
+++ b/lib/changesreader.js
@@ -1,4 +1,5 @@
 const EventEmitter = require('events').EventEmitter
+const AbortController = global.AbortController || 
require('node-abort-controller').AbortController
 const stream = require('stream')
 const EVENT_BATCH = 'batch'
 const EVENT_CHANGE = 'change'
@@ -103,6 +104,7 @@ class ChangesReader {
     this.qs = {} // extra querystring parameters
     this.selector = null
     this.paused = false
+    this.abortController = null
   }
 
   pause () {
@@ -116,6 +118,9 @@ class ChangesReader {
   // prevent another poll happening
   stop () {
     this.continue = false
+    if (this.abortController) {
+      this.abortController.abort()
+    }
   }
 
   // sleep, promise style
@@ -148,11 +153,13 @@ class ChangesReader {
     const work = async () => {
       do {
         if (!self.paused) {
+          self.abortController = new AbortController()
           // formulate changes feed longpoll HTTP request
           const req = {
             method: 'post',
             db: self.db,
             path: '_changes',
+            signal: self.abortController.signal,
             qs: {
               feed: 'longpoll',
               timeout: self.timeout,
@@ -174,6 +181,7 @@ class ChangesReader {
           // make HTTP request to get up to batchSize changes from the feed
           try {
             const data = await self.request(req)
+            self.abortController = null
             delay = 0
 
             // update the since state
diff --git a/lib/nano.js b/lib/nano.js
index be0199f..703946c 100644
--- a/lib/nano.js
+++ b/lib/nano.js
@@ -129,6 +129,16 @@ module.exports = exports = function dbScope (cfg) {
       statusCode: statusCode
     }, response.headers)
     if (!response.status) {
+      if (axios.isCancel(response)) {
+        if (resolve) {
+          resolve('canceled')
+        }
+        if (callback) {
+          callback(null, 'canceled', responseHeaders)
+        }
+        return
+      }
+
       log({ err: 'socket', body: body, headers: responseHeaders })
       if (reject) {
         reject(new Error(`error happened in your connection. Reason: 
${response.message}`))
@@ -267,6 +277,10 @@ module.exports = exports = function dbScope (cfg) {
     // https://github.com/mikeal/request#requestjar
     const isJar = opts.jar || cfg.jar || (cfg.requestDefaults && 
cfg.requestDefaults.jar)
 
+    if (opts.signal) {
+      req.signal = opts.signal
+    }
+
     if (isJar) {
       req.jar = cookieJar
       req.withCredentials = true
diff --git a/package-lock.json b/package-lock.json
index 69437bd..68b3ad6 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -12,6 +12,7 @@
         "@types/tough-cookie": "^4.0.0",
         "axios": "^0.26.1",
         "http-cookie-agent": "^1.0.5",
+        "node-abort-controller": "^3.0.1",
         "qs": "^6.10.3",
         "tough-cookie": "^4.0.0"
       },
@@ -4487,6 +4488,11 @@
         "node": ">= 10.13"
       }
     },
+    "node_modules/node-abort-controller": {
+      "version": "3.0.1",
+      "resolved": 
"https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz",
+      "integrity": 
"sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw=="
+    },
     "node_modules/node-int64": {
       "version": "0.4.0",
       "resolved": 
"https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz",
@@ -9668,6 +9674,11 @@
         "propagate": "^2.0.0"
       }
     },
+    "node-abort-controller": {
+      "version": "3.0.1",
+      "resolved": 
"https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz",
+      "integrity": 
"sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw=="
+    },
     "node-int64": {
       "version": "0.4.0",
       "resolved": 
"https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz",
diff --git a/package.json b/package.json
index 7e549a8..6edfae1 100644
--- a/package.json
+++ b/package.json
@@ -20,6 +20,7 @@
     "http-cookie-agent": "^1.0.5",
     "@types/tough-cookie": "^4.0.0",
     "axios": "^0.26.1",
+    "node-abort-controller": "^3.0.1",
     "qs": "^6.10.3",
     "tough-cookie": "^4.0.0"
   },
@@ -48,4 +49,4 @@
       "jest"
     ]
   }
-}
\ No newline at end of file
+}
diff --git a/test/document.changesreader.test.js 
b/test/document.changesreader.test.js
index 292b51a..028afb6 100644
--- a/test/document.changesreader.test.js
+++ b/test/document.changesreader.test.js
@@ -505,3 +505,29 @@ test('should survive malformed JSON - 
db.changesReader.start', async () => {
     })
   })
 }, 10000)
+
+test('should cancel HTTP connection as soon as stop is called', async () => {
+  const changeURL = `/${DBNAME}/_changes`
+  nock(COUCH_URL)
+    .post(changeURL)
+    .query({ feed: 'longpoll', timeout: 60000, since: 'now', limit: 100, 
include_docs: false })
+    .reply(200, { results: [], last_seq: '1-0', pending: 0 })
+    .post(changeURL)
+    .query({ feed: 'longpoll', timeout: 60000, since: '1-0', limit: 100, 
include_docs: false })
+    .delay(60000)
+    .reply(500)
+  const db = nano.db.use(DBNAME)
+  const cr = db.changesReader.start()
+  await new Promise((resolve, reject) => {
+    cr.on('seq', function (seq) {
+      setTimeout(function () {
+        // give the next http connection a chance to be established
+        db.changesReader.stop()
+      }, 200)
+    })
+
+    cr.on('end', function () {
+      resolve()
+    })
+  })
+})

Reply via email to