Alon Bar-Lev has uploaded a new change for review.

Change subject: utils: ssh: force close the pipe before the layered stream
......................................................................

utils: ssh: force close the pipe before the layered stream

It seems that the order in which streams are closed within a pipe chain
is important. Although I could not reproduce this, I got a report of a
pseudo-reproduction:

Installing Host fedora. Stage: Termination.
2013-11-20 09:51:28,129 ERROR [org.ovirt.engine.core.bll.VdsDeploy] (VdsDeploy) Error during deploy dialog: java.io.IOException: Pipe closed
        at java.io.PipedInputStream.read(PipedInputStream.java:308) [rt.jar:1.7.0_45]
        at java.io.PipedInputStream.read(PipedInputStream.java:378) [rt.jar:1.7.0_45]
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) [rt.jar:1.7.0_45]
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) [rt.jar:1.7.0_45]
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) [rt.jar:1.7.0_45]
        at java.io.InputStreamReader.read(InputStreamReader.java:184) [rt.jar:1.7.0_45]
        at java.io.BufferedReader.fill(BufferedReader.java:154) [rt.jar:1.7.0_45]
        at java.io.BufferedReader.readLine(BufferedReader.java:317) [rt.jar:1.7.0_45]
        at java.io.BufferedReader.readLine(BufferedReader.java:382) [rt.jar:1.7.0_45]
        at org.ovirt.otopi.dialog.MachineDialogParser.nextEvent(MachineDialogParser.java:355) [otopi.jar:]

This may be caused by the recent try-with-resources migration, which
declares all of the streams in a single block:

        try (
            final PipedInputStream pinStdin = new PipedInputStream(BUFFER_SIZE);
            final OutputStream poutStdin = new PipedOutputStream(pinStdin);
            final PipedInputStream pinStdout = new PipedInputStream(BUFFER_SIZE);
            final OutputStream poutStdout = new PipedOutputStream(pinStdout);
            final ByteArrayOutputStream stderr = new ConstraintByteArrayOutputStream(1024);
        ) {

I suspect that the streams are closed in an unpredictable order, so a
pending read on the pipe may be interrupted because the layered stream
is closed first.

I think it is incorrect to raise an exception when the layered stream is
closed; it should be handled as if the pipe stream is closed.
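
As an illustration of the distinction drawn above, here is a minimal
sketch (not part of this patch; the class name PipeCloseDemo and its
methods are hypothetical) of how plain java.io pipes behave depending on
which end is closed first. When the writing end is closed first, the
reader sees a normal end of stream. When the reading end is closed
first, the next read fails with "Pipe closed", which appears to match
the message in the reported trace.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeCloseDemo {

    /** Writing end closed first: the reader drains the data, then sees EOF. */
    static int readAfterWriterClosed() throws IOException {
        PipedInputStream pin = new PipedInputStream(16);
        PipedOutputStream pout = new PipedOutputStream(pin);
        pout.write(42);
        pout.close();
        pin.read();               // consumes the single buffered byte
        int result = pin.read();  // end of stream, no exception
        pin.close();
        return result;
    }

    /** Reading end closed first: the next read throws an IOException. */
    static String readAfterReaderClosed() throws IOException {
        PipedInputStream pin = new PipedInputStream(16);
        PipedOutputStream pout = new PipedOutputStream(pin);
        pin.close();
        try {
            pin.read();
            return null;          // not expected to be reached
        } catch (IOException e) {
            return e.getMessage();
        } finally {
            pout.close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("writer closed first: " + readAfterWriterClosed());
        System.out.println("reader closed first: " + readAfterReaderClosed());
    }
}
```

The sketch is single-threaded to keep it deterministic; in SSHDialog the
reader runs on another thread, so the timing of the close matters as well.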

The theoretical solution is to nest another try block to force the order
of closure, for example:

        try (
            final PipedInputStream pinStdin = new PipedInputStream(BUFFER_SIZE);
            final PipedInputStream pinStdout = new PipedInputStream(BUFFER_SIZE);
            final ByteArrayOutputStream stderr = new ConstraintByteArrayOutputStream(1024);
        ) {
            try (
                final OutputStream poutStdin = new PipedOutputStream(pinStdin);
                final OutputStream poutStdout = new PipedOutputStream(pinStdout);
            ) {
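
The effect of the nesting on close order can be checked with a small
stand-alone sketch. It is not taken from the patch: the Tracked class and
CloseOrderDemo are hypothetical stand-ins that only record the order in
which close() is called. For what it is worth, the Java Language
Specification defines that try-with-resources closes resources in the
reverse order of their declaration, so the flat layout closes the two
ends of the pipes interleaved, while the nested layout closes both
output ends before any input end.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderDemo {

    /** Hypothetical resource that appends its name to a log when closed. */
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    /** Single block, same declaration order as before this change. */
    static List<String> flat() {
        List<String> log = new ArrayList<>();
        try (
            Tracked pinStdin = new Tracked("pinStdin", log);
            Tracked poutStdin = new Tracked("poutStdin", log);
            Tracked pinStdout = new Tracked("pinStdout", log);
            Tracked poutStdout = new Tracked("poutStdout", log);
            Tracked stderr = new Tracked("stderr", log)
        ) {
            // body intentionally empty
        }
        return log;
    }

    /** Nested blocks, same layout as proposed in this change. */
    static List<String> nested() {
        List<String> log = new ArrayList<>();
        try (
            Tracked pinStdin = new Tracked("pinStdin", log);
            Tracked pinStdout = new Tracked("pinStdout", log);
            Tracked stderr = new Tracked("stderr", log)
        ) {
            try (
                Tracked poutStdin = new Tracked("poutStdin", log);
                Tracked poutStdout = new Tracked("poutStdout", log)
            ) {
                // body intentionally empty
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("flat:   " + flat());
        System.out.println("nested: " + nested());
    }
}
```

With the nested layout the inner block is fully closed before the outer
resources are touched, which is the ordering this change relies on.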

Change-Id: I485f5ae57a963b6819fdc0f3832eaa403dc0c34d
Reported-By: Piotr Kliczewski <[email protected]>
Signed-off-by: Alon Bar-Lev <[email protected]>
---
M backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHClient.java
M backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
2 files changed, 107 insertions(+), 102 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/60/21460/1

diff --git a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHClient.java b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHClient.java
index 189d166..2deec4b 100644
--- a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHClient.java
+++ b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHClient.java
@@ -596,43 +596,44 @@
                 localDigest
             );
             final PipedInputStream pin = new PipedInputStream(STREAM_BUFFER_SIZE);
-            final OutputStream pout = new PipedOutputStream(pin);
             final OutputStream dummy = new ConstraintByteArrayOutputStream(CONSTRAINT_BUFFER_SIZE);
             final ByteArrayOutputStream remoteDigest = new ConstraintByteArrayOutputStream(CONSTRAINT_BUFFER_SIZE);
         ) {
-            t = new Thread(
-                new Runnable() {
-                    @Override
-                    public void run() {
-                        try (OutputStream out = new GZIPOutputStream(pout)) {
-                            byte b[] = new byte[STREAM_BUFFER_SIZE];
-                            int n;
-                            while ((n = in.read(b)) != -1) {
-                                out.write(b, 0, n);
+            try (final OutputStream pout = new PipedOutputStream(pin)) {
+                t = new Thread(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            try (OutputStream out = new GZIPOutputStream(pout)) {
+                                byte b[] = new byte[STREAM_BUFFER_SIZE];
+                                int n;
+                                while ((n = in.read(b)) != -1) {
+                                    out.write(b, 0, n);
+                                }
+                            }
+                            catch (IOException e) {
+                                log.debug("Exceution during stream processing", e);
                             }
                         }
-                        catch (IOException e) {
-                            log.debug("Exceution during stream processing", e);
-                        }
-                    }
-                },
-                "SSHClient.compress " + file1
-            );
-            t.start();
+                    },
+                    "SSHClient.compress " + file1
+                );
+                t.start();
 
-            executeCommand(
-                String.format(COMMAND_FILE_SEND, "gunzip -q", file2),
-                pin,
-                dummy,
-                remoteDigest
-            );
+                executeCommand(
+                    String.format(COMMAND_FILE_SEND, "gunzip -q", file2),
+                    pin,
+                    dummy,
+                    remoteDigest
+                );
 
-            t.join(THREAD_JOIN_WAIT_TIME);
-            if (t.getState() != Thread.State.TERMINATED) {
-                throw new IllegalStateException("Cannot stop SSH stream thread");
+                t.join(THREAD_JOIN_WAIT_TIME);
+                if (t.getState() != Thread.State.TERMINATED) {
+                    throw new IllegalStateException("Cannot stop SSH stream thread");
+                }
+
+                _validateDigest(localDigest, new String(remoteDigest.toByteArray(), Charset.forName("UTF-8")).trim());
             }
-
-            _validateDigest(localDigest, new String(remoteDigest.toByteArray(), Charset.forName("UTF-8")).trim());
         }
         catch(Exception e) {
             log.debug("Send failed", e);
@@ -674,7 +675,6 @@
         Thread t = null;
         try (
             final PipedOutputStream pout = new PipedOutputStream();
-            final InputStream pin = new PipedInputStream(pout, STREAM_BUFFER_SIZE);
             final OutputStream out = new DigestOutputStream(
                 new FileOutputStream(file2),
                 localDigest
@@ -682,40 +682,42 @@
             final InputStream empty = new ByteArrayInputStream(new byte[0]);
             final ByteArrayOutputStream remoteDigest = new ConstraintByteArrayOutputStream(CONSTRAINT_BUFFER_SIZE);
         ) {
-            t = new Thread(
-                new Runnable() {
-                    @Override
-                    public void run() {
-                        try (final InputStream in = new GZIPInputStream(pin)) {
+            try (final InputStream pin = new PipedInputStream(pout, STREAM_BUFFER_SIZE)) {
+                t = new Thread(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            try (final InputStream in = new GZIPInputStream(pin)) {
 
-                            byte [] b = new byte[STREAM_BUFFER_SIZE];
-                            int n;
-                            while ((n = in.read(b)) != -1) {
-                                out.write(b, 0, n);
+                                byte [] b = new byte[STREAM_BUFFER_SIZE];
+                                int n;
+                                while ((n = in.read(b)) != -1) {
+                                    out.write(b, 0, n);
+                                }
+                            }
+                            catch (IOException e) {
+                                log.debug("Exceution during stream processing", e);
                             }
                         }
-                        catch (IOException e) {
-                            log.debug("Exceution during stream processing", e);
-                        }
-                    }
-                },
-                "SSHClient.decompress " + file2
-            );
-            t.start();
+                    },
+                    "SSHClient.decompress " + file2
+                );
+                t.start();
 
-            executeCommand(
-                String.format(COMMAND_FILE_RECEIVE, "gzip -q", file1),
-                empty,
-                pout,
-                remoteDigest
-            );
+                executeCommand(
+                    String.format(COMMAND_FILE_RECEIVE, "gzip -q", file1),
+                    empty,
+                    pout,
+                    remoteDigest
+                );
 
-            t.join(THREAD_JOIN_WAIT_TIME);
-            if (t.getState() != Thread.State.TERMINATED) {
-                throw new IllegalStateException("Cannot stop SSH stream thread");
+                t.join(THREAD_JOIN_WAIT_TIME);
+                if (t.getState() != Thread.State.TERMINATED) {
+                    throw new IllegalStateException("Cannot stop SSH stream thread");
+                }
+
+                _validateDigest(localDigest, new String(remoteDigest.toByteArray(), Charset.forName("UTF-8")).trim());
             }
-
-            _validateDigest(localDigest, new String(remoteDigest.toByteArray(), Charset.forName("UTF-8")).trim());
         }
         catch(Exception e) {
             log.debug("Receive failed", e);
diff --git a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
index 98959f2..3484413 100644
--- a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
+++ b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
@@ -286,60 +286,63 @@
 
         try (
             final PipedInputStream pinStdin = new PipedInputStream(BUFFER_SIZE);
-            final OutputStream poutStdin = new PipedOutputStream(pinStdin);
             final PipedInputStream pinStdout = new PipedInputStream(BUFFER_SIZE);
-            final OutputStream poutStdout = new PipedOutputStream(pinStdout);
             final ByteArrayOutputStream stderr = new ConstraintByteArrayOutputStream(1024);
         ) {
-            List<InputStream> stdinList;
-            if (initial == null) {
-                stdinList = new LinkedList<InputStream>();
-            }
-            else {
-                stdinList = new LinkedList<InputStream>(Arrays.asList(initial));
-            }
-            stdinList.add(pinStdin);
+            try (
+                final OutputStream poutStdin = new PipedOutputStream(pinStdin);
+                final OutputStream poutStdout = new PipedOutputStream(pinStdout);
+            ) {
+                List<InputStream> stdinList;
+                if (initial == null) {
+                    stdinList = new LinkedList<InputStream>();
+                }
+                else {
+                    stdinList = new LinkedList<InputStream>(Arrays.asList(initial));
+                }
+                stdinList.add(pinStdin);
 
-            sink.setControl(
-                new Control() {
-                    @Override
-                    public void close() throws IOException {
-                        if (_client != null) {
-                            _client.close();
+                sink.setControl(
+                    new Control() {
+                        @Override
+                        public void close() throws IOException {
+                            if (_client != null) {
+                                _client.close();
+                            }
                         }
                     }
-                }
-            );
-            sink.setStreams(pinStdout, poutStdin);
-            sink.start();
-
-            try {
-                _client.executeCommand(
-                    command,
-                    new SequenceInputStream(Collections.enumeration(stdinList)),
-                    poutStdout,
-                    stderr
                 );
-            }
-            catch (Exception e) {
-                if (stderr.size() == 0) {
-                    throw e;
-                }
+                sink.setStreams(pinStdout, poutStdin);
+                sink.start();
 
-                log.error(
-                    "Swallowing exception as preferring stderr",
-                    e
-                );
-            }
-            finally {
-                if (stderr.size() > 0) {
-                    throw new RuntimeException(
-                        String.format(
-                            "Unexpected error during execution: %1$s",
-                            new String(stderr.toByteArray(), Charset.forName("UTF-8"))
-                        )
+                try {
+                    _client.executeCommand(
+                        command,
+                        new SequenceInputStream(Collections.enumeration(stdinList)),
+                        poutStdout,
+                        stderr
                     );
                 }
+                catch (Exception e) {
+                    if (stderr.size() == 0) {
+                        throw e;
+                    }
+
+                    log.error(
+                        "Swallowing exception as preferring stderr",
+                        e
+                    );
+                }
+                finally {
+                    if (stderr.size() > 0) {
+                        throw new RuntimeException(
+                            String.format(
+                                "Unexpected error during execution: %1$s",
+                                new String(stderr.toByteArray(), Charset.forName("UTF-8"))
+                            )
+                        );
+                    }
+                }
             }
         }
         catch (Exception e) {


-- 
To view, visit http://gerrit.ovirt.org/21460

Gerrit-MessageType: newchange
Gerrit-Change-Id: I485f5ae57a963b6819fdc0f3832eaa403dc0c34d
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Alon Bar-Lev <[email protected]>