http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Inputs.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Inputs.java b/core/io/src/main/java/org/qi4j/io/Inputs.java new file mode 100644 index 0000000..eb2001f --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Inputs.java @@ -0,0 +1,490 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.qi4j.io; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.URL; +import java.net.URLConnection; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Scanner; +import java.util.zip.GZIPInputStream; +import org.qi4j.functional.Visitor; + +/** + * Common inputs + */ +public class Inputs +{ + // START SNIPPET: method + + /** + * Read lines from a String. + * + * @param source lines + * + * @return Input that provides lines from the string as strings + */ + public static Input<String, RuntimeException> text( final String source ) + // END SNIPPET: method + { + return new Input<String, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + + output.receiveFrom( new Sender<String, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + Scanner scanner = new Scanner( source ); + while( scanner.hasNextLine() ) + { + receiver.receive( scanner.nextLine() ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a Reader. + * + * @param source lines + * + * @return Input that provides lines from the string as strings + */ + public static Input<String, RuntimeException> text( final Reader source ) + // END SNIPPET: method + { + return new Input<String, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + + output.receiveFrom( new Sender<String, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + Scanner scanner = new Scanner( source ); + while( scanner.hasNextLine() ) + { + receiver.receive( scanner.nextLine() ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a UTF-8 encoded textfile. + * + * If the filename ends with .gz, then the data is automatically unzipped when read. + * + * @param source textfile with lines separated by \n character + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final File source ) + // END SNIPPET: method + { + return text( source, "UTF-8" ); + } + + // START SNIPPET: method + + /** + * Read lines from a textfile with the given encoding. + * + * If the filename ends with .gz, then the data is automatically unzipped when read. + * + * @param source textfile with lines separated by \n character + * @param encoding encoding of file, e.g. "UTF-8" + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final File source, final String encoding ) + // END SNIPPET: method + { + return new Input<String, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + InputStream stream = new FileInputStream( source ); + + // If file is gzipped, unzip it automatically + if( source.getName().endsWith( ".gz" ) ) + { + stream = new GZIPInputStream( stream ); + } + + try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, encoding ) )) + { + output.receiveFrom( new Sender<String, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + String line; + while( ( line = reader.readLine() ) != null ) + { + receiver.receive( line ); + } + } + } ); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a textfile at a given URL. + * + * If the content support gzip encoding, then the data is automatically unzipped when read. + * + * The charset in the content-type of the URL will be used for parsing. Default is UTF-8. + * + * @param source textfile with lines separated by \n character + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final URL source ) + // END SNIPPET: method + { + return new Input<String, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + URLConnection urlConnection = source.openConnection(); + urlConnection.setRequestProperty( "Accept-Encoding", "gzip" ); + InputStream stream = urlConnection.getInputStream(); + + // If file is gzipped, unzip it automatically + if( "gzip".equals( urlConnection.getContentEncoding() ) ) + { + stream = new GZIPInputStream( stream ); + } + + // Figure out charset given content-type + String contentType = urlConnection.getContentType(); + String charSet = "UTF-8"; + if( contentType.contains( "charset=" ) ) + { + charSet = contentType.substring( contentType.indexOf( "charset=" ) + "charset=".length() ); + } + + try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, charSet ) )) + { + output.receiveFrom( new Sender<String, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + String line; + while( ( line = reader.readLine() ) != null ) + { + receiver.receive( line ); + } + } + } ); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read a file using ByteBuffer of a given size. Useful for transferring raw data. + * + * @param source The file to be read. + * @param bufferSize The size of the byte array. + * + * @return An Input instance to be applied to streaming operations. + */ + public static Input<ByteBuffer, IOException> byteBuffer( final File source, final int bufferSize ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + final FileInputStream stream = new FileInputStream( source ); + final FileChannel fci = stream.getChannel(); + + final ByteBuffer buffer = ByteBuffer.allocate( bufferSize ); + + try + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + while( fci.read( buffer ) != -1 ) + { + buffer.flip(); + receiver.receive( buffer ); + buffer.clear(); + } + } + } ); + } + finally + { + stream.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read an inputstream using ByteBuffer of a given size. + * + * @param source The InputStream to be read. + * @param bufferSize The size of the byte array. + * + * @return An Input instance to be applied to streaming operations. + */ + public static Input<ByteBuffer, IOException> byteBuffer( final InputStream source, final int bufferSize ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + try + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + byte[] buffer = new byte[ bufferSize ]; + + int len; + while( ( len = source.read( buffer ) ) != -1 ) + { + ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, 0, len ); + receiver.receive( byteBuffer ); + } + } + } ); + } + finally + { + source.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Combine many Input into one single Input. When a transfer is initiated from it all items from all inputs will be transferred + * to the given Output. + * + * @param inputs An Iterable of Input instances to be combined. + * @param <T> The item type of the Input + * @param <SenderThrowableType> The Throwable that might be thrown by the Inputs. + * + * @return A combined Input, allowing for easy aggregation of many Input sources. + */ + public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> combine( final Iterable<Input<T, SenderThrowableType>> inputs ) + // END SNIPPET: method + { + return new Input<T, SenderThrowableType>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void transferTo( Output<? super T, Receiver2ThrowableType> output ) + throws SenderThrowableType, Receiver2ThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + for( Input<T, SenderThrowableType> input : inputs ) + { + input.transferTo( new Output<T, ReceiverThrowableType>() + { + @Override + public <Sender2ThrowableType extends Throwable> void receiveFrom( Sender<? extends T, Sender2ThrowableType> sender ) + throws ReceiverThrowableType, Sender2ThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + receiver.receive( item ); + } + } ); + } + } ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Create an Input that takes its items from the given Iterable. + * + * @param iterable The Iterable to be used as an Input. + * @param <T> The item type of the Input + * + * @return An Input instance that is backed by the Iterable. + */ + public static <T> Input<T, RuntimeException> iterable( final Iterable<T> iterable ) + // END SNIPPET: method + { + return new Input<T, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + output.receiveFrom( new Sender<T, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super T, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + for( T item : iterable ) + { + receiver.receive( item ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Create an Input that allows a Visitor to write to an OutputStream. The stream is a BufferedOutputStream, so when enough + * data has been gathered it will send this in chunks of the given size to the Output it is transferred to. The Visitor does not have to call + * close() on the OutputStream, but should ensure that any wrapper streams or writers are flushed so that all data is sent. + * + * @param outputVisitor The OutputStream Visitor that will be backing the Input ByteBuffer. + * @param bufferSize The buffering size. + * + * @return An Input instance of ByteBuffer, that is backed by an Visitor to an OutputStream. + */ + public static Input<ByteBuffer, IOException> output( final Visitor<OutputStream, IOException> outputVisitor, + final int bufferSize + ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <Receiver2ThrowableType extends Throwable> void sendTo( final Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + try (OutputStream out = new BufferedOutputStream( new OutputStream() + { + @Override + public void write( int b ) + throws IOException + { + // Ignore + } + + @SuppressWarnings( "NullableProblems" ) + @Override + public void write( byte[] b, int off, int len ) + throws IOException + { + try + { + ByteBuffer byteBuffer = ByteBuffer.wrap( b, 0, len ); + receiver.receive( byteBuffer ); + } + catch( Throwable ex ) + { + throw new IOException( ex ); + } + } + }, bufferSize )) + { + outputVisitor.visit( out ); + } + catch( IOException ex ) + { + throw (Receiver2ThrowableType) ex.getCause(); + } + } + } ); + } + }; + } + + private Inputs() + { + } +}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Output.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Output.java b/core/io/src/main/java/org/qi4j/io/Output.java new file mode 100644 index 0000000..3dcd207 --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Output.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.qi4j.io; + +/** + * Output for data. + */ +// START SNIPPET: output +public interface Output<T, ReceiverThrowableType extends Throwable> +{ +// END SNIPPET: output + + /** + * This initiates a transfer from an Input. Implementations should open any resources to be written to + * and then call sender.sendTo() when it is ready to receive data. When sendTo() returns the resource should be + * closed properly. Make sure to handle any exceptions from sendTo. + * + * @param sender the sender of data to this output + * @param <SenderThrowableType> the exception that sendTo can throw + * + * @throws SenderThrowableType the exception that the sender can throw + * @throws ReceiverThrowableType the exception that this output can throw from receiveItem() + */ +// START SNIPPET: output + <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws ReceiverThrowableType, SenderThrowableType; +} +// END SNIPPET: output http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Outputs.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Outputs.java b/core/io/src/main/java/org/qi4j/io/Outputs.java new file mode 100644 index 0000000..2508788 --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Outputs.java @@ -0,0 +1,528 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.qi4j.io; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Collection; +import java.util.zip.GZIPOutputStream; + +/** + * Utility methods for creating standard Outputs + */ +public class Outputs +{ + // START SNIPPET: method + + /** + * Write lines to a text file with UTF-8 encoding. Separate each line with a newline ("\n" character). If the writing or sending fails, + * the file is deleted. + * <p> + * If the filename ends with .gz, then the data is automatically GZipped. + * </p> + * @param file the file to save the text to + * + * @return an Output for storing text in a file + */ + public static Output<String, IOException> text( final File file ) + // END SNIPPET: method + { + return text( file, "UTF-8" ); + } + + // START SNIPPET: method + + /** + * Write lines to a text file. Separate each line with a newline ("\n" character). If the writing or sending fails, + * the file is deleted. + * <p> + * If the filename ends with .gz, then the data is automatically GZipped. + * </p> + * @param file the file to save the text to + * + * @return an Output for storing text in a file + */ + public static Output<String, IOException> text( final File file, final String encoding ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + + OutputStream stream = new FileOutputStream( tmpFile ); + + // If file should be gzipped, do that automatically + if( file.getName().endsWith( ".gz" ) ) + { + stream = new GZIPOutputStream( stream ); + } + + final BufferedWriter writer = new BufferedWriter( new OutputStreamWriter( stream, encoding ) ); + + try + { + sender.sendTo( new Receiver<String, IOException>() + { + @Override + public void receive( String item ) + throws IOException + { + writer.append( item ).append( '\n' ); + } + } ); + writer.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: What?? Throw an Exception? + System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + writer.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + writer.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Write lines to a Writer. Separate each line with a newline ("\n" character). + * + * @param writer the Writer to write the text to + * @return an Output for storing text in a Writer + */ + public static Output<String, IOException> text( final Writer writer ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + sender.sendTo( new Receiver<String, IOException>() + { + + @Override + public void receive( String item ) + throws IOException + { + writer.append( item ).append( "\n" ); + } + + } ); + } + + }; + } + + // START SNIPPET: method + + /** + * Write lines to a StringBuilder. Separate each line with a newline ("\n" character). + * + * @param builder the StringBuilder to append the text to + * @return an Output for storing text in a StringBuilder + */ + public static Output<String, IOException> text( final StringBuilder builder ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + sender.sendTo( new Receiver<String, IOException>() + { + + @Override + public void receive( String item ) + throws IOException + { + builder.append( item ).append( "\n" ); + } + + } ); + } + + }; + } + + // START SNIPPET: method + + /** + * Write ByteBuffer data to a file. If the writing or sending of data fails the file will be deleted. + * + * @param file The destination file. + * + * @return The Output ByteBuffer instance backed by a File. + */ + public static Output<ByteBuffer, IOException> byteBuffer( final File file ) + // END SNIPPET: method + { + return new Output<ByteBuffer, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + FileOutputStream stream = new FileOutputStream( tmpFile ); + final FileChannel fco = stream.getChannel(); + + try + { + sender.sendTo( new Receiver<ByteBuffer, IOException>() + { + @Override + public void receive( ByteBuffer item ) + throws IOException + { + fco.write( item ); + } + } ); + stream.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: What can be done in this case? + System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Write ByteBuffer data to an OutputStream. + * + * @param stream Destination OutputStream + * + * @return The Output of ByteBuffer that will be backed by the OutputStream. + */ + public static Output<ByteBuffer, IOException> byteBuffer( final OutputStream stream ) + // END SNIPPET: method + { + return new Output<ByteBuffer, IOException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + try + { + sender.sendTo( new Receiver<ByteBuffer, IOException>() + { + @Override + public void receive( ByteBuffer item ) + throws IOException + { + if( item.hasArray() ) + { + stream.write( item.array(), item.arrayOffset(), item.limit() ); + } + else + { + for( int i = 0; i < item.limit(); i++ ) + { + stream.write( item.get( i ) ); + } + } + } + } ); + } + finally + { + stream.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Write byte array data to a file. If the writing or sending of data fails the file will be deleted. + * + * @param file The File to be written to. + * @param bufferSize The size of the ByteBuffer. + * + * @return An Output instance that will write to the given File. + */ + public static Output<byte[], IOException> bytes( final File file, final int bufferSize ) + // END SNIPPET: method + { + return new Output<byte[], IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends byte[], SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + final OutputStream stream = new BufferedOutputStream( new FileOutputStream( tmpFile ), bufferSize ); + + try + { + sender.sendTo( new Receiver<byte[], IOException>() + { + @Override + public void receive( byte[] item ) + throws IOException + { + stream.write( item ); + } + } ); + stream.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: WHAT??? + System.err.println( "Unable to rename " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Do nothing. Use this if you have all logic in filters and/or specifications + * + * @param <T> The item type. + * + * @return An Output instance that ignores all data. + */ + public static <T> Output<T, RuntimeException> noop() + // END SNIPPET: method + { + return withReceiver( new Receiver<T, RuntimeException>() + { + @Override + public void receive( T item ) + throws RuntimeException + { + // Do nothing + } + } ); + } + + // START SNIPPET: method + + /** + * Use given receiver as Output. Use this if there is no need to create a "transaction" for each transfer, and no need + * to do batch writes or similar. + * + * @param <T> The item type + * @param receiver receiver for this Output + * + * @return An Output instance backed by a Receiver of items. + */ + public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> withReceiver( final Receiver<T, ReceiverThrowableType> receiver ) + // END SNIPPET: method + { + return new Output<T, ReceiverThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( receiver ); + } + }; + } + + // START SNIPPET: method + + /** + * Write objects to System.out.println. + * + * @return An Output instance that is backed by System.out + */ + public static Output<Object, RuntimeException> systemOut() + // END SNIPPET: method + { + return new Output<Object, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<Object, RuntimeException>() + { + @Override + public void receive( Object item ) + { + System.out.println( item ); + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Write objects to System.err.println. + * + * @return An Output instance backed by System.in + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static Output<Object, RuntimeException> systemErr() + // END SNIPPET: method + { + return new Output<Object, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<Object, RuntimeException>() + { + @Override + public void receive( Object item ) + { + System.err.println( item ); + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Add items to a collection + */ + public static <T> Output<T, RuntimeException> collection( final Collection<T> collection ) + // END SNIPPET: method + { + return new Output<T, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<T, RuntimeException>() + { + @Override + public void receive( T item ) + throws RuntimeException + { + collection.add( item ); + } + } ); + } + }; + } + + private Outputs() + { + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Receiver.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Receiver.java b/core/io/src/main/java/org/qi4j/io/Receiver.java new file mode 100644 index 0000000..6318cdf --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Receiver.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.qi4j.io; + +/** + * Receiver of items during a specific transfer from an Input to an Output. + */ +// START SNIPPET: receiver +public interface Receiver<T, ReceiverThrowableType extends Throwable> +{ +// END SNIPPET: receiver + /** + * Receive a single item of the given type. The receiver should process it + * and optionally throw an exception if it fails. + * + * @param item + * + * @throws ReceiverThrowableType + */ +// START SNIPPET: receiver + void receive( T item ) + throws ReceiverThrowableType; +} +// END SNIPPET: receiver http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Sender.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Sender.java b/core/io/src/main/java/org/qi4j/io/Sender.java new file mode 100644 index 0000000..05ac007 --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Sender.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.qi4j.io; + +/** + * Sender of items for a particular transfer from an Input to an Output + */ +// START SNIPPET: sender +public interface Sender<T, SenderThrowableType extends Throwable> +{ +// END SNIPPET: sender + /** + * The sender should send all items it holds to the receiver by invoking receiveItem for each item. + * + * If the receive fails it should properly close any open resources. + * + * @param receiver + * @param <ReceiverThrowableType> + * + * @throws ReceiverThrowableType + * @throws SenderThrowableType + */ +// START SNIPPET: sender + <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType; +} +// END SNIPPET: sender http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/Transforms.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Transforms.java b/core/io/src/main/java/org/qi4j/io/Transforms.java new file mode 100644 index 0000000..a5d0040 --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/Transforms.java @@ -0,0 +1,435 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.qi4j.io; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.text.MessageFormat; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.logging.Logger; +import org.qi4j.functional.Function; +import org.qi4j.functional.Specification; + +/** + * Utility class for I/O transforms + */ +public class Transforms +{ + /** + * Filter items in a transfer by applying the given Specification to each item. + * + * @param specification The Specification defining the items to not filter away. + * @param output The Output instance to receive to result. + * @param <T> The item type + * @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver. + * + * @return And Output encapsulation the filter operation. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + if( specification.satisfiedBy( item ) ) + { + receiver.receive( item ); + } + } + } ); + } + } ); + } + }; + } + + /** + * Map items in a transfer from one type to another by applying the given function. + * + * @param function The transformation function to apply to the streaming items. + * @param output The output to receive the transformed items. + * @param <From> The type of the incoming items. + * @param <To> The type of the transformed items. + * @param <Receiver2ThrowableType> The exception type that the Receiver might throw. + * + * @return An Output instance that encapsulates the map transformation. + */ + public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function, + final Output<To, Receiver2ThrowableType> output + ) + { + return new Output<From, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<To, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<From, ReceiverThrowableType>() + { + @Override + public void receive( From item ) + throws ReceiverThrowableType + { + receiver.receive( function.map( item ) ); + } + } ); + } + } ); + } + }; + } + + /** + * Apply the given function to items in the transfer that match the given specification. Other items will pass + * through directly. + * + * @param specification The Specification defining which items should be transformed. + * @param function The transformation function. + * @param output The Output that will receive the resulting items. + * @param <T> The item type. Items can not be transformed to a new type. + * @param <Receiver2ThrowableType> The exception that the Receiver might throw. + * + * @return An Output instance that encapsulates the operation. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification, + final Function<? super T, ? extends T> function, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + if( specification.satisfiedBy( item ) ) + { + receiver.receive( function.map( item ) ); + } + else + { + receiver.receive( item ); + } + } + } ); + } + } ); + } + }; + } + + /** + * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on + * the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a + * wrapper on the send side when transferTo is invoked. + * + * @param lock the lock to be used for transfers + * @param output output to be wrapped + * @param <T> The Item type + * @param <Receiver2ThrowableType> The Exception type that the Receiver might throw. + * + * @return Output wrapper that uses the given lock during transfers. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + /** + * Fix for this bug: + * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 + */ + while( true ) + { + try + { + //noinspection StatementWithEmptyBody + while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) + { + // On timeout, try again + } + break; // Finally got a lock + } + catch( InterruptedException e ) + { + // Try again + } + } + + try + { + output.receiveFrom( sender ); + } + finally + { + lock.unlock(); + } + } + }; + } + + /** + * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on the sending side and a write-lock + * would be used on the receiving side. + * + * @param lock the lock to be used for transfers + * @param input input to be wrapped + * @param <T> The item type. + * @param <SenderThrowableType> The Exception type that the Sender might throw. + * + * @return Input wrapper that uses the given lock during transfers. + */ + public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock, + final Input<T, SenderThrowableType> input + ) + { + return new Input<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) + throws SenderThrowableType, ReceiverThrowableType + { + /** + * Fix for this bug: + * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 + */ + while( true ) + { + try + { + //noinspection StatementWithEmptyBody + while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) ) + { + // On timeout, try again + } + break; // Finally got a lock + } + catch( InterruptedException e ) + { + // Try again + } + } + + try + { + input.transferTo( output ); + } + finally + { + lock.unlock(); + } + } + }; + } + + /** + * Count the number of items in the transfer. + * + * @param <T> + */ + // START SNIPPET: counter + public static class Counter<T> + implements Function<T, T> + { + private volatile long count = 0; + + public long count() + { + return count; + } + + @Override + public T map( T t ) + { + count++; + return t; + } + } + // END SNIPPET: counter + + /** + * Convert strings to bytes using the given CharSet + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static class String2Bytes + implements Function<String, byte[]> + { + private Charset charSet; + + public String2Bytes( Charset charSet ) + { + this.charSet = charSet; + } + + @Override + public byte[] map( String s ) + { + return s.getBytes( charSet ); + } + } + + /** + * Convert ByteBuffers to Strings using the given CharSet + */ + public static class ByteBuffer2String + implements Function<ByteBuffer, String> + { + private Charset charSet; + + public ByteBuffer2String( Charset charSet ) + { + this.charSet = charSet; + } + + @Override + public String map( ByteBuffer buffer ) + { + return new String( buffer.array(), charSet ); + } + } + + /** + * Convert objects to Strings using .toString() + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static class ObjectToString + implements Function<Object, String> + { + @Override + public String map( Object o ) + { + return o.toString(); + } + } + + /** + * Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat + * with the given format. + * + * @param <T> + */ + public static class Log<T> + implements Function<T, T> + { + private Logger logger; + private MessageFormat format; + + public Log( Logger logger, String format ) + { + this.logger = logger; + this.format = new MessageFormat( format ); + } + + @Override + public T map( T item ) + { + logger.info( format.format( new String[]{ item.toString() } ) ); + return item; + } + } + + /** + * Track progress of transfer by emitting a log message in given intervals. + * + * If logger or format is null, then you need to override the logProgress to do something + * + * @param <T> type of items to be transferred + */ + // START SNIPPET: progress + public static class ProgressLog<T> + implements Function<T, T> + { + private Counter<T> counter; + private Log<String> log; + private final long interval; + + public ProgressLog( Logger logger, String format, long interval ) + { + this.interval = interval; + if( logger != null && format != null ) + { + log = new Log<>( logger, format ); + } + counter = new Counter<>(); + } + + public ProgressLog( long interval ) + { + this.interval = interval; + counter = new Counter<>(); + } + + @Override + public T map( T t ) + { + counter.map( t ); + if( counter.count % interval == 0 ) + { + logProgress(); + } + return t; + } + + // Override this to do something other than logging the progress + protected void logProgress() + { + if( log != null ) + { + log.map( counter.count + "" ); + } + } + } + // END SNIPPET: progress +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/main/java/org/qi4j/io/package.html ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/package.html b/core/io/src/main/java/org/qi4j/io/package.html new file mode 100644 index 0000000..aac8a54 --- /dev/null +++ b/core/io/src/main/java/org/qi4j/io/package.html @@ -0,0 +1,21 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<html> + <body> + <h2>I/O API.</h2> + </body> +</html> http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java ---------------------------------------------------------------------- diff --git a/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java b/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java deleted file mode 100644 index bc40f2e..0000000 --- a/core/io/src/test/java/org/apache/zest/io/InputOutputTest.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.zest.io; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.Writer; -import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Logger; -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.apache.zest.functional.Function; -import org.apache.zest.functional.Visitor; - -import static java.util.Arrays.asList; -import static org.apache.zest.functional.Iterables.iterable; -import static org.apache.zest.io.Inputs.text; -import static org.apache.zest.io.Transforms.lock; -import static org.apache.zest.test.util.Assume.assumeConnectivity; - -/** - * Test Input/Output. - */ -public class InputOutputTest -{ - @Test - public void testCopyFileNoAPI() - throws IOException - { - File source = sourceFile(); - File destination = File.createTempFile( "test", ".txt" ); - destination.deleteOnExit(); - - BufferedReader reader = new BufferedReader( new FileReader( source ) ); - long count = 0; - try - { - BufferedWriter writer = new BufferedWriter( new FileWriter( destination ) ); - try - { - String line; - while( ( line = reader.readLine() ) != null ) - { - count++; - writer.append( line ).append( '\n' ); - } - writer.close(); - } - catch( IOException e ) - { - writer.close(); - destination.delete(); - } - } - finally - { - reader.close(); - } - System.out.println( count ); - } - - @Test - public void testInputOutput() - throws IOException - { - URL source = getClass().getResource( "/iotest.txt" ); - File destination = File.createTempFile( "test", ".txt" ); - destination.deleteOnExit(); - text( source ).transferTo( Outputs.text( destination ) ); - } - - @Test - public void testCopyFile() - throws IOException - { - File source = sourceFile(); - File tempFile = File.createTempFile( "test", ".txt" ); - tempFile.deleteOnExit(); - - Inputs.byteBuffer( source, 1024 ).transferTo( Outputs.byteBuffer( tempFile ) ); - - Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) ); - } - - @Test - public void testCopyURL() - throws IOException - { - assumeConnectivity( "www.google.com", 80 ); - - File tempFile = File.createTempFile( "test", ".txt" ); - tempFile.deleteOnExit(); - - Inputs.text( new URL( "http://www.google.com" ) ).transferTo( Outputs.text( tempFile ) ); - -// Uncomment to check output Inputs.text( tempFile ).transferTo( Outputs.systemOut() ); - } - - @Test - public void testCopyFileStreams() - throws IOException - { - File source = sourceFile(); - File tempFile = File.createTempFile( "test", ".txt" ); - tempFile.deleteOnExit(); - - Inputs.byteBuffer( new FileInputStream( source ), 1024 ).transferTo( - Outputs.byteBuffer( new FileOutputStream( tempFile ) ) ); - - Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( source.length() ) ); - } - - @Test - public void testLog() - throws IOException - { - File source = sourceFile(); - - text( source ).transferTo( - Transforms.map( new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ), - Outputs.<String>noop() ) ); - } - - @Test - public void testProgressLog() - throws Throwable - { - Integer[] data = new Integer[ 105 ]; - Arrays.fill( data, 42 ); - - Inputs.iterable( iterable( data ) ).transferTo( - Transforms.map( - new Transforms.ProgressLog<Integer>( - Logger.getLogger( InputOutputTest.class.getName() ), "Data transferred: {0}", 10 ), - Outputs.<Integer>noop() ) ); - } - - @Test - public void testTextInputsOutputs() - throws IOException - { - File tempFile = File.createTempFile( "test", ".txt" ); - tempFile.deleteOnExit(); - File sourceFile = sourceFile(); - Transforms.Counter<String> stringCounter = new Transforms.Counter<>(); - text( sourceFile ).transferTo( - Transforms.map( - stringCounter, - Transforms.map( new Function<String, String>() - { - public String map( String s ) - { - System.out.println( s ); - return s; - } - }, Outputs.text( tempFile ) ) - ) - ); - - Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() ) ); - Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 4L ) ); - } - - @Test - public void testCombineInputs() - throws IOException - { - File tempFile = File.createTempFile( "test", ".txt" ); - tempFile.deleteOnExit(); - File sourceFile = sourceFile(); - Transforms.Counter<String> stringCounter = new Transforms.Counter<>(); - Input<String, IOException> text1 = text( sourceFile ); - Input<String, IOException> text2 = text( sourceFile ); - List<Input<String, IOException>> list = createList( text1, text2 ); - Inputs.combine( list ).transferTo( - Transforms.map( - stringCounter, - Transforms.map( new Function<String, String>() - { - public String map( String s ) - { - System.out.println( s ); - return s; - } - }, Outputs.text( tempFile ) ) - ) - ); - - Assert.assertThat( tempFile.length(), CoreMatchers.equalTo( sourceFile.length() * 2 ) ); - Assert.assertThat( stringCounter.count(), CoreMatchers.equalTo( 8L ) ); - } - - @SuppressWarnings( "unchecked" ) - private List<Input<String, IOException>> createList( Input<String, IOException> text1, - Input<String, IOException> text2 - ) - { - return asList( text1, text2 ); - } - - @Test( expected = IOException.class ) - public void testInputOutputOutputException() - throws IOException - { - - text( sourceFile() ). - transferTo( writerOutput( new Writer() - { - @Override - public void write( char[] cbuf, int off, int len ) - throws IOException - { - throw new IOException(); - } - - @Override - public void flush() - throws IOException - { - throw new IOException(); - } - - @Override - public void close() - throws IOException - { - throw new IOException(); - } - } ) ); - } - - @Test( expected = RemoteException.class ) - public void testInputOutputInputException() - throws IOException - { - - Input<String, RemoteException> input = new Input<String, RemoteException>() - { - @Override - public <OutputThrowableType extends Throwable> void transferTo( Output<? super String, OutputThrowableType> output ) - throws RemoteException, OutputThrowableType - { - output.receiveFrom( new Sender<String, RemoteException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiverThrowableTypeReceiver ) - throws ReceiverThrowableType, RemoteException - { - throw new RemoteException(); - } - } ); - } - }; - - input.transferTo( - Transforms.map( - new Transforms.Log<String>( Logger.getLogger( getClass().getName() ), "Line: {0}" ), - Outputs.systemOut() - ) - ); - } - - @Test - public void testLock() - throws IOException - { - Lock inputLock = new ReentrantLock(); - Lock outputLock = new ReentrantLock(); - - URL source = getClass().getResource( "/iotest.txt" ); - File destination = File.createTempFile( "test", ".txt" ); - destination.deleteOnExit(); - lock( inputLock, text( source ) ).transferTo( lock( outputLock, Outputs.text( destination ) ) ); - } - - @Test - public void testGenerics() - { - ArrayList<Object> objects = new ArrayList<>( 3 ); - Inputs.iterable( Arrays.asList( "Foo", "Bar", "Xyzzy" ) ).transferTo( Outputs.collection( objects ) ); - - Inputs.iterable( objects ).transferTo( Outputs.systemOut() ); - } - - @Test - public void testOutputstreamInput() - throws Throwable - { - Input<ByteBuffer, IOException> input = Inputs.output( new Visitor<OutputStream, IOException>() - { - @Override - public boolean visit( OutputStream visited ) - throws IOException - { - try( PrintWriter writer = new PrintWriter( visited ) ) - { - writer.print( "Hello World!" ); - } - return true; - } - }, 256 ); - - input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) ); - input.transferTo( Transforms.map( new Transforms.ByteBuffer2String( Charset.defaultCharset() ), Outputs.systemOut() ) ); - } - - public Output<String, IOException> writerOutput( final Writer writer ) - { - return new Output<String, IOException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - // Here we initiate the transfer - System.out.println( "Open output" ); - final StringBuilder builder = new StringBuilder(); - try - { - sender.sendTo( new Receiver<String, IOException>() - { - @Override - public void receive( String item ) - throws IOException - { - System.out.println( "Receive input" ); - - // Here we can do batch writes if needed - builder.append( item ).append( "\n" ); - } - } ); - - // If transfer went well, do something with it - writer.write( builder.toString() ); - writer.flush(); - System.out.println( "Output written" ); - } - catch( IOException e ) - { - // If transfer failed, potentially rollback writes - System.out.println( "Input failed" ); - throw e; - } - } - }; - } - - private File sourceFile() - { - String path = getClass().getResource( "/iotest.txt" ).getFile(); - return new File( path.replaceAll( "%20", " " ) ); - } -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/a789141d/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java ---------------------------------------------------------------------- diff --git a/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java b/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java deleted file mode 100644 index 9ff74a2..0000000 --- a/core/io/src/test/java/org/apache/zest/io/docsupport/IoDocs.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.zest.io.docsupport; - -import java.io.File; -import java.io.IOException; -import org.apache.zest.io.Inputs; -import org.apache.zest.io.Outputs; - -// START SNIPPET: io2 -import org.apache.zest.io.Transforms.Counter; -import static org.apache.zest.io.Transforms.map; -// END SNIPPET: io2 - -public class IoDocs -{ - public static void main( String[] args ) - throws IOException - { - { -// START SNIPPET: io1 - File source = new File( "source.txt" ); - File destination = new File( "destination.txt" ); - Inputs.text( source ).transferTo( Outputs.text( destination ) ); -// END SNIPPET: io1 - } - { -// START SNIPPET: io2 - File source = new File( "source.txt" ); - File destination = new File( "destination.txt" ); - Counter<String> counter = new Counter<String>(); - Inputs.text( source ).transferTo( map(counter, Outputs.text(destination) )); - System.out.println( "Lines: " + counter.count() ); -// END SNIPPET: io2 - } - } -}
