Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupAdmin.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupAdmin.java?rev=1634322&r1=1634321&r2=1634322&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupAdmin.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupAdmin.java Sun Oct 26 13:17:28 2014 @@ -1,314 +1,315 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.sun.jini.qa.harness; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.lang.reflect.Field; -import java.rmi.MarshalledObject; -import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.logging.Logger; -import java.util.logging.Level; - -/** - * An <code>Admin</code> which manages a <code>NonActivatableGroup</code>. - * The group is started in a separate VM and should be packaged in its own - * minimized JAR file to avoid class loader conflicts. - */ -public class NonActivatableGroupAdmin extends AbstractServiceAdmin - implements Admin -{ - /** the logger */ - private final static Logger logger = - Logger.getLogger("com.sun.jini.qa.harness"); - - /** the group proxy */ - private NonActivatableGroup proxy; - - /** the system process */ - private Process process; - - /** the stdout pipe, which mustn't be GC'd */ - private Pipe outPipe; - - /** service options provided by the 5-arg constructor */ - private final String[] options; - - /** service properties provided by the 5-arg constructor */ - private final String[] properties; - - /** merge of group options and service options */ - private String[] combinedOptions; - - /** merge of group properties and service properties */ - private String[] combinedProperties; - - /** - * Construct an instance of <code>NonActivatableGroupAdmin</code>. - * This constructor is called to create a group admin for a group - * which will be private to a single service. The given service - * options and properties are merged with the options and - * properties defined for the group. - * - * @param config the configuration object for this test run - * @param serviceName the name of the group managed by this admin - * @param index the instance number for this service. - * @param options the service options - * @param properties the service properties - */ - public NonActivatableGroupAdmin(QAConfig config, - String serviceName, - int index, - String[] options, - String[] properties) - { - super(config, serviceName, index); - this.options = options; - this.properties = properties; - } - - /** - * Construct an instance of <code>NonActivatableGroupAdmin</code>. - * This constructor is called to create a group which is intended - * to be shared among multiple nonactivatable services. - * - * @param config the configuration object for this test run - * @param serviceName the name of the group managed by this admin - * @param index the instance number for this service. - */ - public NonActivatableGroupAdmin(QAConfig config, - String serviceName, - int index) - { - super(config, serviceName, index); - options = new String[0]; - properties = options; - } - - /** - * Start the group managed by this admin. A command line is constructed - * based on the service properties associated with this group. A VM is - * exec'd, and an object is read from the child process - * <code>System.err</code> stream which is expected to be a - * <code>MarshalledObject</code> containing the serialized proxy for the - * <code>NonActivatableGroup</code> instance provided by that VM. No further - * input is read from the child <code>System.err.</code> It is assumed that - * the child will write any output originally destined to - * <code>System.err</code> to its <code>System.out</code> stream. - * - * @throws TestException if the child process could not be exec'd - * or if an I/O error occurs reading the - * childs proxy or if the childs proxy class - * could not be found - * @throws RemoteException never - */ - public void start() throws RemoteException, TestException { - - // construct the command line and convert to a string array - ArrayList l = new ArrayList(10); - String actCommand = null; - String vm = getServiceJVM(); - if (vm == null) { - vm = System.getProperty("java.home"); - } - l.add(vm + "/bin/java"); - l.add("-Djava.rmi.server.codebase=" + getServiceCodebase()); - l.add("-Djava.security.policy=" + getServicePolicyFile()); - String[] opts = getServiceOptions(); - if (opts != null) { - for (int i = 0; i < opts.length; i++) { - l.add(opts[i]); - } - } - String[] props = getServiceProperties(); - if (props != null) { - for (int i = 0; i < props.length; i += 2) { - l.add("-D" + props[i] + "=" + props[i+1]); - } - } - l.add("-cp"); - l.add(getServiceClasspath()); - l.add(getServiceImpl()); - String[] cmdArray = (String[]) l.toArray(new String[l.size()]); - - // stringify the command line for log display - StringBuffer cmdBuf = new StringBuffer(cmdArray[0]); - for (int i = 1; i < cmdArray.length; i++) { - cmdBuf.append(" ").append(cmdArray[i]); - } - logger.log(Level.FINER, - "NonActivatableGroup exec command line: '" + cmdBuf + "'"); - logServiceParameters(); - - ObjectInputStream proxyStream = null; - // exec the process, setup the pipe, and get the proxy - synchronized (this){ - try { - process = Runtime.getRuntime().exec(cmdArray); - outPipe = new Pipe("NonActivatableGroup_system-out", - process.getInputStream(), - System.out, - null, //filter - new NonActGrpAnnotator("NonActGrp-out: ")); - outPipe.start(); - proxyStream = new ObjectInputStream(process.getErrorStream()); - proxy = (NonActivatableGroup) - ((MarshalledObject) proxyStream.readObject()).get(); - } catch (IOException e) { - // Clean up. - process.destroy(); - try { - outPipe.stop(); - } catch (IOException ex){ }//Ignore - try { - if (proxyStream != null) proxyStream.close(); - } catch (IOException ex){ } // Ignore. - throw new TestException("NonActivatableGroupAdmin: Failed to exec " - + "the group", e); - } catch (ClassNotFoundException e) { - // Clean up. - process.destroy(); - try { - outPipe.stop(); - } catch (IOException ex){ }//Ignore - try { - if (proxyStream != null) proxyStream.close(); - } catch (IOException ex){ } // Ignore. - throw new TestException("NonActivatableGroupAdmin: Failed to exec " - + "the group", e); - } - } - } - - /** - * Annotator for annotating output merged into test log - */ - private static class NonActGrpAnnotator implements Pipe.Annotator { - - private final String annotation; - - NonActGrpAnnotator(String annotation) { - this.annotation = annotation; - } - - public String getAnnotation() { - return annotation; - } - } - - /** - * Stop the group. Start a destroy thread which has a two-second delay - * before calling <code>System.exit</code> to allow the call to return. - * - * @throws RemoteException if a communication error occurs when the - * groups <code>stop</code> method is called - */ - public synchronized void stop() throws RemoteException { - proxy.stop(); - Timeout.TimeoutHandler handler = - new Timeout.ThreadTimeoutHandler(Thread.currentThread()); - Timeout timeout = new Timeout(handler, 10000); // ten seconds - timeout.start(); - try { - process.waitFor(); - timeout.cancel(); - } catch (InterruptedException e) { - logger.log(Level.INFO, "Nonactivatable group process did not exit"); - } - } - - /** - * Return the proxy for the <code>NonActivatableGroup</code> - * managed by this admin. - * - * @return the <code>NonActivatableGroup</code> proxy - */ - public synchronized Object getProxy() { - return proxy; - } - - /** - * Override the base class method to merge options which may have - * been supplied through the constructor. - * - * @return the merged property array - */ - public synchronized String[] getServiceOptions() { - combinedOptions = config.mergeOptions(super.getServiceOptions(), - options); - return combinedOptions.clone(); - } - - /** - * Override the base class method to return the merged options. - * The <code>getServiceOptions</code> method must be called - * prior to calling this method. - * - * @return the merged options array - */ - public synchronized String[] getOptions() { - return combinedOptions.clone(); - } - - /** - * Override the base class method to merge properties which may have - * been supplied through the constructor. - * - * @return the merged property array - */ - public synchronized String[] getServiceProperties() throws TestException { - combinedProperties = config.mergeProperties(super.getServiceProperties(), - properties); - return combinedProperties.clone(); - } - - /** - * Override the base class method to return the merged properties. - * The <code>getServiceProperties</code> method must be called - * prior to calling this method. - * - * @return the merged property array - */ - public synchronized String[] getProperties() { - return combinedProperties.clone(); - } - - /** - * Attempt to force a thread dump. - * - * @return true if the dump is successfully requested - */ - public synchronized boolean forceThreadDump() { - logger.log(Level.INFO, "Attempting to force thread dump on " - + "NonActivatableGroup " + process); - boolean ret = true; - try { - Class procClass = process.getClass(); - Field field = procClass.getDeclaredField("pid"); - field.setAccessible(true); - int pid = field.getInt(process); - Process p = Runtime.getRuntime().exec("/usr/bin/kill -QUIT " + pid); - p.waitFor(); - } catch (Exception e) { - logger.log(Level.INFO, "Unable to force thread dump"); - ret = false; - } - return ret; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.qa.harness; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.lang.reflect.Field; +import java.rmi.MarshalledObject; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.logging.Logger; +import java.util.logging.Level; + +/** + * An <code>Admin</code> which manages a <code>NonActivatableGroup</code>. + * The group is started in a separate VM and should be packaged in its own + * minimized JAR file to avoid class loader conflicts. + */ +public class NonActivatableGroupAdmin extends AbstractServiceAdmin + implements Admin +{ + /** the logger */ + private final static Logger logger = + Logger.getLogger("com.sun.jini.qa.harness"); + + /** the group proxy */ + private NonActivatableGroup proxy; + + /** the system process */ + private Process process; + + /** the stdout pipe, which mustn't be GC'd */ + private Pipe outPipe; + + /** service options provided by the 5-arg constructor */ + private final String[] options; + + /** service properties provided by the 5-arg constructor */ + private final String[] properties; + + /** merge of group options and service options */ + private String[] combinedOptions; + + /** merge of group properties and service properties */ + private String[] combinedProperties; + + /** + * Construct an instance of <code>NonActivatableGroupAdmin</code>. + * This constructor is called to create a group admin for a group + * which will be private to a single service. The given service + * options and properties are merged with the options and + * properties defined for the group. + * + * @param config the configuration object for this test run + * @param serviceName the name of the group managed by this admin + * @param index the instance number for this service. + * @param options the service options + * @param properties the service properties + */ + public NonActivatableGroupAdmin(QAConfig config, + String serviceName, + int index, + String[] options, + String[] properties) + { + super(config, serviceName, index); + this.options = options; + this.properties = properties; + } + + /** + * Construct an instance of <code>NonActivatableGroupAdmin</code>. + * This constructor is called to create a group which is intended + * to be shared among multiple nonactivatable services. + * + * @param config the configuration object for this test run + * @param serviceName the name of the group managed by this admin + * @param index the instance number for this service. + */ + public NonActivatableGroupAdmin(QAConfig config, + String serviceName, + int index) + { + super(config, serviceName, index); + options = new String[0]; + properties = options; + } + + /** + * Start the group managed by this admin. A command line is constructed + * based on the service properties associated with this group. A VM is + * exec'd, and an object is read from the child process + * <code>System.err</code> stream which is expected to be a + * <code>MarshalledObject</code> containing the serialized proxy for the + * <code>NonActivatableGroup</code> instance provided by that VM. No further + * input is read from the child <code>System.err.</code> It is assumed that + * the child will write any output originally destined to + * <code>System.err</code> to its <code>System.out</code> stream. + * + * @throws TestException if the child process could not be exec'd + * or if an I/O error occurs reading the + * childs proxy or if the childs proxy class + * could not be found + * @throws RemoteException never + */ + public void start() throws RemoteException, TestException { + + // construct the command line and convert to a string array + ArrayList l = new ArrayList(10); + String actCommand = null; + String vm = getServiceJVM(); + if (vm == null) { + vm = System.getProperty("java.home"); + } + l.add(vm + "/bin/java"); + l.add("-Djava.rmi.server.codebase=" + getServiceCodebase()); + l.add("-Djava.security.policy=" + getServicePolicyFile()); + String[] opts = getServiceOptions(); + if (opts != null) { + for (int i = 0; i < opts.length; i++) { + l.add(opts[i]); + } + } + String[] props = getServiceProperties(); + if (props != null) { + for (int i = 0; i < props.length; i += 2) { + l.add("-D" + props[i] + "=" + props[i+1]); + } + } + l.add("-cp"); + l.add(getServiceClasspath()); + l.add(getServiceImpl()); + String[] cmdArray = (String[]) l.toArray(new String[l.size()]); + + // stringify the command line for log display + StringBuffer cmdBuf = new StringBuffer(cmdArray[0]); + for (int i = 1; i < cmdArray.length; i++) { + cmdBuf.append(" ").append(cmdArray[i]); + } + logger.log(Level.FINER, + "NonActivatableGroup exec command line: '" + cmdBuf + "'"); + logServiceParameters(); + + ObjectInputStream proxyStream = null; + // exec the process, setup the pipe, and get the proxy + synchronized (this){ + try { + process = Runtime.getRuntime().exec(cmdArray); + outPipe = new Pipe("NonActivatableGroup_system-out", + process.getInputStream(), + System.out, + null, //filter + new NonActGrpAnnotator("NonActGrp-out: ")); + outPipe.start(); + proxyStream = new ObjectInputStream(process.getErrorStream()); + proxy = (NonActivatableGroup) + ((MarshalledObject) proxyStream.readObject()).get(); + } catch (IOException e) { + // Clean up. + process.destroy(); + try { + outPipe.stop(); + } catch (IOException ex){ }//Ignore + try { + if (proxyStream != null) proxyStream.close(); + } catch (IOException ex){ } // Ignore. + throw new TestException("NonActivatableGroupAdmin: Failed to exec " + + "the group", e); + } catch (ClassNotFoundException e) { + // Clean up. + process.destroy(); + try { + outPipe.stop(); + } catch (IOException ex){ }//Ignore + try { + if (proxyStream != null) proxyStream.close(); + } catch (IOException ex){ } // Ignore. + throw new TestException("NonActivatableGroupAdmin: Failed to exec " + + "the group", e); + } + } + } + + /** + * Annotator for annotating output merged into test log + */ + private static class NonActGrpAnnotator implements Pipe.Annotator { + + private final String annotation; + + NonActGrpAnnotator(String annotation) { + this.annotation = annotation; + } + + public String getAnnotation() { + return annotation; + } + } + + /** + * Stop the group. Start a destroy thread which has a two-second delay + * before calling <code>System.exit</code> to allow the call to return. + * + * @throws RemoteException if a communication error occurs when the + * groups <code>stop</code> method is called + */ + public synchronized void stop() throws RemoteException { + proxy.stop(); + Timeout.TimeoutHandler handler = + new Timeout.ThreadTimeoutHandler(Thread.currentThread()); + Timeout timeout = new Timeout(handler, 10000); // ten seconds + timeout.start(); + try { + process.waitFor(); + timeout.cancel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.log(Level.INFO, "Nonactivatable group process did not exit"); + } + } + + /** + * Return the proxy for the <code>NonActivatableGroup</code> + * managed by this admin. + * + * @return the <code>NonActivatableGroup</code> proxy + */ + public synchronized Object getProxy() { + return proxy; + } + + /** + * Override the base class method to merge options which may have + * been supplied through the constructor. + * + * @return the merged property array + */ + public synchronized String[] getServiceOptions() { + combinedOptions = config.mergeOptions(super.getServiceOptions(), + options); + return combinedOptions.clone(); + } + + /** + * Override the base class method to return the merged options. + * The <code>getServiceOptions</code> method must be called + * prior to calling this method. + * + * @return the merged options array + */ + public synchronized String[] getOptions() { + return combinedOptions.clone(); + } + + /** + * Override the base class method to merge properties which may have + * been supplied through the constructor. + * + * @return the merged property array + */ + public synchronized String[] getServiceProperties() throws TestException { + combinedProperties = config.mergeProperties(super.getServiceProperties(), + properties); + return combinedProperties.clone(); + } + + /** + * Override the base class method to return the merged properties. + * The <code>getServiceProperties</code> method must be called + * prior to calling this method. + * + * @return the merged property array + */ + public synchronized String[] getProperties() { + return combinedProperties.clone(); + } + + /** + * Attempt to force a thread dump. + * + * @return true if the dump is successfully requested + */ + public synchronized boolean forceThreadDump() { + logger.log(Level.INFO, "Attempting to force thread dump on " + + "NonActivatableGroup " + process); + boolean ret = true; + try { + Class procClass = process.getClass(); + Field field = procClass.getDeclaredField("pid"); + field.setAccessible(true); + int pid = field.getInt(process); + Process p = Runtime.getRuntime().exec("/usr/bin/kill -QUIT " + pid); + p.waitFor(); + } catch (Exception e) { + logger.log(Level.INFO, "Unable to force thread dump"); + ret = false; + } + return ret; + } +}
Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/Pipe.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/Pipe.java?rev=1634322&r1=1634321&r2=1634322&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/Pipe.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/Pipe.java Sun Oct 26 13:17:28 2014 @@ -1,236 +1,237 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.sun.jini.qa.harness; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PrintStream; - -/** - * An I/O redirection pipe. A daemon thread copies data from an input - * stream to an output stream. An optional annotator may be provided - * which will prefix each line of the copied data with a label which - * can be used to identify the source. An optional filter may be provided - * which is called to process each byte of the input stream. If no - * annotator is provided, input is copied to the output stream immediately. - * If an annotator is supplied, lines are buffered. Therefore, if an - * annotator is supplied, the input and output streams MUST have the - * same line separator convention; otherwise it is possible for the - * end-of-line to never be detected. - */ -class Pipe implements Runnable { - - /** the line separator string */ - private final static String SEPARATOR = "\n"; - - /** most recent input bytes for comparison with lineSeparator.*/ - private final byte[] lastBytes = new byte[SEPARATOR.length()]; - - /** output line buffer */ - private final ByteArrayOutputStream bufOut = new ByteArrayOutputStream();; - - /** the input stream */ - private final InputStream in; - - /** the output PrintStream */ - private volatile PrintStream stream; - - /** the input data filter */ - private final Filter filter; - - /** the output stream annotator */ - private final Annotator annotator; - - /** the thread to process the data */ - private final Thread outThread; - - /** - * Create a new Pipe object and start the thread to handle the data. - * - * @param name the name to assign to the thread - * @param in input stream from which pipe input flows - * @param stream the stream to which output will be sent - * @param f the filter for processing input characters - * @param a the annotator for prepending text to logged lines - */ - Pipe(String name, - InputStream in, - PrintStream stream, - Filter f, - Annotator a) - { - this.in = in; - this.stream = stream; - this.filter = (f == null ? new NullFilter() : f); - this.annotator = a; - outThread = new Thread(this, name); - outThread.setDaemon(true); - //outThread.start(); - } - - void start(){ - outThread.start(); - } - - void stop() throws IOException{ - outThread.interrupt(); - in.close(); - stream.close(); - } - - /** - * Wait until the run method terminates due to reading EOF on input - * - * @param timeout max time to wait for the thread to terminate - */ - void waitTillEmpty(int timeout) { - try { - outThread.join(timeout); - } catch (InterruptedException ignore) { - } - } - - /** - * Set the output stream. - * - * @param stream the stream - */ - void setStream(PrintStream stream) { - this.stream = stream; - } - - /** - * Read and write data until EOF is detected. Flush any remaining data to - * the output steam and return, terminating the thread. - */ - public void run() { - byte[] buf = new byte[256]; - int count; - try { - /* read bytes till there are no more. */ - while ((count = in.read(buf)) != -1) { - write(buf, count); - } - - /* If annotating, flush internal buffer... may not have ended on a - * line separator, we also need a last annotation if - * something was left. - */ - String lastInBuffer = bufOut.toString(); - bufOut.reset(); - if (lastInBuffer.length() > 0) { - if (annotator != null) { - stream.print(annotator.getAnnotation()); - } - stream.println(lastInBuffer); - } - // Silently ignore exceptions. Child VM's which are killed - // can generate uninteresting noise otherwise. - } catch (Exception e) { - } - } - - /** - * For each byte in the give byte array, pass the byte to the - * filter and then call the <code>write(byte)</code> method to - * output the filtered bytes. - * - * @param b the array of input bytes - * @param len the number data bytes in the array - */ - private void write(byte b[], int len) throws IOException { - if (len < 0) { - throw new ArrayIndexOutOfBoundsException(len); - } - for (int i = 0; i < len; i++) { - byte[] fb = filter.filterInput(b[i]); - for (int j = 0; j < fb.length; j++) { - write(fb[j]); - } - } - } - - /** - * If not annotated, write the byte to the stream immediately. Otherwise, - * write a byte of data to the internal buffer. If we have matched a line - * separator, then the currently buffered line is sent to the output writer - * with a prepended annotation string. - */ - private void write(byte b) throws IOException { - - bufOut.write(b); - - // shift previous bytes 'left' and append new byte - int i = 1; - while (i < lastBytes.length) { - lastBytes[i-1] = lastBytes[i++]; - } - lastBytes[i-1] = b; - - // write buffered line if line separator detected - if (SEPARATOR.equals(new String(lastBytes))) { - - String s = bufOut.toString(); - bufOut.reset(); - if (annotator != null) { - stream.print(annotator.getAnnotation()); - } - stream.print(s); - } - } - - /** - * A filter for the input stream. - */ - static interface Filter { - - /** - * Filters a byte. The given <code>byte</code> is - * processed, and a filtered array is returned. The return value - * may be zero length, but must not be null. - * - * @param b the <code>byte</code> to process in the filter - * @return a non-null array of <code>byte</code>s - */ - public byte[] filterInput(byte b); - } - - /** - * An annotator for the output stream. - */ - static interface Annotator { - - /** - * Return the annotation. The returned string is prepended - * to each line of output. - * - * @return the output annotation string - */ - public String getAnnotation(); - } - - /** A default implementation of the <code>Filter</code> interface */ - private static class NullFilter implements Filter { - public byte[] filterInput(byte b) { - return new byte[]{b}; - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sun.jini.qa.harness; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; + +/** + * An I/O redirection pipe. A daemon thread copies data from an input + * stream to an output stream. An optional annotator may be provided + * which will prefix each line of the copied data with a label which + * can be used to identify the source. An optional filter may be provided + * which is called to process each byte of the input stream. If no + * annotator is provided, input is copied to the output stream immediately. + * If an annotator is supplied, lines are buffered. Therefore, if an + * annotator is supplied, the input and output streams MUST have the + * same line separator convention; otherwise it is possible for the + * end-of-line to never be detected. + */ +class Pipe implements Runnable { + + /** the line separator string */ + private final static String SEPARATOR = "\n"; + + /** most recent input bytes for comparison with lineSeparator.*/ + private final byte[] lastBytes = new byte[SEPARATOR.length()]; + + /** output line buffer */ + private final ByteArrayOutputStream bufOut = new ByteArrayOutputStream();; + + /** the input stream */ + private final InputStream in; + + /** the output PrintStream */ + private volatile PrintStream stream; + + /** the input data filter */ + private final Filter filter; + + /** the output stream annotator */ + private final Annotator annotator; + + /** the thread to process the data */ + private final Thread outThread; + + /** + * Create a new Pipe object and start the thread to handle the data. + * + * @param name the name to assign to the thread + * @param in input stream from which pipe input flows + * @param stream the stream to which output will be sent + * @param f the filter for processing input characters + * @param a the annotator for prepending text to logged lines + */ + Pipe(String name, + InputStream in, + PrintStream stream, + Filter f, + Annotator a) + { + this.in = in; + this.stream = stream; + this.filter = (f == null ? new NullFilter() : f); + this.annotator = a; + outThread = new Thread(this, name); + outThread.setDaemon(true); + //outThread.start(); + } + + void start(){ + outThread.start(); + } + + void stop() throws IOException{ + outThread.interrupt(); + in.close(); + stream.close(); + } + + /** + * Wait until the run method terminates due to reading EOF on input + * + * @param timeout max time to wait for the thread to terminate + */ + void waitTillEmpty(int timeout) { + try { + outThread.join(timeout); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + /** + * Set the output stream. + * + * @param stream the stream + */ + void setStream(PrintStream stream) { + this.stream = stream; + } + + /** + * Read and write data until EOF is detected. Flush any remaining data to + * the output steam and return, terminating the thread. + */ + public void run() { + byte[] buf = new byte[256]; + int count; + try { + /* read bytes till there are no more. */ + while ((count = in.read(buf)) != -1) { + write(buf, count); + } + + /* If annotating, flush internal buffer... may not have ended on a + * line separator, we also need a last annotation if + * something was left. + */ + String lastInBuffer = bufOut.toString(); + bufOut.reset(); + if (lastInBuffer.length() > 0) { + if (annotator != null) { + stream.print(annotator.getAnnotation()); + } + stream.println(lastInBuffer); + } + // Silently ignore exceptions. Child VM's which are killed + // can generate uninteresting noise otherwise. + } catch (Exception e) { + } + } + + /** + * For each byte in the give byte array, pass the byte to the + * filter and then call the <code>write(byte)</code> method to + * output the filtered bytes. + * + * @param b the array of input bytes + * @param len the number data bytes in the array + */ + private void write(byte b[], int len) throws IOException { + if (len < 0) { + throw new ArrayIndexOutOfBoundsException(len); + } + for (int i = 0; i < len; i++) { + byte[] fb = filter.filterInput(b[i]); + for (int j = 0; j < fb.length; j++) { + write(fb[j]); + } + } + } + + /** + * If not annotated, write the byte to the stream immediately. Otherwise, + * write a byte of data to the internal buffer. If we have matched a line + * separator, then the currently buffered line is sent to the output writer + * with a prepended annotation string. + */ + private void write(byte b) throws IOException { + + bufOut.write(b); + + // shift previous bytes 'left' and append new byte + int i = 1; + while (i < lastBytes.length) { + lastBytes[i-1] = lastBytes[i++]; + } + lastBytes[i-1] = b; + + // write buffered line if line separator detected + if (SEPARATOR.equals(new String(lastBytes))) { + + String s = bufOut.toString(); + bufOut.reset(); + if (annotator != null) { + stream.print(annotator.getAnnotation()); + } + stream.print(s); + } + } + + /** + * A filter for the input stream. + */ + static interface Filter { + + /** + * Filters a byte. The given <code>byte</code> is + * processed, and a filtered array is returned. The return value + * may be zero length, but must not be null. + * + * @param b the <code>byte</code> to process in the filter + * @return a non-null array of <code>byte</code>s + */ + public byte[] filterInput(byte b); + } + + /** + * An annotator for the output stream. + */ + static interface Annotator { + + /** + * Return the annotation. The returned string is prepended + * to each line of output. + * + * @return the output annotation string + */ + public String getAnnotation(); + } + + /** A default implementation of the <code>Filter</code> interface */ + private static class NullFilter implements Filter { + public byte[] filterInput(byte b) { + return new byte[]{b}; + } + } +}
