// Archive header preserved from the original text (one entry per line):
//   http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Inputs.java
//   ----------------------------------------------------------------------
//   diff --git a/core/io/src/main/java/org/qi4j/io/Inputs.java b/core/io/src/main/java/org/qi4j/io/Inputs.java
//   deleted file mode 100644
//   index eb2001f..0000000
//   --- a/core/io/src/main/java/org/qi4j/io/Inputs.java
//   +++ /dev/null
//   @@ -1,490 +0,0 @@
/*
 * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.qi4j.io;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Scanner;
import java.util.zip.GZIPInputStream;
import org.qi4j.functional.Visitor;

/**
 * Common inputs
 */
public class Inputs
{
    /** Charset used when none is given or none can be derived from a content type. */
    private static final String DEFAULT_CHARSET = "UTF-8";

    /** Name of the content-type parameter that carries the charset, including the '='. */
    private static final String CHARSET_PARAMETER = "charset=";

    // START SNIPPET: method

    /**
     * Read lines from a String.
     *
     * @param source lines
     *
     * @return Input that provides lines from the string as strings
     */
    public static Input<String, RuntimeException> text( final String source )
    // END SNIPPET: method
    {
        return new Input<String, RuntimeException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
                throws RuntimeException, ReceiverThrowableType
            {
                output.receiveFrom( new Sender<String, RuntimeException>()
                {
                    @Override
                    public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
                        throws Receiver2ThrowableType, RuntimeException
                    {
                        // A fresh Scanner per transfer, so the Input can be transferred more than once.
                        try( Scanner scanner = new Scanner( source ) )
                        {
                            sendLines( scanner, receiver );
                        }
                    }
                } );
            }
        };
    }

    // START SNIPPET: method

    /**
     * Read lines from a Reader. The reader is not closed by this Input.
     *
     * @param source lines
     *
     * @return Input that provides lines from the reader as strings
     */
    public static Input<String, RuntimeException> text( final Reader source )
    // END SNIPPET: method
    {
        return new Input<String, RuntimeException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
                throws RuntimeException, ReceiverThrowableType
            {
                output.receiveFrom( new Sender<String, RuntimeException>()
                {
                    @Override
                    public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
                        throws Receiver2ThrowableType, RuntimeException
                    {
                        // Deliberately not closed: closing the Scanner would also close the caller's Reader.
                        sendLines( new Scanner( source ), receiver );
                    }
                } );
            }
        };
    }

    // START SNIPPET: method

    /**
     * Read lines from a UTF-8 encoded textfile.
     *
     * If the filename ends with .gz, then the data is automatically unzipped when read.
     *
     * @param source textfile with lines separated by \n character
     *
     * @return Input that provides lines from the textfiles as strings
     */
    public static Input<String, IOException> text( final File source )
    // END SNIPPET: method
    {
        return text( source, DEFAULT_CHARSET );
    }

    // START SNIPPET: method

    /**
     * Read lines from a textfile with the given encoding.
     *
     * If the filename ends with .gz, then the data is automatically unzipped when read.
     *
     * @param source   textfile with lines separated by \n character
     * @param encoding encoding of file, e.g. "UTF-8"
     *
     * @return Input that provides lines from the textfiles as strings
     */
    public static Input<String, IOException> text( final File source, final String encoding )
    // END SNIPPET: method
    {
        return new Input<String, IOException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
                throws IOException, ReceiverThrowableType
            {
                // The raw stream is a resource of its own so that it is closed even when wrapping it
                // fails (corrupt gzip header, unsupported encoding name).
                try( InputStream fileStream = new FileInputStream( source ) )
                {
                    InputStream stream = fileStream;

                    // If file is gzipped, unzip it automatically
                    if( source.getName().endsWith( ".gz" ) )
                    {
                        stream = new GZIPInputStream( fileStream );
                    }

                    try( BufferedReader reader = new BufferedReader( new InputStreamReader( stream, encoding ) ) )
                    {
                        transferLines( reader, output );
                    }
                }
            }
        };
    }

    // START SNIPPET: method

    /**
     * Read lines from a textfile at a given URL.
     *
     * If the content support gzip encoding, then the data is automatically unzipped when read.
     *
     * The charset in the content-type of the URL will be used for parsing. Default is UTF-8, which is
     * also used when the connection reports no content-type at all.
     *
     * @param source textfile with lines separated by \n character
     *
     * @return Input that provides lines from the textfiles as strings
     */
    public static Input<String, IOException> text( final URL source )
    // END SNIPPET: method
    {
        return new Input<String, IOException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
                throws IOException, ReceiverThrowableType
            {
                URLConnection urlConnection = source.openConnection();
                urlConnection.setRequestProperty( "Accept-Encoding", "gzip" );

                // Closed here even if the gzip wrapper or the reader cannot be constructed.
                try( InputStream rawStream = urlConnection.getInputStream() )
                {
                    InputStream stream = rawStream;

                    // If file is gzipped, unzip it automatically
                    if( "gzip".equals( urlConnection.getContentEncoding() ) )
                    {
                        stream = new GZIPInputStream( rawStream );
                    }

                    // Figure out charset given content-type
                    String charSet = charsetOf( urlConnection.getContentType() );

                    try( BufferedReader reader = new BufferedReader( new InputStreamReader( stream, charSet ) ) )
                    {
                        transferLines( reader, output );
                    }
                }
            }
        };
    }

    // START SNIPPET: method

    /**
     * Read a file using ByteBuffer of a given size. Useful for transferring raw data.
     *
     * The same ByteBuffer instance is handed to the receiver for every chunk and is cleared after each
     * call, so receivers must consume it before returning.
     *
     * @param source     The file to be read.
     * @param bufferSize The size of the byte array. Must be positive.
     *
     * @return An Input instance to be applied to streaming operations.
     *
     * @throws IllegalArgumentException if bufferSize is not positive
     */
    public static Input<ByteBuffer, IOException> byteBuffer( final File source, final int bufferSize )
    // END SNIPPET: method
    {
        requirePositive( bufferSize );
        return new Input<ByteBuffer, IOException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
                throws IOException, ReceiverThrowableType
            {
                try( FileInputStream stream = new FileInputStream( source ) )
                {
                    final FileChannel fci = stream.getChannel();
                    final ByteBuffer buffer = ByteBuffer.allocate( bufferSize );

                    output.receiveFrom( new Sender<ByteBuffer, IOException>()
                    {
                        @Override
                        public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
                            throws Receiver2ThrowableType, IOException
                        {
                            while( fci.read( buffer ) != -1 )
                            {
                                buffer.flip();
                                receiver.receive( buffer );
                                buffer.clear();
                            }
                        }
                    } );
                }
            }
        };
    }

    // START SNIPPET: method

    /**
     * Read an inputstream using ByteBuffer of a given size. The stream is closed when the transfer
     * ends, whether it succeeds or fails.
     *
     * @param source     The InputStream to be read.
     * @param bufferSize The size of the byte array. Must be positive.
     *
     * @return An Input instance to be applied to streaming operations.
     *
     * @throws IllegalArgumentException if bufferSize is not positive
     */
    public static Input<ByteBuffer, IOException> byteBuffer( final InputStream source, final int bufferSize )
    // END SNIPPET: method
    {
        requirePositive( bufferSize );
        return new Input<ByteBuffer, IOException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
                throws IOException, ReceiverThrowableType
            {
                // try-with-resources instead of try/finally: a failing close() is recorded as a
                // suppressed exception rather than replacing the exception that ended the transfer.
                try( InputStream in = source )
                {
                    output.receiveFrom( new Sender<ByteBuffer, IOException>()
                    {
                        @Override
                        public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
                            throws Receiver2ThrowableType, IOException
                        {
                            byte[] buffer = new byte[ bufferSize ];

                            int len;
                            while( ( len = in.read( buffer ) ) != -1 )
                            {
                                ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, 0, len );
                                receiver.receive( byteBuffer );
                            }
                        }
                    } );
                }
            }
        };
    }

    // START SNIPPET: method

    /**
     * Combine many Input into one single Input. When a transfer is initiated from it all items from all inputs will be transferred
     * to the given Output.
     *
     * @param inputs                An Iterable of Input instances to be combined.
     * @param <T>                   The item type of the Input
     * @param <SenderThrowableType> The Throwable that might be thrown by the Inputs.
     *
     * @return A combined Input, allowing for easy aggregation of many Input sources.
     */
    public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> combine( final Iterable<Input<T, SenderThrowableType>> inputs )
    // END SNIPPET: method
    {
        return new Input<T, SenderThrowableType>()
        {
            @Override
            public <Receiver2ThrowableType extends Throwable> void transferTo( Output<? super T, Receiver2ThrowableType> output )
                throws SenderThrowableType, Receiver2ThrowableType
            {
                // The target Output sees one single Sender; inside it every Input is drained in turn
                // into the same Receiver.
                output.receiveFrom( new Sender<T, SenderThrowableType>()
                {
                    @Override
                    public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
                        throws ReceiverThrowableType, SenderThrowableType
                    {
                        for( Input<T, SenderThrowableType> input : inputs )
                        {
                            input.transferTo( new Output<T, ReceiverThrowableType>()
                            {
                                @Override
                                public <Sender2ThrowableType extends Throwable> void receiveFrom( Sender<? extends T, Sender2ThrowableType> sender )
                                    throws ReceiverThrowableType, Sender2ThrowableType
                                {
                                    sender.sendTo( new Receiver<T, ReceiverThrowableType>()
                                    {
                                        @Override
                                        public void receive( T item )
                                            throws ReceiverThrowableType
                                        {
                                            receiver.receive( item );
                                        }
                                    } );
                                }
                            } );
                        }
                    }
                } );
            }
        };
    }

    // START SNIPPET: method

    /**
     * Create an Input that takes its items from the given Iterable.
     *
     * @param iterable The Iterable to be used as an Input.
     * @param <T>      The item type of the Input
     *
     * @return An Input instance that is backed by the Iterable.
     */
    public static <T> Input<T, RuntimeException> iterable( final Iterable<T> iterable )
    // END SNIPPET: method
    {
        return new Input<T, RuntimeException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
                throws RuntimeException, ReceiverThrowableType
            {
                output.receiveFrom( new Sender<T, RuntimeException>()
                {
                    @Override
                    public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super T, Receiver2ThrowableType> receiver )
                        throws Receiver2ThrowableType, RuntimeException
                    {
                        for( T item : iterable )
                        {
                            receiver.receive( item );
                        }
                    }
                } );
            }
        };
    }

    // START SNIPPET: method

    /**
     * Create an Input that allows a Visitor to write to an OutputStream. The stream is a BufferedOutputStream, so when enough
     * data has been gathered it will send this in chunks of the given size to the Output it is transferred to. The Visitor does not have to call
     * close() on the OutputStream, but should ensure that any wrapper streams or writers are flushed so that all data is sent.
     *
     * An exception thrown by the receiver is rethrown unchanged from the transfer; an IOException raised
     * by the Visitor itself is rethrown as that IOException.
     *
     * @param outputVisitor The OutputStream Visitor that will be backing the Input ByteBuffer.
     * @param bufferSize    The buffering size. Must be positive.
     *
     * @return An Input instance of ByteBuffer, that is backed by an Visitor to an OutputStream.
     *
     * @throws IllegalArgumentException if bufferSize is not positive
     */
    public static Input<ByteBuffer, IOException> output( final Visitor<OutputStream, IOException> outputVisitor,
                                                         final int bufferSize
    )
    // END SNIPPET: method
    {
        requirePositive( bufferSize );
        return new Input<ByteBuffer, IOException>()
        {
            @Override
            public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output )
                throws IOException, ReceiverThrowableType
            {
                output.receiveFrom( new Sender<ByteBuffer, IOException>()
                {
                    @Override
                    @SuppressWarnings( "unchecked" ) // the only unchecked operation is the cast of the receiver's own exception
                    public <Receiver2ThrowableType extends Throwable> void sendTo( final Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver )
                        throws Receiver2ThrowableType, IOException
                    {
                        try( OutputStream out = new BufferedOutputStream( new ReceiverOutputStream( receiver ), bufferSize ) )
                        {
                            outputVisitor.visit( out );
                        }
                        catch( IOException ex )
                        {
                            // Only a ReceiverFailure carries a receiver exception; look through any
                            // wrapping the visitor may have added on the way out.
                            for( Throwable candidate = ex; candidate != null; candidate = candidate.getCause() )
                            {
                                if( candidate instanceof ReceiverFailure )
                                {
                                    throw (Receiver2ThrowableType) candidate.getCause();
                                }
                            }
                            // A genuine I/O failure from the visitor: propagate it as it is.
                            throw ex;
                        }
                    }
                } );
            }
        };
    }

    /**
     * Push every remaining line of the scanner to the receiver.
     */
    private static <ReceiverThrowableType extends Throwable> void sendLines( Scanner scanner,
                                                                             Receiver<? super String, ReceiverThrowableType> receiver
    )
        throws ReceiverThrowableType
    {
        while( scanner.hasNextLine() )
        {
            receiver.receive( scanner.nextLine() );
        }
    }

    /**
     * Hand the output a Sender that pushes every line read from the reader. The reader is not closed here.
     */
    private static <ReceiverThrowableType extends Throwable> void transferLines( final BufferedReader reader,
                                                                                 Output<? super String, ReceiverThrowableType> output
    )
        throws IOException, ReceiverThrowableType
    {
        output.receiveFrom( new Sender<String, IOException>()
        {
            @Override
            public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver )
                throws Receiver2ThrowableType, IOException
            {
                String line;
                while( ( line = reader.readLine() ) != null )
                {
                    receiver.receive( line );
                }
            }
        } );
    }

    /**
     * Extract the charset name from a content-type value such as {@code text/plain; charset="utf-8"}.
     * Returns UTF-8 when the value is null, has no charset parameter, or the parameter is empty.
     */
    private static String charsetOf( String contentType )
    {
        if( contentType == null )
        {
            return DEFAULT_CHARSET;
        }

        int start = indexOfIgnoreCase( contentType, CHARSET_PARAMETER );
        if( start < 0 )
        {
            return DEFAULT_CHARSET;
        }

        String charSet = contentType.substring( start + CHARSET_PARAMETER.length() );

        // Drop any parameters that follow the charset
        int end = charSet.indexOf( ';' );
        if( end >= 0 )
        {
            charSet = charSet.substring( 0, end );
        }
        charSet = charSet.trim();

        // The value may be a quoted string
        if( charSet.length() >= 2 && charSet.startsWith( "\"" ) && charSet.endsWith( "\"" ) )
        {
            charSet = charSet.substring( 1, charSet.length() - 1 ).trim();
        }

        return charSet.isEmpty() ? DEFAULT_CHARSET : charSet;
    }

    /**
     * Case-insensitive indexOf; returns -1 when the token does not occur in the text.
     */
    private static int indexOfIgnoreCase( String text, String token )
    {
        int last = text.length() - token.length();
        for( int index = 0; index <= last; index++ )
        {
            if( text.regionMatches( true, index, token, 0, token.length() ) )
            {
                return index;
            }
        }
        return -1;
    }

    /**
     * Reject buffer sizes that would make the read loops spin without progress or fail on allocation.
     */
    private static void requirePositive( int bufferSize )
    {
        if( bufferSize <= 0 )
        {
            throw new IllegalArgumentException( "bufferSize must be positive, was " + bufferSize );
        }
    }

    /**
     * Marks an exception that was thrown by the receiver, so that it can be told apart from I/O
     * failures raised by the visitor.
     */
    private static final class ReceiverFailure
        extends IOException
    {
        private static final long serialVersionUID = 1L;

        ReceiverFailure( Throwable cause )
        {
            super( cause );
        }
    }

    /**
     * OutputStream that forwards every write to a Receiver as a ByteBuffer.
     */
    private static final class ReceiverOutputStream
        extends OutputStream
    {
        private final Receiver<? super ByteBuffer, ?> receiver;

        ReceiverOutputStream( Receiver<? super ByteBuffer, ?> receiver )
        {
            this.receiver = receiver;
        }

        @Override
        public void write( int b )
            throws IOException
        {
            write( new byte[]{ (byte) b }, 0, 1 );
        }

        @Override
        public void write( byte[] b, int off, int len )
            throws IOException
        {
            try
            {
                // slice() yields a buffer whose position is 0 and whose limit is len, with the
                // offset carried in arrayOffset(), so the receiver sees exactly b[off .. off+len).
                receiver.receive( ByteBuffer.wrap( b, off, len ).slice() );
            }
            catch( Throwable ex )
            {
                throw new ReceiverFailure( ex );
            }
        }
    }

    private Inputs()
    {
    }
}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Output.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Output.java b/core/io/src/main/java/org/qi4j/io/Output.java deleted file mode 100644 index 3dcd207..0000000 --- a/core/io/src/main/java/org/qi4j/io/Output.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.io; - -/** - * Output for data. - */ -// START SNIPPET: output -public interface Output<T, ReceiverThrowableType extends Throwable> -{ -// END SNIPPET: output - - /** - * This initiates a transfer from an Input. Implementations should open any resources to be written to - * and then call sender.sendTo() when it is ready to receive data. When sendTo() returns the resource should be - * closed properly. Make sure to handle any exceptions from sendTo. - * - * @param sender the sender of data to this output - * @param <SenderThrowableType> the exception that sendTo can throw - * - * @throws SenderThrowableType the exception that the sender can throw - * @throws ReceiverThrowableType the exception that this output can throw from receiveItem() - */ -// START SNIPPET: output - <SenderThrowableType extends Throwable> void receiveFrom( Sender<? 
extends T, SenderThrowableType> sender ) - throws ReceiverThrowableType, SenderThrowableType; -} -// END SNIPPET: output http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Outputs.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Outputs.java b/core/io/src/main/java/org/qi4j/io/Outputs.java deleted file mode 100644 index 2508788..0000000 --- a/core/io/src/main/java/org/qi4j/io/Outputs.java +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.qi4j.io; - -import java.io.BufferedOutputStream; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.Collection; -import java.util.zip.GZIPOutputStream; - -/** - * Utility methods for creating standard Outputs - */ -public class Outputs -{ - // START SNIPPET: method - - /** - * Write lines to a text file with UTF-8 encoding. Separate each line with a newline ("\n" character). If the writing or sending fails, - * the file is deleted. - * <p> - * If the filename ends with .gz, then the data is automatically GZipped. 
- * </p> - * @param file the file to save the text to - * - * @return an Output for storing text in a file - */ - public static Output<String, IOException> text( final File file ) - // END SNIPPET: method - { - return text( file, "UTF-8" ); - } - - // START SNIPPET: method - - /** - * Write lines to a text file. Separate each line with a newline ("\n" character). If the writing or sending fails, - * the file is deleted. - * <p> - * If the filename ends with .gz, then the data is automatically GZipped. - * </p> - * @param file the file to save the text to - * - * @return an Output for storing text in a file - */ - public static Output<String, IOException> text( final File file, final String encoding ) - // END SNIPPET: method - { - return new Output<String, IOException>() - { - @Override - @SuppressWarnings( "unchecked" ) - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - File tmpFile = Files.createTemporayFileOf( file ); - - OutputStream stream = new FileOutputStream( tmpFile ); - - // If file should be gzipped, do that automatically - if( file.getName().endsWith( ".gz" ) ) - { - stream = new GZIPOutputStream( stream ); - } - - final BufferedWriter writer = new BufferedWriter( new OutputStreamWriter( stream, encoding ) ); - - try - { - sender.sendTo( new Receiver<String, IOException>() - { - @Override - public void receive( String item ) - throws IOException - { - writer.append( item ).append( '\n' ); - } - } ); - writer.close(); - - // Replace file with temporary file - if( !file.exists() || file.delete() ) - { - if( ! tmpFile.renameTo( file ) ) - { - // TODO: What?? Throw an Exception? - System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); - } - } - } - catch( IOException e ) - { - // We failed writing - close and delete - writer.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." 
); - tmpFile.deleteOnExit(); - } - } - catch( Throwable senderThrowableType ) - { - // We failed writing - close and delete - writer.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." ); - tmpFile.deleteOnExit(); - } - throw (SenderThrowableType) senderThrowableType; - } - } - }; - } - - // START SNIPPET: method - - /** - * Write lines to a Writer. Separate each line with a newline ("\n" character). - * - * @param writer the Writer to write the text to - * @return an Output for storing text in a Writer - */ - public static Output<String, IOException> text( final Writer writer ) - // END SNIPPET: method - { - return new Output<String, IOException>() - { - - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - sender.sendTo( new Receiver<String, IOException>() - { - - @Override - public void receive( String item ) - throws IOException - { - writer.append( item ).append( "\n" ); - } - - } ); - } - - }; - } - - // START SNIPPET: method - - /** - * Write lines to a StringBuilder. Separate each line with a newline ("\n" character). - * - * @param builder the StringBuilder to append the text to - * @return an Output for storing text in a StringBuilder - */ - public static Output<String, IOException> text( final StringBuilder builder ) - // END SNIPPET: method - { - return new Output<String, IOException>() - { - - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - sender.sendTo( new Receiver<String, IOException>() - { - - @Override - public void receive( String item ) - throws IOException - { - builder.append( item ).append( "\n" ); - } - - } ); - } - - }; - } - - // START SNIPPET: method - - /** - * Write ByteBuffer data to a file. 
If the writing or sending of data fails the file will be deleted. - * - * @param file The destination file. - * - * @return The Output ByteBuffer instance backed by a File. - */ - public static Output<ByteBuffer, IOException> byteBuffer( final File file ) - // END SNIPPET: method - { - return new Output<ByteBuffer, IOException>() - { - @Override - @SuppressWarnings( "unchecked" ) - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - File tmpFile = Files.createTemporayFileOf( file ); - FileOutputStream stream = new FileOutputStream( tmpFile ); - final FileChannel fco = stream.getChannel(); - - try - { - sender.sendTo( new Receiver<ByteBuffer, IOException>() - { - @Override - public void receive( ByteBuffer item ) - throws IOException - { - fco.write( item ); - } - } ); - stream.close(); - - // Replace file with temporary file - if( !file.exists() || file.delete() ) - { - if( ! tmpFile.renameTo( file ) ) - { - // TODO: What can be done in this case? - System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); - } - } - } - catch( IOException e ) - { - // We failed writing - close and delete - stream.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." ); - tmpFile.deleteOnExit(); - } - - } - catch( Throwable senderThrowableType ) - { - // We failed writing - close and delete - stream.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." ); - tmpFile.deleteOnExit(); - } - throw (SenderThrowableType) senderThrowableType; - } - } - }; - } - - // START SNIPPET: method - - /** - * Write ByteBuffer data to an OutputStream. - * - * @param stream Destination OutputStream - * - * @return The Output of ByteBuffer that will be backed by the OutputStream. 
- */ - public static Output<ByteBuffer, IOException> byteBuffer( final OutputStream stream ) - // END SNIPPET: method - { - return new Output<ByteBuffer, IOException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - try - { - sender.sendTo( new Receiver<ByteBuffer, IOException>() - { - @Override - public void receive( ByteBuffer item ) - throws IOException - { - if( item.hasArray() ) - { - stream.write( item.array(), item.arrayOffset(), item.limit() ); - } - else - { - for( int i = 0; i < item.limit(); i++ ) - { - stream.write( item.get( i ) ); - } - } - } - } ); - } - finally - { - stream.close(); - } - } - }; - } - - // START SNIPPET: method - - /** - * Write byte array data to a file. If the writing or sending of data fails the file will be deleted. - * - * @param file The File to be written to. - * @param bufferSize The size of the ByteBuffer. - * - * @return An Output instance that will write to the given File. - */ - public static Output<byte[], IOException> bytes( final File file, final int bufferSize ) - // END SNIPPET: method - { - return new Output<byte[], IOException>() - { - @Override - @SuppressWarnings( "unchecked" ) - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends byte[], SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - File tmpFile = Files.createTemporayFileOf( file ); - final OutputStream stream = new BufferedOutputStream( new FileOutputStream( tmpFile ), bufferSize ); - - try - { - sender.sendTo( new Receiver<byte[], IOException>() - { - @Override - public void receive( byte[] item ) - throws IOException - { - stream.write( item ); - } - } ); - stream.close(); - - // Replace file with temporary file - if( !file.exists() || file.delete() ) - { - if( ! tmpFile.renameTo( file ) ) - { - // TODO: WHAT??? 
- System.err.println( "Unable to rename " + tmpFile + " to " + file ); - } - } - } - catch( IOException e ) - { - // We failed writing - close and delete - stream.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." ); - tmpFile.deleteOnExit(); - } - } - catch( Throwable senderThrowableType ) - { - // We failed writing - close and delete - stream.close(); - if( ! tmpFile.delete() ) - { - System.err.println("Unable to delete temporary file." ); - tmpFile.deleteOnExit(); - } - throw (SenderThrowableType) senderThrowableType; - } - } - }; - } - - // START SNIPPET: method - - /** - * Do nothing. Use this if you have all logic in filters and/or specifications - * - * @param <T> The item type. - * - * @return An Output instance that ignores all data. - */ - public static <T> Output<T, RuntimeException> noop() - // END SNIPPET: method - { - return withReceiver( new Receiver<T, RuntimeException>() - { - @Override - public void receive( T item ) - throws RuntimeException - { - // Do nothing - } - } ); - } - - // START SNIPPET: method - - /** - * Use given receiver as Output. Use this if there is no need to create a "transaction" for each transfer, and no need - * to do batch writes or similar. - * - * @param <T> The item type - * @param receiver receiver for this Output - * - * @return An Output instance backed by a Receiver of items. - */ - public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> withReceiver( final Receiver<T, ReceiverThrowableType> receiver ) - // END SNIPPET: method - { - return new Output<T, ReceiverThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) - throws ReceiverThrowableType, SenderThrowableType - { - sender.sendTo( receiver ); - } - }; - } - - // START SNIPPET: method - - /** - * Write objects to System.out.println. 
- * - * @return An Output instance that is backed by System.out - */ - public static Output<Object, RuntimeException> systemOut() - // END SNIPPET: method - { - return new Output<Object, RuntimeException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) - throws RuntimeException, SenderThrowableType - { - sender.sendTo( new Receiver<Object, RuntimeException>() - { - @Override - public void receive( Object item ) - { - System.out.println( item ); - } - } ); - } - }; - } - - // START SNIPPET: method - - /** - * Write objects to System.err.println. - * - * @return An Output instance backed by System.in - */ - @SuppressWarnings( "UnusedDeclaration" ) - public static Output<Object, RuntimeException> systemErr() - // END SNIPPET: method - { - return new Output<Object, RuntimeException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) - throws RuntimeException, SenderThrowableType - { - sender.sendTo( new Receiver<Object, RuntimeException>() - { - @Override - public void receive( Object item ) - { - System.err.println( item ); - } - } ); - } - }; - } - - // START SNIPPET: method - - /** - * Add items to a collection - */ - public static <T> Output<T, RuntimeException> collection( final Collection<T> collection ) - // END SNIPPET: method - { - return new Output<T, RuntimeException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? 
extends T, SenderThrowableType> sender ) - throws RuntimeException, SenderThrowableType - { - sender.sendTo( new Receiver<T, RuntimeException>() - { - @Override - public void receive( T item ) - throws RuntimeException - { - collection.add( item ); - } - } ); - } - }; - } - - private Outputs() - { - } -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Receiver.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Receiver.java b/core/io/src/main/java/org/qi4j/io/Receiver.java deleted file mode 100644 index 6318cdf..0000000 --- a/core/io/src/main/java/org/qi4j/io/Receiver.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.io; - -/** - * Receiver of items during a specific transfer from an Input to an Output. - */ -// START SNIPPET: receiver -public interface Receiver<T, ReceiverThrowableType extends Throwable> -{ -// END SNIPPET: receiver - /** - * Receive a single item of the given type. The receiver should process it - * and optionally throw an exception if it fails. 
- * - * @param item - * - * @throws ReceiverThrowableType - */ -// START SNIPPET: receiver - void receive( T item ) - throws ReceiverThrowableType; -} -// END SNIPPET: receiver http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Sender.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Sender.java b/core/io/src/main/java/org/qi4j/io/Sender.java deleted file mode 100644 index 05ac007..0000000 --- a/core/io/src/main/java/org/qi4j/io/Sender.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.io; - -/** - * Sender of items for a particular transfer from an Input to an Output - */ -// START SNIPPET: sender -public interface Sender<T, SenderThrowableType extends Throwable> -{ -// END SNIPPET: sender - /** - * The sender should send all items it holds to the receiver by invoking receiveItem for each item. - * - * If the receive fails it should properly close any open resources. - * - * @param receiver - * @param <ReceiverThrowableType> - * - * @throws ReceiverThrowableType - * @throws SenderThrowableType - */ -// START SNIPPET: sender - <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? 
super T, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, SenderThrowableType; -} -// END SNIPPET: sender http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/Transforms.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Transforms.java b/core/io/src/main/java/org/qi4j/io/Transforms.java deleted file mode 100644 index a5d0040..0000000 --- a/core/io/src/main/java/org/qi4j/io/Transforms.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.io; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.text.MessageFormat; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.logging.Logger; -import org.qi4j.functional.Function; -import org.qi4j.functional.Specification; - -/** - * Utility class for I/O transforms - */ -public class Transforms -{ - /** - * Filter items in a transfer by applying the given Specification to each item. - * - * @param specification The Specification defining the items to not filter away. - * @param output The Output instance to receive to result. - * @param <T> The item type - * @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver. - * - * @return And Output encapsulation the filter operation. 
- */ - public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification, - final Output<T, Receiver2ThrowableType> output - ) - { - return new Output<T, Receiver2ThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) - throws Receiver2ThrowableType, SenderThrowableType - { - output.receiveFrom( new Sender<T, SenderThrowableType>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, SenderThrowableType - { - sender.sendTo( new Receiver<T, ReceiverThrowableType>() - { - @Override - public void receive( T item ) - throws ReceiverThrowableType - { - if( specification.satisfiedBy( item ) ) - { - receiver.receive( item ); - } - } - } ); - } - } ); - } - }; - } - - /** - * Map items in a transfer from one type to another by applying the given function. - * - * @param function The transformation function to apply to the streaming items. - * @param output The output to receive the transformed items. - * @param <From> The type of the incoming items. - * @param <To> The type of the transformed items. - * @param <Receiver2ThrowableType> The exception type that the Receiver might throw. - * - * @return An Output instance that encapsulates the map transformation. - */ - public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function, - final Output<To, Receiver2ThrowableType> output - ) - { - return new Output<From, Receiver2ThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? 
extends From, SenderThrowableType> sender ) - throws Receiver2ThrowableType, SenderThrowableType - { - output.receiveFrom( new Sender<To, SenderThrowableType>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, SenderThrowableType - { - sender.sendTo( new Receiver<From, ReceiverThrowableType>() - { - @Override - public void receive( From item ) - throws ReceiverThrowableType - { - receiver.receive( function.map( item ) ); - } - } ); - } - } ); - } - }; - } - - /** - * Apply the given function to items in the transfer that match the given specification. Other items will pass - * through directly. - * - * @param specification The Specification defining which items should be transformed. - * @param function The transformation function. - * @param output The Output that will receive the resulting items. - * @param <T> The item type. Items can not be transformed to a new type. - * @param <Receiver2ThrowableType> The exception that the Receiver might throw. - * - * @return An Output instance that encapsulates the operation. - */ - public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification, - final Function<? super T, ? extends T> function, - final Output<T, Receiver2ThrowableType> output - ) - { - return new Output<T, Receiver2ThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) - throws Receiver2ThrowableType, SenderThrowableType - { - output.receiveFrom( new Sender<T, SenderThrowableType>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? 
super T, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, SenderThrowableType - { - sender.sendTo( new Receiver<T, ReceiverThrowableType>() - { - @Override - public void receive( T item ) - throws ReceiverThrowableType - { - if( specification.satisfiedBy( item ) ) - { - receiver.receive( function.map( item ) ); - } - else - { - receiver.receive( item ); - } - } - } ); - } - } ); - } - }; - } - - /** - * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on - * the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a - * wrapper on the send side when transferTo is invoked. - * - * @param lock the lock to be used for transfers - * @param output output to be wrapped - * @param <T> The Item type - * @param <Receiver2ThrowableType> The Exception type that the Receiver might throw. - * - * @return Output wrapper that uses the given lock during transfers. - */ - public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock, - final Output<T, Receiver2ThrowableType> output - ) - { - return new Output<T, Receiver2ThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) - throws Receiver2ThrowableType, SenderThrowableType - { - /** - * Fix for this bug: - * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 - */ - while( true ) - { - try - { - //noinspection StatementWithEmptyBody - while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) - { - // On timeout, try again - } - break; // Finally got a lock - } - catch( InterruptedException e ) - { - // Try again - } - } - - try - { - output.receiveFrom( sender ); - } - finally - { - lock.unlock(); - } - } - }; - } - - /** - * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. 
Typically a read-lock would be used on the sending side and a write-lock - * would be used on the receiving side. - * - * @param lock the lock to be used for transfers - * @param input input to be wrapped - * @param <T> The item type. - * @param <SenderThrowableType> The Exception type that the Sender might throw. - * - * @return Input wrapper that uses the given lock during transfers. - */ - public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock, - final Input<T, SenderThrowableType> input - ) - { - return new Input<T, SenderThrowableType>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) - throws SenderThrowableType, ReceiverThrowableType - { - /** - * Fix for this bug: - * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 - */ - while( true ) - { - try - { - //noinspection StatementWithEmptyBody - while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) ) - { - // On timeout, try again - } - break; // Finally got a lock - } - catch( InterruptedException e ) - { - // Try again - } - } - - try - { - input.transferTo( output ); - } - finally - { - lock.unlock(); - } - } - }; - } - - /** - * Count the number of items in the transfer. 
- * - * @param <T> - */ - // START SNIPPET: counter - public static class Counter<T> - implements Function<T, T> - { - private volatile long count = 0; - - public long count() - { - return count; - } - - @Override - public T map( T t ) - { - count++; - return t; - } - } - // END SNIPPET: counter - - /** - * Convert strings to bytes using the given CharSet - */ - @SuppressWarnings( "UnusedDeclaration" ) - public static class String2Bytes - implements Function<String, byte[]> - { - private Charset charSet; - - public String2Bytes( Charset charSet ) - { - this.charSet = charSet; - } - - @Override - public byte[] map( String s ) - { - return s.getBytes( charSet ); - } - } - - /** - * Convert ByteBuffers to Strings using the given CharSet - */ - public static class ByteBuffer2String - implements Function<ByteBuffer, String> - { - private Charset charSet; - - public ByteBuffer2String( Charset charSet ) - { - this.charSet = charSet; - } - - @Override - public String map( ByteBuffer buffer ) - { - return new String( buffer.array(), charSet ); - } - } - - /** - * Convert objects to Strings using .toString() - */ - @SuppressWarnings( "UnusedDeclaration" ) - public static class ObjectToString - implements Function<Object, String> - { - @Override - public String map( Object o ) - { - return o.toString(); - } - } - - /** - * Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat - * with the given format. - * - * @param <T> - */ - public static class Log<T> - implements Function<T, T> - { - private Logger logger; - private MessageFormat format; - - public Log( Logger logger, String format ) - { - this.logger = logger; - this.format = new MessageFormat( format ); - } - - @Override - public T map( T item ) - { - logger.info( format.format( new String[]{ item.toString() } ) ); - return item; - } - } - - /** - * Track progress of transfer by emitting a log message in given intervals. 
- * - * If logger or format is null, then you need to override the logProgress to do something - * - * @param <T> type of items to be transferred - */ - // START SNIPPET: progress - public static class ProgressLog<T> - implements Function<T, T> - { - private Counter<T> counter; - private Log<String> log; - private final long interval; - - public ProgressLog( Logger logger, String format, long interval ) - { - this.interval = interval; - if( logger != null && format != null ) - { - log = new Log<>( logger, format ); - } - counter = new Counter<>(); - } - - public ProgressLog( long interval ) - { - this.interval = interval; - counter = new Counter<>(); - } - - @Override - public T map( T t ) - { - counter.map( t ); - if( counter.count % interval == 0 ) - { - logProgress(); - } - return t; - } - - // Override this to do something other than logging the progress - protected void logProgress() - { - if( log != null ) - { - log.map( counter.count + "" ); - } - } - } - // END SNIPPET: progress -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/main/java/org/qi4j/io/package.html ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/package.html b/core/io/src/main/java/org/qi4j/io/package.html deleted file mode 100644 index aac8a54..0000000 --- a/core/io/src/main/java/org/qi4j/io/package.html +++ /dev/null @@ -1,21 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. 
You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<html> - <body> - <h2>I/O API.</h2> - </body> -</html> http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java ---------------------------------------------------------------------- diff --git a/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java b/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java new file mode 100644 index 0000000..bc40f2e --- /dev/null +++ b/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java @@ -0,0 +1,381 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.zest.io; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.Writer; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.apache.zest.functional.Function; +import org.apache.zest.functional.Visitor; + +import static java.util.Arrays.asList; +import static org.apache.zest.functional.Iterables.iterable; +import static org.apache.zest.io.Inputs.text; +import static org.apache.zest.io.Transforms.lock; +import static org.apache.zest.test.util.Assume.assumeConnectivity; + +/** + * Test Input/Output. 
+ */ +public class InputOutputTest +{ + @Test + public void testCopyFileNoAPI() + throws IOException + { + File source = sourceFile(); + File destination = File.createTempFile( "test", ".txt" ); + destination.deleteOnExit(); + + BufferedReader reader = new BufferedReader( new FileReader( source ) ); + long count = 0; + try + { + BufferedWriter writer = new BufferedWriter( new FileWriter( destination ) ); + try + { + String line; + while( ( line = reader.readLine() ) != null ) + { + count++; + writer.append( line ).append( '\n' ); + } + writer.close(); + } + catch( IOException e ) + { + writer.close(); + destination.delete(); + } + } + finally + { + reader.close(); + } + System.out.println( count ); + } + + @Test + public void testInputOutput() + throws IOException + { + URL source = getClass().getResource( "/iotest.txt" ); + File destination = File.createTempFile( "test", ".txt" ); + destination.deleteOnExit(); + text( source ).transferTo( Outputs.text( destination ) ); + } + + @Test + public void testCopyFile() + throws IOException + { + File source = sourceFile(); + File tempFile = File.createTempFile( "test", ".txt" ); + tempFile.deleteOnExit(); + + Inputs.byteBuffer( source, 1024 ).transferTo( Outputs.byteBuffer( tempFile ) ); + + Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) ); + } + + @Test + public void testCopyURL() + throws IOException + { + assumeConnectivity( "www.google.com", 80 ); + + File tempFile = File.createTempFile( "test", ".txt" ); + tempFile.deleteOnExit(); + + Inputs.text( new URL( "http://www.google.com" ) ).transferTo( Outputs.text( tempFile ) ); + +// Uncomment to check output Inputs.text( tempFile ).transferTo( Outputs.systemOut() ); + } + + @Test + public void testCopyFileStreams() + throws IOException + { + File source = sourceFile(); + File tempFile = File.createTempFile( "test", ".txt" ); + tempFile.deleteOnExit(); + + Inputs.byteBuffer( new FileInputStream( source ), 1024 ).transferTo( + 
Outputs.byteBuffer( new FileOutputStream( tempFile ) ) ); + + Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) ); + } + + @Test + public void testLog() + throws IOException + { + File source = sourceFile(); + + text( source ).transferTo( + Transforms.map( new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ), + Outputs.<String>noop() ) ); + } + + @Test + public void testProgressLog() + throws Throwable + { + Integer[] data = new Integer[ 105 ]; + Arrays.fill( data, 42 ); + + Inputs.iterable( iterable( data ) ).transferTo( + Transforms.map( + new Transforms.ProgressLog<Integer>( + Logger.getLogger( InputOutputTest.class.getName() ), "Data transferred: {0}", 10 ), + Outputs.<Integer>noop() ) ); + } + + @Test + public void testTextInputsOutputs() + throws IOException + { + File tempFile = File.createTempFile( "test", ".txt" ); + tempFile.deleteOnExit(); + File sourceFile = sourceFile(); + Transforms.Counter<String> stringCounter = new Transforms.Counter<>(); + text( sourceFile ).transferTo( + Transforms.map( + stringCounter, + Transforms.map( new Function<String, String>() + { + public String map( String s ) + { + System.out.println( s ); + return s; + } + }, Outputs.text( tempFile ) ) + ) + ); + + Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() ) ); + Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 4L ) ); + } + + @Test + public void testCombineInputs() + throws IOException + { + File tempFile = File.createTempFile( "test", ".txt" ); + tempFile.deleteOnExit(); + File sourceFile = sourceFile(); + Transforms.Counter<String> stringCounter = new Transforms.Counter<>(); + Input<String, IOException> text1 = text( sourceFile ); + Input<String, IOException> text2 = text( sourceFile ); + List<Input<String, IOException>> list = createList( text1, text2 ); + Inputs.combine( list ).transferTo( + Transforms.map( + stringCounter, + Transforms.map( new Function<String, 
String>() + { + public String map( String s ) + { + System.out.println( s ); + return s; + } + }, Outputs.text( tempFile ) ) + ) + ); + + Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() * 2 ) ); + Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 8L ) ); + } + + @SuppressWarnings( "unchecked" ) + private List<Input<String, IOException>> createList( Input<String, IOException> text1, + Input<String, IOException> text2 + ) + { + return asList( text1, text2 ); + } + + @Test( expected = IOException.class ) + public void testInputOutputOutputException() + throws IOException + { + + text( sourceFile() ). + transferTo( writerOutput( new Writer() + { + @Override + public void write( char[] cbuf, int off, int len ) + throws IOException + { + throw new IOException(); + } + + @Override + public void flush() + throws IOException + { + throw new IOException(); + } + + @Override + public void close() + throws IOException + { + throw new IOException(); + } + } ) ); + } + + @Test( expected = RemoteException.class ) + public void testInputOutputInputException() + throws IOException + { + + Input<String, RemoteException> input = new Input<String, RemoteException>() + { + @Override + public <OutputThrowableType extends Throwable> void transferTo( Output<? super String, OutputThrowableType> output ) + throws RemoteException, OutputThrowableType + { + output.receiveFrom( new Sender<String, RemoteException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? 
super String, ReceiverThrowableType> receiverThrowableTypeReceiver ) + throws ReceiverThrowableType, RemoteException + { + throw new RemoteException(); + } + } ); + } + }; + + input.transferTo( + Transforms.map( + new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ), + Outputs.systemOut() + ) + ); + } + + @Test + public void testLock() + throws IOException + { + Lock inputLock = new ReentrantLock(); + Lock outputLock = new ReentrantLock(); + + URL source = getClass().getResource( "/iotest.txt" ); + File destination = File.createTempFile( "test", ".txt" ); + destination.deleteOnExit(); + lock( inputLock, text( source ) ).transferTo( lock( outputLock, Outputs.text( destination ) ) ); + } + + @Test + public void testGenerics() + { + ArrayList<Object> objects = new ArrayList<>( 3 ); + Inputs.iterable( Arrays.asList( "Foo", "Bar", "Xyzzy" ) ).transferTo( Outputs.collection( objects ) ); + + Inputs.iterable( objects ).transferTo( Outputs.systemOut() ); + } + + @Test + public void testOutputstreamInput() + throws Throwable + { + Input<ByteBuffer, IOException> input = Inputs.output( new Visitor<OutputStream, IOException>() + { + @Override + public boolean visit( OutputStream visited ) + throws IOException + { + try( PrintWriter writer = new PrintWriter( visited ) ) + { + writer.print( "Hello World!" ); + } + return true; + } + }, 256 ); + + input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) ); + input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) ); + } + + public Output<String, IOException> writerOutput( final Writer writer ) + { + return new Output<String, IOException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? 
extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + // Here we initiate the transfer + System.out.println( "Open output" ); + final StringBuilder builder = new StringBuilder(); + try + { + sender.sendTo( new Receiver<String, IOException>() + { + @Override + public void receive( String item ) + throws IOException + { + System.out.println( "Receive input" ); + + // Here we can do batch writes if needed + builder.append( item ).append( "\n" ); + } + } ); + + // If transfer went well, do something with it + writer.write( builder.toString() ); + writer.flush(); + System.out.println( "Output written" ); + } + catch( IOException e ) + { + // If transfer failed, potentially rollback writes + System.out.println( "Input failed" ); + throw e; + } + } + }; + } + + private File sourceFile() + { + String path = getClass().getResource( "/iotest.txt" ).getFile(); + return new File( path.replaceAll( "%20", " " ) ); + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/061ddaa0/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java ---------------------------------------------------------------------- diff --git a/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java b/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java new file mode 100644 index 0000000..9ff74a2 --- /dev/null +++ b/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.zest.io.docsupport; + +import java.io.File; +import java.io.IOException; +import org.apache.zest.io.Inputs; +import org.apache.zest.io.Outputs; + +// START SNIPPET: io2 +import org.apache.zest.io.Transforms.Counter; +import static org.apache.zest.io.Transforms.map; +// END SNIPPET: io2 + +public class IoDocs +{ + public static void main( String[] args ) + throws IOException + { + { +// START SNIPPET: io1 + File source = new File( "source.txt" ); + File destination = new File( "destination.txt" ); + Inputs.text( source ).transferTo( Outputs.text( destination ) ); +// END SNIPPET: io1 + } + { +// START SNIPPET: io2 + File source = new File( "source.txt" ); + File destination = new File( "destination.txt" ); + Counter<String> counter = new Counter<String>(); + Inputs.text( source ).transferTo( map(counter, Outputs.text(destination) )); + System.out.println( "Lines: " + counter.count() ); +// END SNIPPET: io2 + } + } +}
