http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/functional/src/test/java/org/qi4j/functional/IterablesTest.java ---------------------------------------------------------------------- diff --git a/core/functional/src/test/java/org/qi4j/functional/IterablesTest.java b/core/functional/src/test/java/org/qi4j/functional/IterablesTest.java deleted file mode 100644 index 6aee556..0000000 --- a/core/functional/src/test/java/org/qi4j/functional/IterablesTest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.qi4j.functional; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Enumeration; -import java.util.List; -import org.hamcrest.CoreMatchers; -import org.junit.Test; - -import static java.util.Collections.*; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.assertThat; - -/** - * Test of Iterables utility methods - */ -public class IterablesTest -{ - - private List<String> numbers = Arrays.asList( "1", "2", "3" ); - private Iterable<Long> numberLongs = Arrays.asList( 1L, 2L, 3L ); - private Iterable<Integer> numberIntegers = Arrays.asList( 1, 2, 3 ); - - @Test - public void testConstant() - { - String str = ""; - - for( String string : Iterables.limit( 3, Iterables.constant( "123" ) ) ) - { - str += string; - } - - assertThat( str, CoreMatchers.equalTo( "123123123" ) ); - } - - @Test - public void testUnique() - { - String str = ""; - - for( String string : Iterables.unique( Iterables.<String>flatten( numbers, numbers, numbers ) ) ) - { - str += string; - } - assertThat( str, CoreMatchers.equalTo( "123" ) ); - } - - @Test - public void testAddAll() - { - List<String> strings = Iterables.toList( numbers ); - assertThat( strings.toString(), equalTo( "[1, 2, 3]" ) ); - assertThat( Iterables.toList( numberLongs ).toString(), equalTo( "[1, 2, 3]" ) ); - } - - @Test - public void testCount() - { - assertThat( Iterables.count( numbers ), equalTo( 3L ) ); - } - - @Test - public void testFilter() - { - assertThat( Iterables.first( Iterables.filter( Specifications.in( "2" ), numbers ) ), equalTo( "2" ) ); - } - - @Test - public void testFirst() - { - assertThat( Iterables.first( numbers ), equalTo( "1" ) ); - assertThat( Iterables.first( emptyList() ), nullValue() ); - } - - @Test - public void testLast() - { - assertThat( Iterables.last( numbers ), equalTo( "3" ) ); - assertThat( Iterables.last( emptyList() ), nullValue() ); - } - - @Test - public void testFolding() - { - assertThat( Iterables.fold( new Function<Integer, Integer>() - { - - int sum = 0; - - @Override - public Integer map( Integer number ) - { - return sum += number; - } - - }, numberIntegers ), equalTo( 6 ) ); - } - - @Test - public void testAppend() - { - assertThat( Iterables.toList( Iterables.append( "C", Iterables.iterable( "A", "B" ) ) ).toString(), - equalTo( "[A, B, C]" ) ); - } - - @Test - public void testReverse() - { - assertThat( Iterables.reverse( numbers ).toString(), equalTo( "[3, 2, 1]" ) ); - assertThat( Iterables.reverse( emptyList() ), equalTo( (Object) emptyList() ) ); - } - - @Test - public void testMatchesAny() - { - assertThat( Iterables.matchesAny( Specifications.in( "2" ), numbers ), equalTo( true ) ); - assertThat( Iterables.matchesAny( Specifications.in( "4" ), numbers ), equalTo( false ) ); - } - - @Test - public void testMatchesAll() - { - assertThat( Iterables.matchesAll( Specifications.in( "1", "2", "3" ), numbers ), equalTo( true ) ); - assertThat( Iterables.matchesAll( Specifications.in( "2", "3", "4" ), numbers ), equalTo( false ) ); - } - - @Test - public void testFlatten() - { - assertThat( Iterables.toList( Iterables.flatten( numbers, numbers ) ).toString(), - equalTo( "[1, 2, 3, 1, 2, 3]" ) ); - - Iterable<? extends Number> flatten = Iterables.flatten( numberIntegers, numberLongs ); - assertThat( Iterables.toList( flatten ).toString(), equalTo( "[1, 2, 3, 1, 2, 3]" ) ); - } - - @Test - public void testFlattenIterables() - { - Iterable<List<String>> iterable = Iterables.iterable( numbers, numbers ); - assertThat( Iterables.toList( Iterables.flattenIterables( iterable ) ).toString(), - equalTo( "[1, 2, 3, 1, 2, 3]" ) ); - } - - @Test - public void testMix() - { - assertThat( Iterables.toList( Iterables.mix( Iterables.iterable( "A", "B", "C" ), - Iterables.iterable( "1", "2", "3", "4", "5" ), - Iterables.iterable( "X", "Y", "Z" ) ) ).toString(), - equalTo( "[A, 1, X, B, 2, Y, C, 3, Z, 4, 5]" ) ); - } - - @Test - public void testMap() - { - assertThat( Iterables.toList( Iterables.map( new Function<String, String>() - { - - public String map( String s ) - { - return s + s; - } - - }, numbers ) ).toString(), equalTo( "[11, 22, 33]" ) ); - - Iterable<List<String>> numberIterable = Iterables.iterable( numbers, numbers, numbers ); - assertThat( Iterables.toList( Iterables.map( new Function<Collection, Integer>() - { - - @Override - public Integer map( Collection collection ) - { - return collection.size(); - } - - }, numberIterable ) ).toString(), equalTo( "[3, 3, 3]" ) ); - } - - @Test - public void testIterableEnumeration() - { - - Enumeration<String> enumeration = enumeration( numbers ); - assertThat( Iterables.toList( Iterables.iterable( enumeration ) ).toString(), - equalTo( "[1, 2, 3]" ) ); - } - - @Test - public void testIterableVarArg() - { - assertThat( Iterables.toList( Iterables.iterable( "1", "2", "3" ) ).toString(), - equalTo( "[1, 2, 3]" ) ); - } - - @Test - public void testCast() - { - Iterable<Long> values = numberLongs; - Iterable<Number> numbers = Iterables.cast( values ); - } - - @Test - public void testDebug() - { - assertThat( Iterables.first( Iterables.debug( "Filtered number:{0}", - Iterables.filter( Specifications.in( "2" ), - Iterables.debug( "Number:{0}", numbers ) ) ) ), - equalTo( "2" ) ); - } - - @Test - public void testDebugWithFunctions() - { - Function<String, String> fun = new Function<String, String>() - { - - @Override - public String map( String s ) - { - return s + ":" + s.length(); - } - - }; - assertThat( Iterables.first( Iterables.debug( "Filtered number:{0}", - Iterables.filter( Specifications.in( "2" ), - Iterables.debug( "Number:{0}", numbers, fun ) ) ) ), - equalTo( "2" ) ); - } - - @Test - public void testCache() - { - final int[] count = new int[ 1 ]; - - Iterable<String> b = Iterables.cache( Iterables.filter( Specifications.and( new Specification<String>() - { - - @Override - public boolean satisfiedBy( String item ) - { - count[ 0] = count[ 0] + 1; - return true; - } - - }, Specifications.in( "B" ) ), Iterables.iterable( "A", "B", "C" ) ) ); - - assertThat( count[ 0], equalTo( 0 ) ); - - Iterables.toList( b ); - - assertThat( count[ 0], equalTo( 3 ) ); - - Iterables.toList( b ); - - assertThat( count[ 0], equalTo( 3 ) ); - } - - @Test - public void testSort() - { - assertThat( Iterables.sort( Iterables.reverse( numberLongs ) ).toString(), equalTo( "[1, 2, 3]" ) ); - - Comparator<Long> inverseLongComparator = new Comparator<Long>() - { - - @Override - public int compare( Long left, Long right ) - { - return left.compareTo( right ) * -1; - } - - }; - assertThat( Iterables.sort( inverseLongComparator, numberLongs ).toString(), equalTo( "[3, 2, 1]" ) ); - } - -}
http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/functional/src/test/java/org/qi4j/functional/SpecificationsTest.java ---------------------------------------------------------------------- diff --git a/core/functional/src/test/java/org/qi4j/functional/SpecificationsTest.java b/core/functional/src/test/java/org/qi4j/functional/SpecificationsTest.java deleted file mode 100644 index 98b893e..0000000 --- a/core/functional/src/test/java/org/qi4j/functional/SpecificationsTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.functional; - -import org.junit.Assert; -import org.junit.Test; - -import static org.hamcrest.CoreMatchers.equalTo; - -/** - * JAVADOC - */ -public class SpecificationsTest -{ - @Test - public void testTRUE() - { - Assert.assertThat( Specifications.<Object>TRUE().satisfiedBy( new Object() ), equalTo( true ) ); - } - - @Test - public void testNot() - { - Assert.assertThat( Specifications.not( Specifications.<Object>TRUE() ) - .satisfiedBy( new Object() ), equalTo( false ) ); - } - - @Test - public void testAnd() - { - Specification<Object> trueSpec = Specifications.<Object>TRUE(); - Specification<Object> falseSpec = Specifications.not( Specifications.<Object>TRUE() ); - - Assert.assertThat( Specifications.and( falseSpec, falseSpec ).satisfiedBy( new Object() ), equalTo( false ) ); - Assert.assertThat( Specifications.and( trueSpec, falseSpec ).satisfiedBy( new Object() ), equalTo( false ) ); - Assert.assertThat( Specifications.and( falseSpec, trueSpec ).satisfiedBy( new Object() ), equalTo( false ) ); - Assert.assertThat( Specifications.and( trueSpec, trueSpec ).satisfiedBy( new Object() ), equalTo( true ) ); - } - - @Test - public void testOr() - { - Specification<Object> trueSpec = Specifications.<Object>TRUE(); - Specification<Object> falseSpec = Specifications.not( Specifications.<Object>TRUE() ); - - Assert.assertThat( Specifications.or( falseSpec, falseSpec ).satisfiedBy( new Object() ), equalTo( false ) ); - Assert.assertThat( Specifications.or( trueSpec, falseSpec ).satisfiedBy( new Object() ), equalTo( true ) ); - Assert.assertThat( Specifications.or( falseSpec, trueSpec ).satisfiedBy( new Object() ), equalTo( true ) ); - Assert.assertThat( Specifications.or( trueSpec, trueSpec ).satisfiedBy( new Object() ), equalTo( true ) ); - } - - @Test - public void testIn() - { - Assert.assertThat( Specifications.in( "1", "2", "3" ).satisfiedBy( "2" ), equalTo( true ) ); - Assert.assertThat( Specifications.in( "1", "2", "3" ).satisfiedBy( "4" ), equalTo( false ) ); - } - - @Test - public void testTranslate() - { - Function<Object, String> stringifier = new Function<Object, String>() - { - @Override - public String map( Object s ) - { - return s.toString(); - } - }; - - Assert.assertTrue( Specifications.translate( stringifier, Specifications.in( "3" ) ).satisfiedBy( 3L ) ); - } -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/functional/src/test/java/org/qi4j/functional/docsupport/FunctionalDocs.java ---------------------------------------------------------------------- diff --git a/core/functional/src/test/java/org/qi4j/functional/docsupport/FunctionalDocs.java b/core/functional/src/test/java/org/qi4j/functional/docsupport/FunctionalDocs.java deleted file mode 100644 index 7110a21..0000000 --- a/core/functional/src/test/java/org/qi4j/functional/docsupport/FunctionalDocs.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.qi4j.functional.docsupport; - -import java.util.ArrayList; - -// START SNIPPET: func2 -import static org.qi4j.functional.ForEach.forEach; -import static org.qi4j.functional.Functions.longSum; -// END SNIPPET: func2 - -public class FunctionalDocs -{ - public static void main( String[] args ) - { - { -// START SNIPPET: func1 - Iterable<Long> data = new ArrayList<Long>(); -// END SNIPPET: func1 -// START SNIPPET: func1 - - long sum = 0; - for( Long point : data ) - { - sum = sum + point; - } - System.out.println( "The sum is " + sum ); -// END SNIPPET: func1 - } - { -// START SNIPPET: func2 - Iterable<Number> data = new ArrayList<Number>(); - Long sum = forEach( data ).map( longSum() ).last(); - System.out.println( "The sum is " + sum ); - -// END SNIPPET: func2 - } - } -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Files.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Files.java b/core/io/src/main/java/org/apache/zest/io/Files.java new file mode 100644 index 0000000..14834f3 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Files.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.zest.io; + +import java.io.File; +import java.util.Random; + +/** + * Utility class for files. + */ +public class Files +{ + private static Random random = new Random(); + + public static File createTemporayFileOf( File file ) + { + return new File( file.getAbsolutePath() + "_" + Math.abs( random.nextLong() ) ); + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Input.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Input.java b/core/io/src/main/java/org/apache/zest/io/Input.java new file mode 100644 index 0000000..d7bf8ab --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Input.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.zest.io; + +/** + * Input source of data. + * <p> + * Invoke transferTo to send data from this input to given output. transferTo can be invoked + * as many times as you want. The transferTo implementation must ensure that any exceptions thrown + * by the Input or the Output which transferred data is sent to is handled properly, i.e. that resources + * are closed. Any client code to transferTo calls should not have to bother with resource management, + * but may catch exceptions anyway for logging and similar purposes. + * </p> + */ +// START SNIPPET: input +public interface Input<T, SenderThrowableType extends Throwable> +{ + <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) + throws SenderThrowableType, ReceiverThrowableType; +} +// END SNIPPET: input http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Inputs.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Inputs.java b/core/io/src/main/java/org/apache/zest/io/Inputs.java new file mode 100644 index 0000000..51f03fb --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Inputs.java @@ -0,0 +1,490 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.zest.io; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.URL; +import java.net.URLConnection; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Scanner; +import java.util.zip.GZIPInputStream; +import org.apache.zest.functional.Visitor; + +/** + * Common inputs + */ +public class Inputs +{ + // START SNIPPET: method + + /** + * Read lines from a String. + * + * @param source lines + * + * @return Input that provides lines from the string as strings + */ + public static Input<String, RuntimeException> text( final String source ) + // END SNIPPET: method + { + return new Input<String, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + + output.receiveFrom( new Sender<String, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + Scanner scanner = new Scanner( source ); + while( scanner.hasNextLine() ) + { + receiver.receive( scanner.nextLine() ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a Reader. + * + * @param source lines + * + * @return Input that provides lines from the string as strings + */ + public static Input<String, RuntimeException> text( final Reader source ) + // END SNIPPET: method + { + return new Input<String, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + + output.receiveFrom( new Sender<String, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + Scanner scanner = new Scanner( source ); + while( scanner.hasNextLine() ) + { + receiver.receive( scanner.nextLine() ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a UTF-8 encoded textfile. + * + * If the filename ends with .gz, then the data is automatically unzipped when read. + * + * @param source textfile with lines separated by \n character + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final File source ) + // END SNIPPET: method + { + return text( source, "UTF-8" ); + } + + // START SNIPPET: method + + /** + * Read lines from a textfile with the given encoding. + * + * If the filename ends with .gz, then the data is automatically unzipped when read. + * + * @param source textfile with lines separated by \n character + * @param encoding encoding of file, e.g. "UTF-8" + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final File source, final String encoding ) + // END SNIPPET: method + { + return new Input<String, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + InputStream stream = new FileInputStream( source ); + + // If file is gzipped, unzip it automatically + if( source.getName().endsWith( ".gz" ) ) + { + stream = new GZIPInputStream( stream ); + } + + try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, encoding ) )) + { + output.receiveFrom( new Sender<String, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + String line; + while( ( line = reader.readLine() ) != null ) + { + receiver.receive( line ); + } + } + } ); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read lines from a textfile at a given URL. + * + * If the content support gzip encoding, then the data is automatically unzipped when read. + * + * The charset in the content-type of the URL will be used for parsing. Default is UTF-8. + * + * @param source textfile with lines separated by \n character + * + * @return Input that provides lines from the textfiles as strings + */ + public static Input<String, IOException> text( final URL source ) + // END SNIPPET: method + { + return new Input<String, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + URLConnection urlConnection = source.openConnection(); + urlConnection.setRequestProperty( "Accept-Encoding", "gzip" ); + InputStream stream = urlConnection.getInputStream(); + + // If file is gzipped, unzip it automatically + if( "gzip".equals( urlConnection.getContentEncoding() ) ) + { + stream = new GZIPInputStream( stream ); + } + + // Figure out charset given content-type + String contentType = urlConnection.getContentType(); + String charSet = "UTF-8"; + if( contentType.contains( "charset=" ) ) + { + charSet = contentType.substring( contentType.indexOf( "charset=" ) + "charset=".length() ); + } + + try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, charSet ) )) + { + output.receiveFrom( new Sender<String, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + String line; + while( ( line = reader.readLine() ) != null ) + { + receiver.receive( line ); + } + } + } ); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read a file using ByteBuffer of a given size. Useful for transferring raw data. + * + * @param source The file to be read. + * @param bufferSize The size of the byte array. + * + * @return An Input instance to be applied to streaming operations. + */ + public static Input<ByteBuffer, IOException> byteBuffer( final File source, final int bufferSize ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + final FileInputStream stream = new FileInputStream( source ); + final FileChannel fci = stream.getChannel(); + + final ByteBuffer buffer = ByteBuffer.allocate( bufferSize ); + + try + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + while( fci.read( buffer ) != -1 ) + { + buffer.flip(); + receiver.receive( buffer ); + buffer.clear(); + } + } + } ); + } + finally + { + stream.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Read an inputstream using ByteBuffer of a given size. + * + * @param source The InputStream to be read. + * @param bufferSize The size of the byte array. + * + * @return An Input instance to be applied to streaming operations. + */ + public static Input<ByteBuffer, IOException> byteBuffer( final InputStream source, final int bufferSize ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + try + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + byte[] buffer = new byte[ bufferSize ]; + + int len; + while( ( len = source.read( buffer ) ) != -1 ) + { + ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, 0, len ); + receiver.receive( byteBuffer ); + } + } + } ); + } + finally + { + source.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Combine many Input into one single Input. When a transfer is initiated from it all items from all inputs will be transferred + * to the given Output. + * + * @param inputs An Iterable of Input instances to be combined. + * @param <T> The item type of the Input + * @param <SenderThrowableType> The Throwable that might be thrown by the Inputs. + * + * @return A combined Input, allowing for easy aggregation of many Input sources. + */ + public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> combine( final Iterable<Input<T, SenderThrowableType>> inputs ) + // END SNIPPET: method + { + return new Input<T, SenderThrowableType>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void transferTo( Output<? super T, Receiver2ThrowableType> output ) + throws SenderThrowableType, Receiver2ThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + for( Input<T, SenderThrowableType> input : inputs ) + { + input.transferTo( new Output<T, ReceiverThrowableType>() + { + @Override + public <Sender2ThrowableType extends Throwable> void receiveFrom( Sender<? extends T, Sender2ThrowableType> sender ) + throws ReceiverThrowableType, Sender2ThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + receiver.receive( item ); + } + } ); + } + } ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Create an Input that takes its items from the given Iterable. + * + * @param iterable The Iterable to be used as an Input. + * @param <T> The item type of the Input + * + * @return An Input instance that is backed by the Iterable. + */ + public static <T> Input<T, RuntimeException> iterable( final Iterable<T> iterable ) + // END SNIPPET: method + { + return new Input<T, RuntimeException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) + throws RuntimeException, ReceiverThrowableType + { + output.receiveFrom( new Sender<T, RuntimeException>() + { + @Override + public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super T, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, RuntimeException + { + for( T item : iterable ) + { + receiver.receive( item ); + } + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Create an Input that allows a Visitor to write to an OutputStream. The stream is a BufferedOutputStream, so when enough + * data has been gathered it will send this in chunks of the given size to the Output it is transferred to. The Visitor does not have to call + * close() on the OutputStream, but should ensure that any wrapper streams or writers are flushed so that all data is sent. + * + * @param outputVisitor The OutputStream Visitor that will be backing the Input ByteBuffer. + * @param bufferSize The buffering size. + * + * @return An Input instance of ByteBuffer, that is backed by an Visitor to an OutputStream. + */ + public static Input<ByteBuffer, IOException> output( final Visitor<OutputStream, IOException> outputVisitor, + final int bufferSize + ) + // END SNIPPET: method + { + return new Input<ByteBuffer, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) + throws IOException, ReceiverThrowableType + { + output.receiveFrom( new Sender<ByteBuffer, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <Receiver2ThrowableType extends Throwable> void sendTo( final Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) + throws Receiver2ThrowableType, IOException + { + try (OutputStream out = new BufferedOutputStream( new OutputStream() + { + @Override + public void write( int b ) + throws IOException + { + // Ignore + } + + @SuppressWarnings( "NullableProblems" ) + @Override + public void write( byte[] b, int off, int len ) + throws IOException + { + try + { + ByteBuffer byteBuffer = ByteBuffer.wrap( b, 0, len ); + receiver.receive( byteBuffer ); + } + catch( Throwable ex ) + { + throw new IOException( ex ); + } + } + }, bufferSize )) + { + outputVisitor.visit( out ); + } + catch( IOException ex ) + { + throw (Receiver2ThrowableType) ex.getCause(); + } + } + } ); + } + }; + } + + private Inputs() + { + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Output.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Output.java b/core/io/src/main/java/org/apache/zest/io/Output.java new file mode 100644 index 0000000..9aa514f --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Output.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.zest.io; + +/** + * Output for data. + */ +// START SNIPPET: output +public interface Output<T, ReceiverThrowableType extends Throwable> +{ +// END SNIPPET: output + + /** + * This initiates a transfer from an Input. Implementations should open any resources to be written to + * and then call sender.sendTo() when it is ready to receive data. When sendTo() returns the resource should be + * closed properly. Make sure to handle any exceptions from sendTo. + * + * @param sender the sender of data to this output + * @param <SenderThrowableType> the exception that sendTo can throw + * + * @throws SenderThrowableType the exception that the sender can throw + * @throws ReceiverThrowableType the exception that this output can throw from receiveItem() + */ +// START SNIPPET: output + <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws ReceiverThrowableType, SenderThrowableType; +} +// END SNIPPET: output http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Outputs.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Outputs.java b/core/io/src/main/java/org/apache/zest/io/Outputs.java new file mode 100644 index 0000000..e90f384 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Outputs.java @@ -0,0 +1,528 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.zest.io; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Collection; +import java.util.zip.GZIPOutputStream; + +/** + * Utility methods for creating standard Outputs + */ +public class Outputs +{ + // START SNIPPET: method + + /** + * Write lines to a text file with UTF-8 encoding. Separate each line with a newline ("\n" character). If the writing or sending fails, + * the file is deleted. + * <p> + * If the filename ends with .gz, then the data is automatically GZipped. + * </p> + * @param file the file to save the text to + * + * @return an Output for storing text in a file + */ + public static Output<String, IOException> text( final File file ) + // END SNIPPET: method + { + return text( file, "UTF-8" ); + } + + // START SNIPPET: method + + /** + * Write lines to a text file. Separate each line with a newline ("\n" character). If the writing or sending fails, + * the file is deleted. + * <p> + * If the filename ends with .gz, then the data is automatically GZipped. + * </p> + * @param file the file to save the text to + * + * @return an Output for storing text in a file + */ + public static Output<String, IOException> text( final File file, final String encoding ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + + OutputStream stream = new FileOutputStream( tmpFile ); + + // If file should be gzipped, do that automatically + if( file.getName().endsWith( ".gz" ) ) + { + stream = new GZIPOutputStream( stream ); + } + + final BufferedWriter writer = new BufferedWriter( new OutputStreamWriter( stream, encoding ) ); + + try + { + sender.sendTo( new Receiver<String, IOException>() + { + @Override + public void receive( String item ) + throws IOException + { + writer.append( item ).append( '\n' ); + } + } ); + writer.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: What?? Throw an Exception? + System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + writer.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + writer.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Write lines to a Writer. Separate each line with a newline ("\n" character). + * + * @param writer the Writer to write the text to + * @return an Output for storing text in a Writer + */ + public static Output<String, IOException> text( final Writer writer ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + sender.sendTo( new Receiver<String, IOException>() + { + + @Override + public void receive( String item ) + throws IOException + { + writer.append( item ).append( "\n" ); + } + + } ); + } + + }; + } + + // START SNIPPET: method + + /** + * Write lines to a StringBuilder. Separate each line with a newline ("\n" character). + * + * @param builder the StringBuilder to append the text to + * @return an Output for storing text in a StringBuilder + */ + public static Output<String, IOException> text( final StringBuilder builder ) + // END SNIPPET: method + { + return new Output<String, IOException>() + { + + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + sender.sendTo( new Receiver<String, IOException>() + { + + @Override + public void receive( String item ) + throws IOException + { + builder.append( item ).append( "\n" ); + } + + } ); + } + + }; + } + + // START SNIPPET: method + + /** + * Write ByteBuffer data to a file. If the writing or sending of data fails the file will be deleted. + * + * @param file The destination file. + * + * @return The Output ByteBuffer instance backed by a File. + */ + public static Output<ByteBuffer, IOException> byteBuffer( final File file ) + // END SNIPPET: method + { + return new Output<ByteBuffer, IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + FileOutputStream stream = new FileOutputStream( tmpFile ); + final FileChannel fco = stream.getChannel(); + + try + { + sender.sendTo( new Receiver<ByteBuffer, IOException>() + { + @Override + public void receive( ByteBuffer item ) + throws IOException + { + fco.write( item ); + } + } ); + stream.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: What can be done in this case? + System.err.println( "Unable to rename file: " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Write ByteBuffer data to an OutputStream. + * + * @param stream Destination OutputStream + * + * @return The Output of ByteBuffer that will be backed by the OutputStream. + */ + public static Output<ByteBuffer, IOException> byteBuffer( final OutputStream stream ) + // END SNIPPET: method + { + return new Output<ByteBuffer, IOException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends ByteBuffer, SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + try + { + sender.sendTo( new Receiver<ByteBuffer, IOException>() + { + @Override + public void receive( ByteBuffer item ) + throws IOException + { + if( item.hasArray() ) + { + stream.write( item.array(), item.arrayOffset(), item.limit() ); + } + else + { + for( int i = 0; i < item.limit(); i++ ) + { + stream.write( item.get( i ) ); + } + } + } + } ); + } + finally + { + stream.close(); + } + } + }; + } + + // START SNIPPET: method + + /** + * Write byte array data to a file. If the writing or sending of data fails the file will be deleted. + * + * @param file The File to be written to. + * @param bufferSize The size of the ByteBuffer. + * + * @return An Output instance that will write to the given File. + */ + public static Output<byte[], IOException> bytes( final File file, final int bufferSize ) + // END SNIPPET: method + { + return new Output<byte[], IOException>() + { + @Override + @SuppressWarnings( "unchecked" ) + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends byte[], SenderThrowableType> sender ) + throws IOException, SenderThrowableType + { + File tmpFile = Files.createTemporayFileOf( file ); + final OutputStream stream = new BufferedOutputStream( new FileOutputStream( tmpFile ), bufferSize ); + + try + { + sender.sendTo( new Receiver<byte[], IOException>() + { + @Override + public void receive( byte[] item ) + throws IOException + { + stream.write( item ); + } + } ); + stream.close(); + + // Replace file with temporary file + if( !file.exists() || file.delete() ) + { + if( ! tmpFile.renameTo( file ) ) + { + // TODO: WHAT??? + System.err.println( "Unable to rename " + tmpFile + " to " + file ); + } + } + } + catch( IOException e ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + } + catch( Throwable senderThrowableType ) + { + // We failed writing - close and delete + stream.close(); + if( ! tmpFile.delete() ) + { + System.err.println("Unable to delete temporary file." ); + tmpFile.deleteOnExit(); + } + throw (SenderThrowableType) senderThrowableType; + } + } + }; + } + + // START SNIPPET: method + + /** + * Do nothing. Use this if you have all logic in filters and/or specifications + * + * @param <T> The item type. + * + * @return An Output instance that ignores all data. + */ + public static <T> Output<T, RuntimeException> noop() + // END SNIPPET: method + { + return withReceiver( new Receiver<T, RuntimeException>() + { + @Override + public void receive( T item ) + throws RuntimeException + { + // Do nothing + } + } ); + } + + // START SNIPPET: method + + /** + * Use given receiver as Output. Use this if there is no need to create a "transaction" for each transfer, and no need + * to do batch writes or similar. + * + * @param <T> The item type + * @param receiver receiver for this Output + * + * @return An Output instance backed by a Receiver of items. + */ + public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> withReceiver( final Receiver<T, ReceiverThrowableType> receiver ) + // END SNIPPET: method + { + return new Output<T, ReceiverThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( receiver ); + } + }; + } + + // START SNIPPET: method + + /** + * Write objects to System.out.println. + * + * @return An Output instance that is backed by System.out + */ + public static Output<Object, RuntimeException> systemOut() + // END SNIPPET: method + { + return new Output<Object, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<Object, RuntimeException>() + { + @Override + public void receive( Object item ) + { + System.out.println( item ); + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Write objects to System.err.println. + * + * @return An Output instance backed by System.in + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static Output<Object, RuntimeException> systemErr() + // END SNIPPET: method + { + return new Output<Object, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<?, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<Object, RuntimeException>() + { + @Override + public void receive( Object item ) + { + System.err.println( item ); + } + } ); + } + }; + } + + // START SNIPPET: method + + /** + * Add items to a collection + */ + public static <T> Output<T, RuntimeException> collection( final Collection<T> collection ) + // END SNIPPET: method + { + return new Output<T, RuntimeException>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws RuntimeException, SenderThrowableType + { + sender.sendTo( new Receiver<T, RuntimeException>() + { + @Override + public void receive( T item ) + throws RuntimeException + { + collection.add( item ); + } + } ); + } + }; + } + + private Outputs() + { + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Receiver.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Receiver.java b/core/io/src/main/java/org/apache/zest/io/Receiver.java new file mode 100644 index 0000000..e0038d0 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Receiver.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.zest.io; + +/** + * Receiver of items during a specific transfer from an Input to an Output. + */ +// START SNIPPET: receiver +public interface Receiver<T, ReceiverThrowableType extends Throwable> +{ +// END SNIPPET: receiver + /** + * Receive a single item of the given type. The receiver should process it + * and optionally throw an exception if it fails. + * + * @param item + * + * @throws ReceiverThrowableType + */ +// START SNIPPET: receiver + void receive( T item ) + throws ReceiverThrowableType; +} +// END SNIPPET: receiver http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Sender.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Sender.java b/core/io/src/main/java/org/apache/zest/io/Sender.java new file mode 100644 index 0000000..5812cc6 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Sender.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.zest.io; + +/** + * Sender of items for a particular transfer from an Input to an Output + */ +// START SNIPPET: sender +public interface Sender<T, SenderThrowableType extends Throwable> +{ +// END SNIPPET: sender + /** + * The sender should send all items it holds to the receiver by invoking receiveItem for each item. + * + * If the receive fails it should properly close any open resources. + * + * @param receiver + * @param <ReceiverThrowableType> + * + * @throws ReceiverThrowableType + * @throws SenderThrowableType + */ +// START SNIPPET: sender + <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType; +} +// END SNIPPET: sender http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/Transforms.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/Transforms.java b/core/io/src/main/java/org/apache/zest/io/Transforms.java new file mode 100644 index 0000000..b17c542 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/Transforms.java @@ -0,0 +1,435 @@ +/* + * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.zest.io; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.text.MessageFormat; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.logging.Logger; +import org.apache.zest.functional.Function; +import org.apache.zest.functional.Specification; + +/** + * Utility class for I/O transforms + */ +public class Transforms +{ + /** + * Filter items in a transfer by applying the given Specification to each item. + * + * @param specification The Specification defining the items to not filter away. + * @param output The Output instance to receive to result. + * @param <T> The item type + * @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver. + * + * @return And Output encapsulation the filter operation. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + if( specification.satisfiedBy( item ) ) + { + receiver.receive( item ); + } + } + } ); + } + } ); + } + }; + } + + /** + * Map items in a transfer from one type to another by applying the given function. + * + * @param function The transformation function to apply to the streaming items. + * @param output The output to receive the transformed items. + * @param <From> The type of the incoming items. + * @param <To> The type of the transformed items. + * @param <Receiver2ThrowableType> The exception type that the Receiver might throw. + * + * @return An Output instance that encapsulates the map transformation. + */ + public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function, + final Output<To, Receiver2ThrowableType> output + ) + { + return new Output<From, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<To, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<From, ReceiverThrowableType>() + { + @Override + public void receive( From item ) + throws ReceiverThrowableType + { + receiver.receive( function.map( item ) ); + } + } ); + } + } ); + } + }; + } + + /** + * Apply the given function to items in the transfer that match the given specification. Other items will pass + * through directly. + * + * @param specification The Specification defining which items should be transformed. + * @param function The transformation function. + * @param output The Output that will receive the resulting items. + * @param <T> The item type. Items can not be transformed to a new type. + * @param <Receiver2ThrowableType> The exception that the Receiver might throw. + * + * @return An Output instance that encapsulates the operation. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification, + final Function<? super T, ? extends T> function, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + output.receiveFrom( new Sender<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) + throws ReceiverThrowableType, SenderThrowableType + { + sender.sendTo( new Receiver<T, ReceiverThrowableType>() + { + @Override + public void receive( T item ) + throws ReceiverThrowableType + { + if( specification.satisfiedBy( item ) ) + { + receiver.receive( function.map( item ) ); + } + else + { + receiver.receive( item ); + } + } + } ); + } + } ); + } + }; + } + + /** + * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on + * the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a + * wrapper on the send side when transferTo is invoked. + * + * @param lock the lock to be used for transfers + * @param output output to be wrapped + * @param <T> The Item type + * @param <Receiver2ThrowableType> The Exception type that the Receiver might throw. + * + * @return Output wrapper that uses the given lock during transfers. + */ + public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock, + final Output<T, Receiver2ThrowableType> output + ) + { + return new Output<T, Receiver2ThrowableType>() + { + @Override + public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender ) + throws Receiver2ThrowableType, SenderThrowableType + { + /** + * Fix for this bug: + * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 + */ + while( true ) + { + try + { + //noinspection StatementWithEmptyBody + while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) + { + // On timeout, try again + } + break; // Finally got a lock + } + catch( InterruptedException e ) + { + // Try again + } + } + + try + { + output.receiveFrom( sender ); + } + finally + { + lock.unlock(); + } + } + }; + } + + /** + * Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on the sending side and a write-lock + * would be used on the receiving side. + * + * @param lock the lock to be used for transfers + * @param input input to be wrapped + * @param <T> The item type. + * @param <SenderThrowableType> The Exception type that the Sender might throw. + * + * @return Input wrapper that uses the given lock during transfers. + */ + public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock, + final Input<T, SenderThrowableType> input + ) + { + return new Input<T, SenderThrowableType>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) + throws SenderThrowableType, ReceiverThrowableType + { + /** + * Fix for this bug: + * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370 + */ + while( true ) + { + try + { + //noinspection StatementWithEmptyBody + while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) ) + { + // On timeout, try again + } + break; // Finally got a lock + } + catch( InterruptedException e ) + { + // Try again + } + } + + try + { + input.transferTo( output ); + } + finally + { + lock.unlock(); + } + } + }; + } + + /** + * Count the number of items in the transfer. + * + * @param <T> + */ + // START SNIPPET: counter + public static class Counter<T> + implements Function<T, T> + { + private volatile long count = 0; + + public long count() + { + return count; + } + + @Override + public T map( T t ) + { + count++; + return t; + } + } + // END SNIPPET: counter + + /** + * Convert strings to bytes using the given CharSet + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static class String2Bytes + implements Function<String, byte[]> + { + private Charset charSet; + + public String2Bytes( Charset charSet ) + { + this.charSet = charSet; + } + + @Override + public byte[] map( String s ) + { + return s.getBytes( charSet ); + } + } + + /** + * Convert ByteBuffers to Strings using the given CharSet + */ + public static class ByteBuffer2String + implements Function<ByteBuffer, String> + { + private Charset charSet; + + public ByteBuffer2String( Charset charSet ) + { + this.charSet = charSet; + } + + @Override + public String map( ByteBuffer buffer ) + { + return new String( buffer.array(), charSet ); + } + } + + /** + * Convert objects to Strings using .toString() + */ + @SuppressWarnings( "UnusedDeclaration" ) + public static class ObjectToString + implements Function<Object, String> + { + @Override + public String map( Object o ) + { + return o.toString(); + } + } + + /** + * Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat + * with the given format. + * + * @param <T> + */ + public static class Log<T> + implements Function<T, T> + { + private Logger logger; + private MessageFormat format; + + public Log( Logger logger, String format ) + { + this.logger = logger; + this.format = new MessageFormat( format ); + } + + @Override + public T map( T item ) + { + logger.info( format.format( new String[]{ item.toString() } ) ); + return item; + } + } + + /** + * Track progress of transfer by emitting a log message in given intervals. + * + * If logger or format is null, then you need to override the logProgress to do something + * + * @param <T> type of items to be transferred + */ + // START SNIPPET: progress + public static class ProgressLog<T> + implements Function<T, T> + { + private Counter<T> counter; + private Log<String> log; + private final long interval; + + public ProgressLog( Logger logger, String format, long interval ) + { + this.interval = interval; + if( logger != null && format != null ) + { + log = new Log<>( logger, format ); + } + counter = new Counter<>(); + } + + public ProgressLog( long interval ) + { + this.interval = interval; + counter = new Counter<>(); + } + + @Override + public T map( T t ) + { + counter.map( t ); + if( counter.count % interval == 0 ) + { + logProgress(); + } + return t; + } + + // Override this to do something other than logging the progress + protected void logProgress() + { + if( log != null ) + { + log.map( counter.count + "" ); + } + } + } + // END SNIPPET: progress +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/apache/zest/io/package.html ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/apache/zest/io/package.html b/core/io/src/main/java/org/apache/zest/io/package.html new file mode 100644 index 0000000..aac8a54 --- /dev/null +++ b/core/io/src/main/java/org/apache/zest/io/package.html @@ -0,0 +1,21 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<html> + <body> + <h2>I/O API.</h2> + </body> +</html> http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/qi4j/io/Files.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Files.java b/core/io/src/main/java/org/qi4j/io/Files.java deleted file mode 100644 index 562d03c..0000000 --- a/core/io/src/main/java/org/qi4j/io/Files.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.qi4j.io; - -import java.io.File; -import java.util.Random; - -/** - * Utility class for files. - */ -public class Files -{ - private static Random random = new Random(); - - public static File createTemporayFileOf( File file ) - { - return new File( file.getAbsolutePath() + "_" + Math.abs( random.nextLong() ) ); - } -} http://git-wip-us.apache.org/repos/asf/zest-java/blob/8744a67f/core/io/src/main/java/org/qi4j/io/Input.java ---------------------------------------------------------------------- diff --git a/core/io/src/main/java/org/qi4j/io/Input.java b/core/io/src/main/java/org/qi4j/io/Input.java deleted file mode 100644 index bde80e5..0000000 --- a/core/io/src/main/java/org/qi4j/io/Input.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2010, Rickard Ãberg. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.qi4j.io; - -/** - * Input source of data. - * <p> - * Invoke transferTo to send data from this input to given output. transferTo can be invoked - * as many times as you want. The transferTo implementation must ensure that any exceptions thrown - * by the Input or the Output which transferred data is sent to is handled properly, i.e. that resources - * are closed. Any client code to transferTo calls should not have to bother with resource management, - * but may catch exceptions anyway for logging and similar purposes. - * </p> - */ -// START SNIPPET: input -public interface Input<T, SenderThrowableType extends Throwable> -{ - <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) - throws SenderThrowableType, ReceiverThrowableType; -} -// END SNIPPET: input
