Modified: felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java URL: http://svn.apache.org/viewvc/felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java?rev=1736001&r1=1736000&r2=1736001&view=diff ============================================================================== --- felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java (original) +++ felix/trunk/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/Pipe.java Mon Mar 21 16:53:48 2016 @@ -33,46 +33,81 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.felix.gogo.runtime.Parser.Executable; import org.apache.felix.gogo.runtime.Parser.Statement; +import org.apache.felix.gogo.runtime.Pipe.Result; import org.apache.felix.service.command.Converter; -public class Pipe extends Thread +public class Pipe implements Callable<Result> { - static final ThreadLocal<Channel[]> tStreams = new ThreadLocal<Channel[]>(); + private static final ThreadLocal<Pipe> CURRENT = new ThreadLocal<>(); - public static Channel[] mark() { - return tStreams.get(); + public static class Result { + public final Object result; + public final Exception exception; + public final int error; + + public Result(Object result) { + this.result = result; + this.exception = null; + this.error = 0; + } + + public Result(Exception exception) { + this.result = null; + this.exception = exception; + this.error = 1; + } + + public Result(int error) { + this.result = null; + this.exception = null; + this.error = error; + } + + public boolean isSuccess() { + return exception == null && error == 0; + } + } + + public static Pipe getCurrentPipe() { + return CURRENT.get(); + } + + public static void error(int error) { + Pipe current = getCurrentPipe(); + if (current != null) { + current.error = error; + } } - public static void reset(Channel[] streams) { - tStreams.set(streams); + private static Pipe setCurrentPipe(Pipe pipe) { + Pipe previous = CURRENT.get(); + CURRENT.set(pipe); + return previous; } final Closure closure; - final Executable executable; + final Statement statement; final Channel[] streams; final boolean[] toclose; - Object result; - Exception exception; - int exit = 0; + int error; - public Pipe(Closure closure, Executable executable, Channel[] streams, boolean[] toclose) + public Pipe(Closure closure, Statement statement, Channel[] streams, boolean[] toclose) { - super("pipe-" + executable); this.closure = closure; - this.executable = executable; + this.statement = statement; this.streams = streams; this.toclose = toclose; } public String toString() { - return "pipe<" + executable + "> out=" + streams[1]; + return "pipe<" + statement + "> out=" + streams[1]; } private static final int READ = 1; @@ -148,8 +183,8 @@ public class Pipe extends Thread } private static class MultiChannel<T extends Channel> implements Channel { - protected final List<T> channels = new ArrayList<T>(); - protected final List<T> toClose = new ArrayList<T>(); + protected final List<T> channels = new ArrayList<>(); + protected final List<T> toClose = new ArrayList<>(); protected final AtomicBoolean opened = new AtomicBoolean(true); public void addChannel(T channel, boolean toclose) { channels.add(channel); @@ -200,14 +235,30 @@ public class Pipe extends Thread } } - public void run() + @Override + public Result call() throws Exception { + Thread thread = Thread.currentThread(); + String name = thread.getName(); + try { + thread.setName("pipe-" + statement); + return doCall(); + } finally { + thread.setName(name); + } + } + + public Result doCall() { - InputStream in = null; + InputStream in; PrintStream out = null; PrintStream err = null; - WritableByteChannel errChannel = (WritableByteChannel) streams[2]; - Channel[] prevStreams = tStreams.get(); + // The errChannel will be used to print errors to the error stream + // Before the command is actually executed (i.e. during the initialization, + // including the redirection processing), it will be the original error stream. + // This value may be modified by redirections and the redirected error stream + // will be effective just before actually running the command. + WritableByteChannel errChannel = (WritableByteChannel) streams[2]; // TODO: not sure this is the correct way boolean begOfPipe = !toclose[0]; @@ -215,98 +266,101 @@ public class Pipe extends Thread try { - if (executable instanceof Statement) { - Statement statement = (Statement) executable; - List<Token> tokens = statement.redirections(); - for (int i = 0; i < tokens.size(); i++) { - Token t = tokens.get(i); - Matcher m; - if ((m = Pattern.compile("(?:([0-9])?|(&)?)>(>)?").matcher(t)).matches()) { - int fd; - if (m.group(1) != null) { - fd = Integer.parseInt(m.group(1)); - } - else if (m.group(2) != null) { - fd = -1; // both 1 and 2 - } else { - fd = 1; - } - boolean append = m.group(3) != null; - Token file = tokens.get(++i); - Path outPath = closure.session().currentDir().resolve(file.toString()); - Set<StandardOpenOption> options = new HashSet<StandardOpenOption>(); - options.add(StandardOpenOption.WRITE); - options.add(StandardOpenOption.CREATE); - if (append) { - options.add(StandardOpenOption.APPEND); - } else { - options.add(StandardOpenOption.TRUNCATE_EXISTING); - } - Channel ch = Files.newByteChannel(outPath, options); - if (fd >= 0) { - setStream(ch, fd, WRITE, begOfPipe, endOfPipe); - } else { - setStream(ch, 1, WRITE, begOfPipe, endOfPipe); - setStream(ch, 2, WRITE, begOfPipe, endOfPipe); - } + List<Token> tokens = statement.redirections(); + for (int i = 0; i < tokens.size(); i++) { + Token t = tokens.get(i); + Matcher m; + if ((m = Pattern.compile("(?:([0-9])?|(&)?)>(>)?").matcher(t)).matches()) { + int fd; + if (m.group(1) != null) { + fd = Integer.parseInt(m.group(1)); } - else if ((m = Pattern.compile("([0-9])?>&([0-9])").matcher(t)).matches()) { - int fd0 = 1; - if (m.group(1) != null) { - fd0 = Integer.parseInt(m.group(1)); - } - int fd1 = Integer.parseInt(m.group(2)); - if (streams[fd0] != null && toclose[fd0]) { - streams[fd0].close(); - } - streams[fd0] = streams[fd1]; - // TODO: this is wrong, we should keep a counter somehow so that the - // stream is closed when both are closed - toclose[fd0] = false; + else if (m.group(2) != null) { + fd = -1; // both 1 and 2 + } else { + fd = 1; } - else if ((m = Pattern.compile("([0-9])?<(>)?").matcher(t)).matches()) { - int fd = 0; - if (m.group(1) != null) { - fd = Integer.parseInt(m.group(1)); - } - boolean output = m.group(2) != null; - Token file = tokens.get(++i); - Path inPath = closure.session().currentDir().resolve(file.toString()); - Set<StandardOpenOption> options = new HashSet<StandardOpenOption>(); - options.add(StandardOpenOption.READ); - if (output) { - options.add(StandardOpenOption.WRITE); - options.add(StandardOpenOption.CREATE); - } - Channel ch = Files.newByteChannel(inPath, options); - setStream(ch, fd, READ + (output ? WRITE : 0), begOfPipe, endOfPipe); + boolean append = m.group(3) != null; + Token file = tokens.get(++i); + Path outPath = closure.session().currentDir().resolve(file.toString()); + Set<StandardOpenOption> options = new HashSet<>(); + options.add(StandardOpenOption.WRITE); + options.add(StandardOpenOption.CREATE); + if (append) { + options.add(StandardOpenOption.APPEND); + } else { + options.add(StandardOpenOption.TRUNCATE_EXISTING); + } + Channel ch = Files.newByteChannel(outPath, options); + if (fd >= 0) { + setStream(ch, fd, WRITE, begOfPipe, endOfPipe); + } else { + setStream(ch, 1, WRITE, begOfPipe, endOfPipe); + setStream(ch, 2, WRITE, begOfPipe, endOfPipe); } } - } else { - new UnsupportedOperationException("what to do ?").printStackTrace(); + else if ((m = Pattern.compile("([0-9])?>&([0-9])").matcher(t)).matches()) { + int fd0 = 1; + if (m.group(1) != null) { + fd0 = Integer.parseInt(m.group(1)); + } + int fd1 = Integer.parseInt(m.group(2)); + if (streams[fd0] != null && toclose[fd0]) { + streams[fd0].close(); + } + streams[fd0] = streams[fd1]; + // TODO: this is wrong, we should keep a counter somehow so that the + // stream is closed when both are closed + toclose[fd0] = false; + } + else if ((m = Pattern.compile("([0-9])?<(>)?").matcher(t)).matches()) { + int fd = 0; + if (m.group(1) != null) { + fd = Integer.parseInt(m.group(1)); + } + boolean output = m.group(2) != null; + Token file = tokens.get(++i); + Path inPath = closure.session().currentDir().resolve(file.toString()); + Set<StandardOpenOption> options = new HashSet<>(); + options.add(StandardOpenOption.READ); + if (output) { + options.add(StandardOpenOption.WRITE); + options.add(StandardOpenOption.CREATE); + } + Channel ch = Files.newByteChannel(inPath, options); + setStream(ch, fd, READ + (output ? WRITE : 0), begOfPipe, endOfPipe); + } } - tStreams.set(streams); - + // Create streams in = Channels.newInputStream((ReadableByteChannel) streams[0]); out = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[1]), true); err = new PrintStream(Channels.newOutputStream((WritableByteChannel) streams[2]), true); + // Change the error stream to the redirected one, now that + // the command is about to be executed. errChannel = (WritableByteChannel) streams[2]; closure.session().threadIO().setStreams(in, out, err); - result = closure.execute(executable); - // We don't print the result if toclose[1] == false, which means we're at the end of the pipe - if (result != null && !endOfPipe && !Boolean.FALSE.equals(closure.session().get(".FormatPipe"))) { - out.println(closure.session().format(result, Converter.INSPECT)); + Pipe previous = setCurrentPipe(this); + try { + Object result = closure.execute(statement); + // If an error has been set + if (error != 0) { + return new Result(error); + } + // We don't print the result if we're at the end of the pipe + if (result != null && !endOfPipe && !Boolean.FALSE.equals(closure.session().get(".FormatPipe"))) { + out.println(closure.session().format(result, Converter.INSPECT)); + } + return new Result(result); + + } finally { + setCurrentPipe(previous); } } catch (Exception e) { - exception = e; - if (exit == 0) { - exit = 1; // failure - } // TODO: use shell name instead of 'gogo' // TODO: use color if not redirected // TODO: use conversion ? @@ -316,6 +370,7 @@ public class Pipe extends Thread } catch (IOException ioe) { e.addSuppressed(ioe); } + return new Result(e); } finally { @@ -327,8 +382,6 @@ public class Pipe extends Thread } closure.session().threadIO().close(); - tStreams.set(prevStreams); - try { for (int i = 0; i < 10; i++) {
Modified: felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java URL: http://svn.apache.org/viewvc/felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java?rev=1736001&r1=1736000&r2=1736001&view=diff ============================================================================== --- felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java (original) +++ felix/trunk/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandProcessor.java Mon Mar 21 16:53:48 2016 @@ -19,7 +19,7 @@ package org.apache.felix.service.command; import java.io.InputStream; -import java.io.PrintStream; +import java.io.OutputStream; /** * A command shell can create and maintain a number of command sessions. @@ -59,5 +59,7 @@ public interface CommandProcessor * @param err The stream used for System.err * @return A new session. */ - CommandSession createSession(InputStream in, PrintStream out, PrintStream err); + CommandSession createSession(InputStream in, OutputStream out, OutputStream err); + + CommandSession createSession(CommandSession parent); }
