Modified: 
felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java
URL: 
http://svn.apache.org/viewvc/felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java?rev=1736001&r1=1736000&r2=1736001&view=diff
==============================================================================
--- felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java (original)
+++ felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java Mon Mar 21 16:53:48 2016
@@ -33,46 +33,81 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.felix.gogo.runtime.Parser.Executable;
 import org.apache.felix.gogo.runtime.Parser.Statement;
+import org.apache.felix.gogo.runtime.Pipe.Result;
 import org.apache.felix.service.command.Converter;
 
-public class Pipe extends Thread
+public class Pipe implements Callable<Result>
 {
-    static final ThreadLocal<Channel[]> tStreams = new ThreadLocal<Channel[]>();
+    private static final ThreadLocal<Pipe> CURRENT = new ThreadLocal<>();
 
-    public static Channel[] mark() {
-        return tStreams.get();
+    public static class Result {
+        public final Object result;
+        public final Exception exception;
+        public final int error;
+
+        public Result(Object result) {
+            this.result = result;
+            this.exception = null;
+            this.error = 0;
+        }
+
+        public Result(Exception exception) {
+            this.result = null;
+            this.exception = exception;
+            this.error = 1;
+        }
+
+        public Result(int error) {
+            this.result = null;
+            this.exception = null;
+            this.error = error;
+        }
+
+        public boolean isSuccess() {
+            return exception == null && error == 0;
+        }
+    }
+
+    public static Pipe getCurrentPipe() {
+        return CURRENT.get();
+    }
+
+    public static void error(int error) {
+        Pipe current = getCurrentPipe();
+        if (current != null) {
+            current.error = error;
+        }
     }
 
-    public static void reset(Channel[] streams) {
-        tStreams.set(streams);
+    private static Pipe setCurrentPipe(Pipe pipe) {
+        Pipe previous = CURRENT.get();
+        CURRENT.set(pipe);
+        return previous;
     }
 
     final Closure closure;
-    final Executable executable;
+    final Statement statement;
     final Channel[] streams;
     final boolean[] toclose;
-    Object result;
-    Exception exception;
-    int exit = 0;
+    int error;
 
-    public Pipe(Closure closure, Executable executable, Channel[] streams, boolean[] toclose)
+    public Pipe(Closure closure, Statement statement, Channel[] streams, boolean[] toclose)
     {
-        super("pipe-" + executable);
         this.closure = closure;
-        this.executable = executable;
+        this.statement = statement;
         this.streams = streams;
         this.toclose = toclose;
     }
 
     public String toString()
     {
-        return "pipe<" + executable + "> out=" + streams[1];
+        return "pipe<" + statement + "> out=" + streams[1];
     }
 
     private static final int READ = 1;
@@ -148,8 +183,8 @@ public class Pipe extends Thread
     }
 
     private static class MultiChannel<T extends Channel> implements Channel {
-        protected final List<T> channels = new ArrayList<T>();
-        protected final List<T> toClose = new ArrayList<T>();
+        protected final List<T> channels = new ArrayList<>();
+        protected final List<T> toClose = new ArrayList<>();
         protected final AtomicBoolean opened = new AtomicBoolean(true);
         public void addChannel(T channel, boolean toclose) {
             channels.add(channel);
@@ -200,14 +235,30 @@ public class Pipe extends Thread
         }
     }
 
-    public void run()
+    @Override
+    public Result call() throws Exception {
+        Thread thread = Thread.currentThread();
+        String name = thread.getName();
+        try {
+            thread.setName("pipe-" + statement);
+            return doCall();
+        } finally {
+            thread.setName(name);
+        }
+    }
+
+    public Result doCall()
     {
-        InputStream in = null;
+        InputStream in;
         PrintStream out = null;
         PrintStream err = null;
-        WritableByteChannel errChannel = (WritableByteChannel) streams[2];
 
-        Channel[] prevStreams = tStreams.get();
+        // The errChannel will be used to print errors to the error stream
+        // Before the command is actually executed (i.e. during the initialization,
+        // including the redirection processing), it will be the original error stream.
+        // This value may be modified by redirections and the redirected error stream
+        // will be effective just before actually running the command.
+        WritableByteChannel errChannel = (WritableByteChannel) streams[2];
 
         // TODO: not sure this is the correct way
         boolean begOfPipe = !toclose[0];
@@ -215,98 +266,101 @@ public class Pipe extends Thread
 
         try
         {
-            if (executable instanceof Statement) {
-                Statement statement = (Statement) executable;
-                List<Token> tokens = statement.redirections();
-                for (int i = 0; i < tokens.size(); i++) {
-                    Token t = tokens.get(i);
-                    Matcher m;
-                    if ((m = Pattern.compile("(?:([0-9])?|(&)?)>(>)?").matcher(t)).matches()) {
-                        int fd;
-                        if (m.group(1) != null) {
-                            fd = Integer.parseInt(m.group(1));
-                        }
-                        else if (m.group(2) != null) {
-                            fd = -1; // both 1 and 2
-                        } else {
-                            fd = 1;
-                        }
-                        boolean append = m.group(3) != null;
-                        Token file = tokens.get(++i);
-                        Path outPath = closure.session().currentDir().resolve(file.toString());
-                        Set<StandardOpenOption> options = new HashSet<StandardOpenOption>();
-                        options.add(StandardOpenOption.WRITE);
-                        options.add(StandardOpenOption.CREATE);
-                        if (append) {
-                            options.add(StandardOpenOption.APPEND);
-                        } else {
-                            options.add(StandardOpenOption.TRUNCATE_EXISTING);
-                        }
-                        Channel ch = Files.newByteChannel(outPath, options);
-                        if (fd >= 0) {
-                            setStream(ch, fd, WRITE, begOfPipe, endOfPipe);
-                        } else {
-                            setStream(ch, 1, WRITE, begOfPipe, endOfPipe);
-                            setStream(ch, 2, WRITE, begOfPipe, endOfPipe);
-                        }
+            List<Token> tokens = statement.redirections();
+            for (int i = 0; i < tokens.size(); i++) {
+                Token t = tokens.get(i);
+                Matcher m;
+                if ((m = Pattern.compile("(?:([0-9])?|(&)?)>(>)?").matcher(t)).matches()) {
+                    int fd;
+                    if (m.group(1) != null) {
+                        fd = Integer.parseInt(m.group(1));
                     }
-                    else if ((m = Pattern.compile("([0-9])?>&([0-9])").matcher(t)).matches()) {
-                        int fd0 = 1;
-                        if (m.group(1) != null) {
-                            fd0 = Integer.parseInt(m.group(1));
-                        }
-                        int fd1 = Integer.parseInt(m.group(2));
-                        if (streams[fd0] != null && toclose[fd0]) {
-                            streams[fd0].close();
-                        }
-                        streams[fd0] = streams[fd1];
-                        // TODO: this is wrong, we should keep a counter somehow so that the
-                        // stream is closed when both are closed
-                        toclose[fd0] = false;
+                    else if (m.group(2) != null) {
+                        fd = -1; // both 1 and 2
+                    } else {
+                        fd = 1;
                     }
-                    else if ((m = Pattern.compile("([0-9])?<(>)?").matcher(t)).matches()) {
-                        int fd = 0;
-                        if (m.group(1) != null) {
-                            fd = Integer.parseInt(m.group(1));
-                        }
-                        boolean output = m.group(2) != null;
-                        Token file = tokens.get(++i);
-                        Path inPath = closure.session().currentDir().resolve(file.toString());
-                        Set<StandardOpenOption> options = new HashSet<StandardOpenOption>();
-                        options.add(StandardOpenOption.READ);
-                        if (output) {
-                            options.add(StandardOpenOption.WRITE);
-                            options.add(StandardOpenOption.CREATE);
-                        }
-                        Channel ch = Files.newByteChannel(inPath, options);
-                        setStream(ch, fd, READ + (output ? WRITE : 0), begOfPipe, endOfPipe);
+                    boolean append = m.group(3) != null;
+                    Token file = tokens.get(++i);
+                    Path outPath = closure.session().currentDir().resolve(file.toString());
+                    Set<StandardOpenOption> options = new HashSet<>();
+                    options.add(StandardOpenOption.WRITE);
+                    options.add(StandardOpenOption.CREATE);
+                    if (append) {
+                        options.add(StandardOpenOption.APPEND);
+                    } else {
+                        options.add(StandardOpenOption.TRUNCATE_EXISTING);
+                    }
+                    Channel ch = Files.newByteChannel(outPath, options);
+                    if (fd >= 0) {
+                        setStream(ch, fd, WRITE, begOfPipe, endOfPipe);
+                    } else {
+                        setStream(ch, 1, WRITE, begOfPipe, endOfPipe);
+                        setStream(ch, 2, WRITE, begOfPipe, endOfPipe);
                     }
                 }
-            } else {
-                new UnsupportedOperationException("what to do ?").printStackTrace();
+                else if ((m = Pattern.compile("([0-9])?>&([0-9])").matcher(t)).matches()) {
+                    int fd0 = 1;
+                    if (m.group(1) != null) {
+                        fd0 = Integer.parseInt(m.group(1));
+                    }
+                    int fd1 = Integer.parseInt(m.group(2));
+                    if (streams[fd0] != null && toclose[fd0]) {
+                        streams[fd0].close();
+                    }
+                    streams[fd0] = streams[fd1];
+                    // TODO: this is wrong, we should keep a counter somehow so that the
+                    // stream is closed when both are closed
+                    toclose[fd0] = false;
+                }
+                else if ((m = Pattern.compile("([0-9])?<(>)?").matcher(t)).matches()) {
+                    int fd = 0;
+                    if (m.group(1) != null) {
+                        fd = Integer.parseInt(m.group(1));
+                    }
+                    boolean output = m.group(2) != null;
+                    Token file = tokens.get(++i);
+                    Path inPath = closure.session().currentDir().resolve(file.toString());
+                    Set<StandardOpenOption> options = new HashSet<>();
+                    options.add(StandardOpenOption.READ);
+                    if (output) {
+                        options.add(StandardOpenOption.WRITE);
+                        options.add(StandardOpenOption.CREATE);
+                    }
+                    Channel ch = Files.newByteChannel(inPath, options);
+                    setStream(ch, fd, READ + (output ? WRITE : 0), begOfPipe, endOfPipe);
+                }
             }
 
-            tStreams.set(streams);
-
+            // Create streams
             in = Channels.newInputStream((ReadableByteChannel) streams[0]);
             out = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[1]), true);
             err = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[2]), true);
+            // Change the error stream to the redirected one, now that
+            // the command is about to be executed.
             errChannel = (WritableByteChannel) streams[2];
 
             closure.session().threadIO().setStreams(in, out, err);
 
-            result = closure.execute(executable);
-            // We don't print the result if toclose[1] == false, which means we're at the end of the pipe
-            if (result != null && !endOfPipe && !Boolean.FALSE.equals(closure.session().get(".FormatPipe"))) {
-                out.println(closure.session().format(result, Converter.INSPECT));
+            Pipe previous = setCurrentPipe(this);
+            try {
+                Object result = closure.execute(statement);
+                // If an error has been set
+                if (error != 0) {
+                    return new Result(error);
+                }
+                // We don't print the result if we're at the end of the pipe
+                if (result != null && !endOfPipe && !Boolean.FALSE.equals(closure.session().get(".FormatPipe"))) {
+                    out.println(closure.session().format(result, Converter.INSPECT));
+                }
+                return new Result(result);
+
+            } finally {
+                setCurrentPipe(previous);
             }
         }
         catch (Exception e)
         {
-            exception = e;
-            if (exit == 0) {
-                exit = 1; // failure
-            }
             // TODO: use shell name instead of 'gogo'
             // TODO: use color if not redirected
             // TODO: use conversion ?
@@ -316,6 +370,7 @@ public class Pipe extends Thread
             } catch (IOException ioe) {
                 e.addSuppressed(ioe);
             }
+            return new Result(e);
         }
         finally
         {
@@ -327,8 +382,6 @@ public class Pipe extends Thread
             }
             closure.session().threadIO().close();
 
-            tStreams.set(prevStreams);
-
             try
             {
                 for (int i = 0; i < 10; i++) {

Modified: 
felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java
URL: 
http://svn.apache.org/viewvc/felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java?rev=1736001&r1=1736000&r2=1736001&view=diff
==============================================================================
--- felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java (original)
+++ felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java Mon Mar 21 16:53:48 2016
@@ -19,7 +19,7 @@
 package org.apache.felix.service.command;
 
 import java.io.InputStream;
-import java.io.PrintStream;
+import java.io.OutputStream;
 
 /**
  * A command shell can create and maintain a number of command sessions.
@@ -59,5 +59,7 @@ public interface CommandProcessor
      * @param err The stream used for System.err
      * @return A new session.
      */
-    CommandSession createSession(InputStream in, PrintStream out, PrintStream err);
+    CommandSession createSession(InputStream in, OutputStream out, OutputStream err);
+
+    CommandSession createSession(CommandSession parent);
 }


Reply via email to