[ 
https://issues.apache.org/jira/browse/SSHD-1125?focusedWorklogId=558590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-558590
 ]

ASF GitHub Bot logged work on SSHD-1125:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Feb/21 15:34
            Start Date: 26/Feb/21 15:34
    Worklog Time Spent: 10m 
      Work Description: lgoldstein commented on a change in pull request #181:
URL: https://github.com/apache/mina-sshd/pull/181#discussion_r583723637



##########
File path: 
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
     @Override
     public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closed - state=" + state);
+            throw new EOFException("Closed/ing - state=" + state);
         }
 
+        waitForAvailableWriteSpace(buffer.available());
+
         IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
         writes.add(future);
         startWriting();
         return future;
     }
 
+    protected void waitForAvailableWriteSpace(int requiredSize) throws 
IOException {
+        long expireTime = System.currentTimeMillis() + 
maxWaitForPendingWrites.toMillis();
+        synchronized (pendingBytesCount) {
+            for (int count = pendingBytesCount.get();
+                 /*
+                  * The (count > 0) condition is put in place to allow a 
single pending
+                  * write to exceed the maxPendingBytesCount as long as there 
are no
+                  * other pending writes.
+                  */
+                 (count > 0)
+                         // Not already over the limit or about to be over it
+                         && ((count >= maxPendingBytesCount) || ((count + 
requiredSize) > maxPendingBytesCount))
+                         // No pending exception signaled
+                         && (pendingException.get() == null);
+                 count = pendingBytesCount.get()) {
+                long remTime = expireTime - System.currentTimeMillis();
+                if (remTime <= 0L) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Max. pending write timeout expired after 
" + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+
+                try {
+                    pendingBytesCount.wait(remTime);
+                } catch (InterruptedException e) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Waiting for pending writes interrupted 
after " + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+            }
+
+            IOException e = pendingException.get();
+            if (e != null) {
+                throw e;
+            }
+
+            pendingBytesCount.addAndGet(requiredSize);
+        }
+    }
+
     protected void startWriting() throws IOException {
         IoWriteFutureImpl future = writes.peek();
+        // No more pending requests
         if (future == null) {
             return;
         }
 
+        // Don't try to write any further if pending exception signaled
+        Throwable pendingError = pendingException.get();
+        if (pendingError != null) {
+            log.error("startWriting({})[{}] propagate to {} write requests 
pending error={}[{}]",
+                    getId(), out, writes.size(), getClass().getSimpleName(), 
pendingError.getMessage());
+
+            IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
+            for (IoWriteFutureImpl pendingWrite : writes) {
+                // Checking reference by design
+                if (GenericUtils.isSameReference(pendingWrite, currentFuture)) 
{
+                    continue;   // will be taken care of when its listener is 
eventually called
+                }
+
+                future.setValue(pendingError);
+            }
+
+            writes.clear();
+            return;
+        }
+
+        // Cannot honor this request yet since other pending one incomplete
         if (!currentWrite.compareAndSet(null, future)) {
             return;
         }
 
-        out.writeBuffer(future.getBuffer()).addListener(
-                new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(IoWriteFuture f) {
-                        if (f.isWritten()) {
-                            future.setValue(Boolean.TRUE);
-                        } else {
-                            future.setValue(f.getException());
-                        }
-                        finishWrite(future);
-                    }
-                });
+        Buffer buffer = future.getBuffer();
+        int bufferSize = buffer.available();
+        out.writeBuffer(buffer).addListener(new 
SshFutureListener<IoWriteFuture>() {
+            @Override
+            public void operationComplete(IoWriteFuture f) {
+                if (f.isWritten()) {
+                    future.setValue(Boolean.TRUE);
+                } else {
+                    future.setValue(f.getException());
+                }
+                finishWrite(future, bufferSize);
+            }
+        });
     }
 
-    protected void finishWrite(IoWriteFutureImpl future) {
+    protected void finishWrite(IoWriteFutureImpl future, int bufferSize) {
+        /*
+         * Update the pending bytes count only if successfully written,
+         * otherwise signal an error
+         */
+        if (future.isWritten()) {
+            long writtenSize = writtenBytesCount.addAndGet(bufferSize);
+
+            int stillPending;
+            synchronized (pendingBytesCount) {
+                stillPending = pendingBytesCount.addAndGet(0 - bufferSize);
+                pendingBytesCount.notifyAll();
+            }
+
+            if (stillPending < 0) {
+                log.error("finishWrite({})[{}] - pending byte counts underflow 
({}) after {} bytes", getId(), out, stillPending,
+                        writtenSize);
+                pendingException.compareAndSet(null,
+                        new SshChannelBufferedOutputException(channelId, 
"Pending byte counts underflow"));
+            }

Review comment:
       Yes, I intended this - I am fine with a single pending write giving this 
mechanism "the slip". My goal was to avoid an OOM caused by an unlimited 
accumulation of pending write requests due to the fact that the peer is not 
consuming the sent data. Please note that the pending exception is "sticky" - 
i.e., the _next_ write attempt will fail. This also means that if the write 
request that "got away" was the last one by chance and it was consumed by the 
peer there will be no exception thrown - which is also fine by me since as 
mentioned the goal is not to enforce a strict limit on the pending bytes size 
but rather on the accumulation of the pending write requests. I will document 
this in the code to make it clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 558590)
    Time Spent: 1h  (was: 50m)

> Provide a boundary on BufferedIoOutputStream writing to avoid memory overflow
> -----------------------------------------------------------------------------
>
>                 Key: SSHD-1125
>                 URL: https://issues.apache.org/jira/browse/SSHD-1125
>             Project: MINA SSHD
>          Issue Type: Bug
>            Reporter: Lyor Goldstein
>            Assignee: Lyor Goldstein
>            Priority: Major
>              Labels: memory
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Use an upper bound to the data pending in the {{BufferedIoOutputStream}}. The 
> max data could be set to the max window size.  Blocking until there is enough 
> room should allow the client to read the data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to