entitystores: replace usage of core/io with streams
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/ee1d1abc Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/ee1d1abc Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/ee1d1abc Branch: refs/heads/develop Commit: ee1d1abccdd5e978f6a5216f1265958825139925 Parents: 8854b13 Author: Paul Merlin <[email protected]> Authored: Sat Dec 3 12:12:22 2016 +0100 Committer: Paul Merlin <[email protected]> Committed: Fri Dec 9 09:26:40 2016 +0100 ---------------------------------------------------------------------- .../memory/MemoryMapEntityStoreMixin.java | 107 ++---- .../zest/spi/entitystore/BackupRestore.java | 25 +- .../zest/spi/entitystore/EntityStore.java | 10 +- .../helpers/JSONMapEntityStoreMixin.java | 125 +++---- .../spi/entitystore/helpers/MapEntityStore.java | 6 +- .../helpers/MapEntityStoreMixin.java | 112 +++--- .../test/entity/AbstractEntityStoreTest.java | 32 ++ .../entitystore/file/FileEntityStoreMixin.java | 98 ++---- .../geode/GeodeEntityStoreMixin.java | 29 +- .../hazelcast/HazelcastEntityStoreMixin.java | 29 +- .../jclouds/JCloudsMapEntityStoreMixin.java | 73 ++-- .../entitystore/jdbm/JdbmEntityStoreMixin.java | 293 ++++++++-------- .../leveldb/LevelDBEntityStoreMixin.java | 65 ++-- .../mongodb/MongoMapEntityStoreMixin.java | 42 +-- .../prefs/PreferencesEntityStoreMixin.java | 55 +-- .../redis/RedisMapEntityStoreMixin.java | 42 +-- .../riak/RiakMapEntityStoreMixin.java | 82 ++--- .../entitystore/sql/SQLEntityStoreMixin.java | 337 ++++++++++--------- .../apache/zest/migration/MigrationTest.java | 90 ++--- .../reindexer/internal/ReindexerMixin.java | 54 ++- .../zest/library/logging/DebuggingTest.java | 31 +- .../apache/zest/library/sql/common/SQLUtil.java | 78 ++++- 22 files changed, 739 insertions(+), 1076 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java index 893e17a..79ad54d 100644 --- a/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java +++ b/core/spi/src/main/java/org/apache/zest/entitystore/memory/MemoryMapEntityStoreMixin.java @@ -23,18 +23,13 @@ import java.io.IOException; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; +import java.io.UncheckedIOException; import java.io.Writer; import java.util.HashMap; import java.util.Map; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; +import java.util.stream.Stream; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.BackupRestore; import org.apache.zest.spi.entitystore.EntityAlreadyExistsException; import org.apache.zest.spi.entitystore.EntityNotFoundException; @@ -42,6 +37,9 @@ import org.apache.zest.spi.entitystore.EntityStoreException; import org.apache.zest.spi.entitystore.helpers.JSONKeys; import org.apache.zest.spi.entitystore.helpers.MapEntityStore; import org.apache.zest.spi.entitystore.helpers.MapEntityStoreActivation; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; /** * In-memory implementation of MapEntityStore. @@ -90,95 +88,34 @@ public class MemoryMapEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - for( String state : store.values() ) - { - receiver.receive( new StringReader( state ) ); - } - } - } ); - } - }; + return store.values().stream().map( StringReader::new ); } @Override - public Input<String, IOException> backup() + public Stream<String> backup() { - return new Input<String, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<String, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - for( String state : store.values() ) - { - receiver.receive( state ); - } - } - } ); - } - }; + return store.values().stream(); } @Override - public Output<String, IOException> restore() + public void restore( final Stream<String> stream ) { - return new Output<String, IOException>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType + store.clear(); + stream.forEach( item -> { + try { - store.clear(); - - try - { - sender.sendTo( new Receiver<String, IOException>() - { - @Override - public void receive( String item ) - throws IOException - { - try - { - JSONTokener tokener = new JSONTokener( item ); - JSONObject entity = (JSONObject) tokener.nextValue(); - String id = entity.getString( JSONKeys.IDENTITY ); - store.put( EntityReference.parseEntityReference( id ), item ); - } - catch( JSONException e ) - { - throw new IOException( e ); - } - } - } ); - } - catch( IOException e ) - { - store.clear(); - throw e; - } + JSONTokener tokener = new JSONTokener( item ); + JSONObject entity = (JSONObject) tokener.nextValue(); + String id = entity.getString( JSONKeys.IDENTITY ); + store.put( EntityReference.parseEntityReference( id ), item ); + } + catch( JSONException e ) + { + throw new UncheckedIOException( new IOException( e ) ); } - }; + } ); } private class MemoryMapChanger http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java index e7cb5be..63aa3aa 100644 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/BackupRestore.java @@ -20,9 +20,8 @@ package org.apache.zest.spi.entitystore; -import java.io.IOException; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; +import java.util.function.Consumer; +import java.util.stream.Stream; /** * Allow backups and restores of data in an EntityStore to be made @@ -30,16 +29,22 @@ import org.apache.zest.io.Output; public interface BackupRestore { /** - * Input that allows data from the entity store to be backed up. - * - * @return An Input instance containing the data to back up. + * Backup as a stream of serialized entity states, must be closed. + */ + Stream<String> backup(); + + /** + * Restore from a stream of serialized entity states. */ - Input<String, IOException> backup(); + void restore( Stream<String> states ); /** - * Output that allows data to be restored from a backup. + * Restore from streams of serialized entity states. * - * @return An Output instance to receive the restored data. + * @return A consumer of streams of serialized entity states */ - Output<String, IOException> restore(); + default Consumer<Stream<String>> restore() + { + return this::restore; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java index 71752b0..2e002c2 100644 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/EntityStore.java @@ -20,9 +20,9 @@ package org.apache.zest.spi.entitystore; import java.time.Instant; +import java.util.stream.Stream; import org.apache.zest.api.structure.ModuleDescriptor; import org.apache.zest.api.usecase.Usecase; -import org.apache.zest.io.Input; import org.apache.zest.spi.entity.EntityState; /** @@ -32,5 +32,11 @@ public interface EntityStore { EntityStoreUnitOfWork newUnitOfWork( ModuleDescriptor module, Usecase usecase, Instant currentTime ); - Input<EntityState, EntityStoreException> entityStates( ModuleDescriptor module ); + /** + * Stream of all entity states, must be closed. + * + * @param module Module + * @return Stream of all entity states, must be closed + */ + Stream<EntityState> entityStates( ModuleDescriptor module ); } http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java index 90d5acb..ebaa2f2 100644 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/JSONMapEntityStoreMixin.java @@ -29,6 +29,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.stream.Stream; import org.apache.zest.api.cache.CacheOptions; import org.apache.zest.api.common.Optional; import org.apache.zest.api.entity.EntityDescriptor; @@ -47,10 +48,6 @@ import org.apache.zest.api.structure.ModuleDescriptor; import org.apache.zest.api.unitofwork.NoSuchEntityTypeException; import org.apache.zest.api.usecase.Usecase; import org.apache.zest.api.value.ValueSerialization; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.ZestSPI; import org.apache.zest.spi.cache.Cache; import org.apache.zest.spi.cache.CachePool; @@ -310,102 +307,62 @@ public class JSONMapEntityStoreMixin } @Override - public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module ) + public Stream<EntityState> entityStates( ModuleDescriptor module ) { - return new Input<EntityState, EntityStoreException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output ) - throws EntityStoreException, ReceiverThrowableType + List<EntityState> migrated = new ArrayList<>(); + return mapEntityStore.entityStates().map( + reader -> { - output.receiveFrom( new Sender<EntityState, EntityStoreException>() + EntityState entity = readEntityState( module, reader ); + if( entity.status() == EntityStatus.UPDATED ) { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, EntityStoreException + migrated.add( entity ); + // Synch back 100 at a time + if( migrated.size() > 100 ) { - final List<EntityState> migrated = new ArrayList<>(); - try - { - mapEntityStore.entityStates().transferTo( new Output<Reader, ReceiverThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender ) - throws ReceiverThrowableType, SenderThrowableType - { - sender.sendTo( new Receiver<Reader, ReceiverThrowableType>() - { - @Override - public void receive( Reader item ) - throws ReceiverThrowableType - { - final EntityState entity = readEntityState( module, item ); - if( entity.status() == EntityStatus.UPDATED ) - { - migrated.add( entity ); - - // Synch back 100 at a time - if( migrated.size() > 100 ) - { - try - { - synchMigratedEntities( migrated ); - } - catch( IOException e ) - { - throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); - } - } - } - receiver.receive( entity ); - } - } ); - - // Synch any remaining migrated entities - if( !migrated.isEmpty() ) - { - try - { - synchMigratedEntities( migrated ); - } - catch( IOException e ) - { - throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); - } - } - } - } ); - } - catch( IOException e ) - { - throw new EntityStoreException( e ); - } + synchMigratedEntities( migrated ); } - } ); + } + return entity; } - }; + ).onClose( + () -> + { + // Synch any remaining migrated entities + if( !migrated.isEmpty() ) + { + synchMigratedEntities( migrated ); + } + } + ); } private void synchMigratedEntities( final List<EntityState> migratedEntities ) - throws IOException { - mapEntityStore.applyChanges( new MapEntityStore.MapChanges() + try { - @Override - public void visitMap( MapEntityStore.MapChanger changer ) - throws IOException + mapEntityStore.applyChanges( new MapEntityStore.MapChanges() { - for( EntityState migratedEntity : migratedEntities ) + @Override + public void visitMap( MapEntityStore.MapChanger changer ) + throws IOException { - JSONEntityState state = (JSONEntityState) migratedEntity; - try (Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() )) + for( EntityState migratedEntity : migratedEntities ) { - writeEntityState( state, writer, state.version(), state.lastModified() ); + JSONEntityState state = (JSONEntityState) migratedEntity; + try( Writer writer = changer.updateEntity( state.entityReference(), state.entityDescriptor() ) ) + { + writeEntityState( state, writer, state.version(), state.lastModified() ); + } } } - } - } ); - migratedEntities.clear(); + } ); + migratedEntities.clear(); + } + catch( IOException ex ) + { + throw new EntityStoreException( "Synchronization of Migrated Entities failed.", ex ); + } } protected Identity newUnitOfWorkId() http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java index 2ec20a0..435494f 100644 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStore.java @@ -22,9 +22,9 @@ package org.apache.zest.spi.entitystore.helpers; import java.io.IOException; import java.io.Reader; import java.io.Writer; +import java.util.stream.Stream; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; -import org.apache.zest.io.Input; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; @@ -42,9 +42,9 @@ public interface MapEntityStore throws EntityStoreException; /** - * @return All entities state Readers + * @return All entities state Readers, must be closed */ - Input<Reader, IOException> entityStates(); + Stream<Reader> entityStates(); void applyChanges( MapChanges changes ) throws IOException; http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java index fceae54..8a61ced 100644 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/helpers/MapEntityStoreMixin.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.zest.api.common.Optional; import org.apache.zest.api.common.QualifiedName; import org.apache.zest.api.entity.EntityDescriptor; @@ -46,10 +47,6 @@ import org.apache.zest.api.unitofwork.NoSuchEntityTypeException; import org.apache.zest.api.usecase.Usecase; import org.apache.zest.api.value.ValueSerialization; import org.apache.zest.api.value.ValueSerializationException; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.ZestSPI; import org.apache.zest.spi.entity.EntityState; import org.apache.zest.spi.entity.EntityStatus; @@ -200,74 +197,47 @@ public class MapEntityStoreMixin } @Override - public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module ) + public Stream<EntityState> entityStates( final ModuleDescriptor module ) { - return new Input<EntityState, EntityStoreException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output ) - throws EntityStoreException, ReceiverThrowableType - { - output.receiveFrom( new Sender<EntityState, EntityStoreException>() - { - @Override - public <RecThrowableType extends Throwable> void sendTo( final Receiver<? super EntityState, RecThrowableType> receiver ) - throws RecThrowableType, EntityStoreException - { - final List<EntityState> migrated = new ArrayList<>(); - try - { - mapEntityStore.entityStates().transferTo( new Output<Reader, RecThrowableType>() - { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends Reader, SenderThrowableType> sender ) - throws RecThrowableType, SenderThrowableType - { - sender.sendTo( item -> { - final EntityState entity = readEntityState( module, item ); - if( entity.status() == EntityStatus.UPDATED ) - { - migrated.add( entity ); - - // Synch back 100 at a time - if( migrated.size() > 100 ) - { - try - { - synchMigratedEntities( migrated ); - } - catch( IOException e ) - { - throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); - } - } - } - receiver.receive( entity ); - } ); - - // Synch any remaining migrated entities - if( !migrated.isEmpty() ) - { - try - { - synchMigratedEntities( migrated ); - } - catch( IOException e ) - { - throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); - } - } - } - } ); - } - catch( IOException e ) - { - throw new EntityStoreException( e ); - } - } - } ); - } - }; + List<EntityState> migrated = new ArrayList<>(); + return mapEntityStore + .entityStates() + .map( reader -> + { + EntityState entity = readEntityState( module, reader ); + if( entity.status() == EntityStatus.UPDATED ) + { + migrated.add( entity ); + // Synch back 100 at a time + if( migrated.size() > 100 ) + { + try + { + synchMigratedEntities( migrated ); + } + catch( IOException e ) + { + throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); + } + } + } + return entity; + } ) + .onClose( () -> + { + // Synch any remaining migrated entities + if( !migrated.isEmpty() ) + { + try + { + synchMigratedEntities( migrated ); + } + catch( IOException e ) + { + throw new EntityStoreException( "Synchronization of Migrated Entities failed.", e ); + } + } + } ); } private void synchMigratedEntities( final List<EntityState> migratedEntities ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java ---------------------------------------------------------------------- diff --git a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java index ab05f39..74368ae 100644 --- a/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java +++ b/core/testsupport/src/main/java/org/apache/zest/test/entity/AbstractEntityStoreTest.java @@ -31,6 +31,7 @@ import java.time.ZonedDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.zest.api.association.Association; import org.apache.zest.api.association.ManyAssociation; import org.apache.zest.api.association.NamedAssociation; @@ -51,6 +52,7 @@ import org.apache.zest.api.value.ValueBuilder; import org.apache.zest.api.value.ValueComposite; import org.apache.zest.bootstrap.AssemblyException; import org.apache.zest.bootstrap.ModuleAssembly; +import org.apache.zest.spi.entity.EntityState; import org.apache.zest.spi.entitystore.EntityStore; import org.apache.zest.test.AbstractZestTest; import org.junit.After; @@ -501,6 +503,36 @@ public abstract class AbstractEntityStoreTest } } + @Test + public void entityStatesSPI() + { + EntityStore entityStore = serviceFinder.findService( EntityStore.class ).get(); + + try( Stream<EntityState> states = entityStore.entityStates( module ) ) + { + assertThat( states.count(), is( 0L ) ); + } + + UnitOfWork unitOfWork = unitOfWorkFactory.newUnitOfWork(); + TestEntity newInstance = createEntity( unitOfWork ); + unitOfWork.complete(); + + try( Stream<EntityState> states = entityStore.entityStates( module ) ) + { + assertThat( states.count(), is( 1L ) ); + } + + unitOfWork = unitOfWorkFactory.newUnitOfWork(); + TestEntity instance = unitOfWork.get( newInstance ); + unitOfWork.remove( instance ); + unitOfWork.complete(); + + try( Stream<EntityState> states = entityStore.entityStates( module ) ) + { + assertThat( states.count(), is( 0L ) ); + } + } + public interface TestEntity extends EntityComposite { http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java index 1ab1c53..500f313 100644 --- a/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java +++ b/extensions/entitystore-file/src/main/java/org/apache/zest/entitystore/file/FileEntityStoreMixin.java @@ -29,16 +29,13 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.stream.Stream; import org.apache.zest.api.common.Optional; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.Service; import org.apache.zest.api.injection.scope.This; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.library.fileconfig.FileConfiguration; import org.apache.zest.spi.entitystore.BackupRestore; import org.apache.zest.spi.entitystore.EntityAlreadyExistsException; @@ -231,84 +228,43 @@ public class FileEntityStoreMixin } @Override - public Input<String, IOException> backup() + public Stream<Reader> entityStates() { - return new Input<String, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<String, IOException>() - { - @Override - public <ThrowableType extends Throwable> void sendTo( Receiver<? super String, ThrowableType> receiver ) - throws ThrowableType, IOException - { - for( File sliceDirectory : dataDirectory.listFiles() ) - { - for( File file : sliceDirectory.listFiles() ) - { - receiver.receive( fetch( file ) ); - } - } - } - } ); - } - }; + return backup().map( StringReader::new ); } @Override - public Output<String, IOException> restore() + public Stream<String> backup() { - return new Output<String, IOException>() + if( !dataDirectory.exists() ) { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType - { - sender.sendTo( new Receiver<String, IOException>() - { - @Override - public void receive( String item ) - throws IOException - { - String id = item.substring( "{\"reference\":\"".length() ); - id = id.substring( 0, id.indexOf( '"' ) ); - store( getDataFile( id ), item ); - } - } ); - } - }; + return Stream.of(); + } + try + { + return java.nio.file.Files.walk( dataDirectory.toPath(), 3 ) + .skip( 1 ) + .filter( path -> !"slices".equals( path.getFileName().toString() ) ) + .map( Path::toFile ) + .filter( file -> !file.isDirectory() ) + .map( this::uncheckedFetch ); + } + catch( IOException ex ) + { + throw new EntityStoreException( ex ); + } } @Override - public Input<Reader, IOException> entityStates() + public void restore( final Stream<String> stream ) { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType + stream.forEach( + item -> { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ThrowableType> receiver ) - throws ThrowableType, IOException - { - for( File sliceDirectory : dataDirectory.listFiles() ) - { - for( File file : sliceDirectory.listFiles() ) - { - String state = fetch( file ); - receiver.receive( new StringReader( state ) ); - } - } - } - } ); - } - }; + String id = item.substring( "{\"reference\":\"".length() ); + id = id.substring( 0, id.indexOf( '"' ) ); + uncheckedStore( getDataFile( id ), item ); + } ); } private File getDataFile( String identity ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java index fca8a6b..25a58b7 100644 --- a/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java +++ b/extensions/entitystore-geode/src/main/java/org/apache/zest/entitystore/geode/GeodeEntityStoreMixin.java @@ -23,8 +23,8 @@ import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; -import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; @@ -39,10 +39,6 @@ import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; import org.apache.zest.spi.entitystore.helpers.MapEntityStore; @@ -197,27 +193,8 @@ public class GeodeEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver ) - throws RTT, IOException - { - for( Map.Entry<String, String> eachEntry : region.entrySet() ) - { - receiver.receive( new StringReader( eachEntry.getValue() ) ); - } - } - } ); - } - }; + return region.values().stream().map( StringReader::new ); } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java index f588862..34386f2 100644 --- a/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java +++ b/extensions/entitystore-hazelcast/src/main/java/org/apache/zest/entitystore/hazelcast/HazelcastEntityStoreMixin.java @@ -30,16 +30,12 @@ import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; -import java.util.Map; +import java.util.stream.Stream; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; import org.apache.zest.spi.entitystore.helpers.MapEntityStore; @@ -147,28 +143,9 @@ public class HazelcastEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <RTT extends Throwable> void sendTo( Receiver<? super Reader, RTT> receiver ) - throws RTT, IOException - { - for( Map.Entry<String, String> eachEntry : stringMap.entrySet() ) - { - receiver.receive( new StringReader( eachEntry.getValue() ) ); - } - } - } ); - } - }; + return stringMap.values().stream().map( StringReader::new ); } private Config createConfig( HazelcastConfiguration configuration ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java index f8c2ac1..b895177 100644 --- a/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java +++ b/extensions/entitystore-jclouds/src/main/java/org/apache/zest/entitystore/jclouds/JCloudsMapEntityStoreMixin.java @@ -26,7 +26,6 @@ import com.google.common.collect.Maps; import com.google.common.io.ByteSource; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; @@ -36,15 +35,12 @@ import java.util.Map; import java.util.Properties; import java.util.Scanner; import java.util.Set; +import java.util.stream.Stream; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; import org.apache.zest.spi.entitystore.helpers.MapEntityStore; @@ -54,7 +50,6 @@ import org.jclouds.apis.Apis; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.StorageMetadata; import org.jclouds.io.Payload; import org.jclouds.providers.ProviderMetadata; import org.jclouds.providers.Providers; @@ -254,52 +249,26 @@ public class JCloudsMapEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( - new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - for( StorageMetadata stored : storeContext.getBlobStore().list() ) - { - Payload payload = storeContext.getBlobStore().getBlob( container, stored.getName() ).getPayload(); - if( payload == null ) - { - throw new EntityNotFoundException( EntityReference.parseEntityReference( stored.getName() ) ); - } - InputStream input = null; - try - { - input = payload.openStream(); - receiver.receive( new InputStreamReader( input, "UTF-8" ) ); - } - finally - { - if( input != null ) - { - try - { - input.close(); - } - catch( IOException ignored ) - { - } - } - } - } - } - } - ); - } - }; + return storeContext + .getBlobStore().list( container ).stream() + .map( metadata -> + { + Payload payload = storeContext.getBlobStore().getBlob( container, metadata.getName() ).getPayload(); + if( payload == null ) + { + throw new EntityNotFoundException( EntityReference.parseEntityReference( metadata.getName() ) ); + } + try( InputStream input = payload.openStream() ) + { + String state = new Scanner( input, UTF_8.name() ).useDelimiter( "\\Z" ).next(); + return (Reader) new StringReader( state ); + } + catch( IOException ex ) + { + throw new EntityStoreException( ex ); + } + } ); } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java index fe3a9cf..19c2b7d 100644 --- a/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java +++ b/extensions/entitystore-jdbm/src/main/java/org/apache/zest/entitystore/jdbm/JdbmEntityStoreMixin.java @@ -24,9 +24,16 @@ import java.io.IOException; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; +import java.io.UncheckedIOException; import java.io.Writer; import java.util.Properties; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import jdbm.RecordManager; import jdbm.RecordManagerFactory; import jdbm.RecordManagerOptions; @@ -48,10 +55,6 @@ import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.injection.scope.Uses; import org.apache.zest.api.service.ServiceDescriptor; import org.apache.zest.io.Files; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.library.fileconfig.FileConfiguration; import org.apache.zest.library.locking.ReadLock; import org.apache.zest.library.locking.WriteLock; @@ -213,198 +216,166 @@ public class JdbmEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - lock.writeLock().lock(); - - try - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - final TupleBrowser browser = index.browse(); - final Tuple tuple = new Tuple(); - - while( browser.getNext( tuple ) ) - { - Identity id = new StringIdentity( (byte[]) tuple.getKey() ); - - Long stateIndex = getStateIndex( id ); - - if( stateIndex == null ) - { - continue; - } // Skip this one - - byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer ); - - receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) ); - } - } - } ); - } - finally - { - lock.writeLock().unlock(); - } - } - }; + return backup().map( StringReader::new ); } @Override - public Input<String, IOException> backup() + public Stream<String> backup() { - return new Input<String, IOException>() + lock.writeLock().lock(); + TupleBrowser browser; + try { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType + browser = index.browse(); + } + catch( IOException ex ) + { + lock.writeLock().unlock(); + throw new EntityStoreException( ex ); + } + return StreamSupport.stream( + new Spliterators.AbstractSpliterator<String>( Long.MAX_VALUE, Spliterator.ORDERED ) { - lock.readLock().lock(); + private final Tuple tuple = new Tuple(); - try + @Override + public boolean tryAdvance( final Consumer<? super String> action ) { - output.receiveFrom( new Sender<String, IOException>() + try { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException + if( !browser.getNext( tuple ) ) { - final TupleBrowser browser = index.browse(); - final Tuple tuple = new Tuple(); - - while( browser.getNext( tuple ) ) - { - String id = new String( (byte[]) tuple.getKey(), "UTF-8" ); - - Long stateIndex = getStateIndex( new StringIdentity( id ) ); - - if( stateIndex == null ) - { - continue; - } // Skip this one - - byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer ); - - receiver.receive( new String( serializedState, "UTF-8" ) ); - } + return false; } - } ); - } - finally - { - lock.readLock().unlock(); + Identity identity = new StringIdentity( (byte[]) tuple.getKey() ); + Long stateIndex = getStateIndex( identity ); + if( stateIndex == null ) + { + return false; + } + byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer ); + String state = new String( serializedState, "UTF-8" ); + action.accept( state ); + return true; + } + catch( IOException ex ) + { + lock.writeLock().unlock(); + throw new EntityStoreException( ex ); + } } - } - }; + }, + false + ).onClose( () -> lock.writeLock().unlock() ); } @Override - public Output<String, IOException> restore() + public void restore( final Stream<String> states ) { - return new Output<String, IOException>() + File dbFile = new File( getDatabaseName() + ".db" ); + File lgFile = new File( getDatabaseName() + ".lg" ); + + // Create temporary store + File tempDatabase = Files.createTemporayFileOf( dbFile ); + final RecordManager recordManager; + final BTree index; + try + { + recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), + new Properties() ); + ByteArrayComparator comparator = new ByteArrayComparator(); + index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 ); + recordManager.setNamedObject( "index", index.getRecid() ); + recordManager.commit(); + } + catch( IOException ex ) + { + throw new EntityStoreException( ex ); + } + try { - @Override - public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender ) - throws IOException, SenderThrowableType + // TODO NO NEED TO SYNCHRONIZE HERE + AtomicLong counter = new AtomicLong(); + Consumer<String> stateConsumer = state -> { - File dbFile = new File( getDatabaseName() + ".db" ); - File lgFile = new File( getDatabaseName() + ".lg" ); - - // Create temporary store - File tempDatabase = Files.createTemporayFileOf( dbFile ); - - final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() ); - ByteArrayComparator comparator = new ByteArrayComparator(); - final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 ); - recordManager.setNamedObject( "index", index.getRecid() ); - recordManager.commit(); - try { - sender.sendTo( new Receiver<String, IOException>() + // Commit one batch + if( ( counter.incrementAndGet() % 1000 ) == 0 ) { - int counter = 0; - - @Override - public void receive( String item ) - throws IOException - { - // Commit one batch - if( ( counter++ % 1000 ) == 0 ) - { - recordManager.commit(); - } + recordManager.commit(); + } - String id = item.substring( "{\"reference\":\"".length() ); - id = id.substring( 0, id.indexOf( '"' ) ); + String id = state.substring( "{\"reference\":\"".length() ); + id = id.substring( 0, id.indexOf( '"' ) ); - // Insert - byte[] stateArray = item.getBytes( "UTF-8" ); - long stateIndex = recordManager.insert( stateArray, serializer ); - index.insert( id.getBytes( "UTF-8" ), stateIndex, false ); - } - } ); + // Insert + byte[] stateArray = state.getBytes( "UTF-8" ); + long stateIndex = recordManager.insert( stateArray, serializer ); + index.insert( id.getBytes( "UTF-8" ), stateIndex, false ); } - catch( IOException e ) + catch( IOException ex ) { - recordManager.close(); - tempDatabase.delete(); - throw e; + throw new UncheckedIOException( ex ); } - catch( Throwable senderThrowableType ) - { - recordManager.close(); - tempDatabase.delete(); - throw (SenderThrowableType) senderThrowableType; - } - - // Import went ok - continue - recordManager.commit(); - // close file handles otherwise Microsoft Windows will fail to rename database files. + }; + states.forEach( stateConsumer ); + // Import went ok - continue + recordManager.commit(); + // close file handles otherwise Microsoft Windows will fail to rename database files. + recordManager.close(); + } + catch( IOException | UncheckedIOException ex ) + { + try + { recordManager.close(); + } + catch( IOException ignore ) { } + tempDatabase.delete(); + throw new EntityStoreException( ex ); + } + try + { - lock.writeLock().lock(); - try - { - // Replace old database with new - JdbmEntityStoreMixin.this.recordManager.close(); - - boolean deletedOldDatabase = true; - deletedOldDatabase &= dbFile.delete(); - deletedOldDatabase &= lgFile.delete(); - if( !deletedOldDatabase ) - { - throw new IOException( "Could not remove old database" ); - } + lock.writeLock().lock(); + try + { + // Replace old database with new + JdbmEntityStoreMixin.this.recordManager.close(); - boolean renamedTempDatabase = true; - renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile ); - renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile ); + boolean deletedOldDatabase = true; + deletedOldDatabase &= dbFile.delete(); + deletedOldDatabase &= lgFile.delete(); + if( !deletedOldDatabase ) + { + throw new EntityStoreException( "Could not remove old database" ); + } - if( !renamedTempDatabase ) - { - throw new IOException( "Could not replace database with temp database" ); - } + boolean renamedTempDatabase = true; + renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile ); + renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile ); - // Start up again - initialize(); - } - finally + if( !renamedTempDatabase ) { - lock.writeLock().unlock(); + throw new EntityStoreException( "Could not replace database with temp database" ); } + + // Start up again + initialize(); } - }; + finally + { + lock.writeLock().unlock(); + } + } + catch( IOException ex ) + { + tempDatabase.delete(); + throw new EntityStoreException( ex ); + } } private String getDatabaseName() http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java index 638b021..7e485b2 100644 --- a/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java +++ b/extensions/entitystore-leveldb/src/main/java/org/apache/zest/entitystore/leveldb/LevelDBEntityStoreMixin.java @@ -26,6 +26,11 @@ import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; import java.nio.charset.Charset; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; @@ -34,10 +39,6 @@ import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.injection.scope.Uses; import org.apache.zest.api.service.ServiceActivation; import org.apache.zest.api.service.ServiceDescriptor; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.library.fileconfig.FileConfiguration; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; @@ -197,42 +198,38 @@ public class LevelDBEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType + DBIterator iterator = db.iterator(); + iterator.seekToFirst(); + return StreamSupport.stream( + new Spliterators.AbstractSpliterator<Reader>( Long.MAX_VALUE, Spliterator.ORDERED ) { - output.receiveFrom( new Sender<Reader, IOException>() + @Override + public boolean tryAdvance( final Consumer<? super Reader> action ) { - - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException + if( !iterator.hasNext() ) { - DBIterator iterator = db.iterator(); - try - { - for( iterator.seekToFirst(); iterator.hasNext(); iterator.next() ) - { - byte[] state = iterator.peekNext().getValue(); - String jsonState = new String( state, charset ); - receiver.receive( new StringReader( jsonState ) ); - } - } - finally - { - iterator.close(); - } + return false; } - - } ); + action.accept( new StringReader( new String( iterator.next().getValue(), charset ) ) ); + return true; + } + }, + false + ).onClose( + () -> + { + try + { + iterator.close(); + } + catch( IOException ex ) + { + throw new EntityStoreException( "Unable to close DB iterator" ); + } } - - }; + ); } @Override http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java index 9b988f5..ee97171 100644 --- a/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java +++ b/extensions/entitystore-mongodb/src/main/java/org/apache/zest/entitystore/mongodb/MongoMapEntityStoreMixin.java @@ -25,7 +25,6 @@ import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.WriteConcern; -import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; @@ -40,15 +39,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; import org.apache.zest.spi.entitystore.helpers.MapEntityStore; @@ -289,33 +286,16 @@ public class MongoMapEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( - Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( - Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - FindIterable<Document> cursor = db.getCollection( collectionName ).find(); - for( Document eachEntity : cursor ) - { - Document bsonState = (Document) eachEntity.get( STATE_COLUMN ); - String jsonState = JSON.serialize( bsonState ); - receiver.receive( new StringReader( jsonState ) ); - } - } - } ); - } - }; + return StreamSupport + .stream( db.getCollection( collectionName ).find().spliterator(), false ) + .map( eachEntity -> + { + Document bsonState = (Document) eachEntity.get( STATE_COLUMN ); + String jsonState = JSON.serialize( bsonState ); + return new StringReader( jsonState ); + } ); } private Bson byIdentity( EntityReference entityReference ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java index f76f8d3..09918a2 100644 --- a/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java +++ b/extensions/entitystore-preferences/src/main/java/org/apache/zest/entitystore/prefs/PreferencesEntityStoreMixin.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.prefs.BackingStoreException; import java.util.prefs.Preferences; +import java.util.stream.Stream; import org.apache.zest.api.cache.CacheOptions; import org.apache.zest.api.common.QualifiedName; import org.apache.zest.api.entity.EntityDescriptor; @@ -57,10 +58,6 @@ import org.apache.zest.api.usecase.Usecase; import org.apache.zest.api.usecase.UsecaseBuilder; import org.apache.zest.api.value.ValueSerialization; import org.apache.zest.api.value.ValueSerializationException; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.ZestSPI; import org.apache.zest.spi.entity.EntityState; import org.apache.zest.spi.entity.EntityStatus; @@ -175,43 +172,23 @@ public class PreferencesEntityStoreMixin } @Override - public Input<EntityState, EntityStoreException> entityStates( final ModuleDescriptor module ) + public Stream<EntityState> entityStates( final ModuleDescriptor module ) { - return new Input<EntityState, EntityStoreException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super EntityState, ReceiverThrowableType> output ) - throws EntityStoreException, ReceiverThrowableType - { - output.receiveFrom( new Sender<EntityState, EntityStoreException>() - { - @Override - public <RecThrowableType extends Throwable> void sendTo( Receiver<? super EntityState, RecThrowableType> receiver ) - throws RecThrowableType, EntityStoreException - { - UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" ); - Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase(); - final EntityStoreUnitOfWork uow = - newUnitOfWork( module, visitUsecase, SystemTime.now() ); + UsecaseBuilder builder = UsecaseBuilder.buildUsecase( "zest.entitystore.preferences.visit" ); + Usecase visitUsecase = builder.withMetaInfo( CacheOptions.NEVER ).newUsecase(); + EntityStoreUnitOfWork uow = newUnitOfWork( module, visitUsecase, SystemTime.now() ); - try - { - String[] identities = root.childrenNames(); - for( String identity : identities ) - { - EntityReference reference = EntityReference.parseEntityReference( identity ); - EntityState entityState = uow.entityStateOf( module, reference ); - receiver.receive( entityState ); - } - } - catch( BackingStoreException e ) - { - throw new EntityStoreException( e ); - } - } - } ); - } - }; + try + { + return Stream.of( root.childrenNames() ) + .map( EntityReference::parseEntityReference ) + .map( ref -> uow.entityStateOf( module, ref ) ) + .onClose( uow::discard ); + } + catch( BackingStoreException e ) + { + throw new EntityStoreException( e ); + } } @Override http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java index 4113e39..9bc2446 100644 --- a/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java +++ b/extensions/entitystore-redis/src/main/java/org/apache/zest/entitystore/redis/RedisMapEntityStoreMixin.java @@ -24,16 +24,12 @@ import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; -import java.util.Set; +import java.util.stream.Stream; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.entity.EntityDescriptor; import org.apache.zest.api.entity.EntityReference; import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; import org.apache.zest.spi.entitystore.EntityAlreadyExistsException; import org.apache.zest.spi.entitystore.EntityNotFoundException; import org.apache.zest.spi.entitystore.EntityStoreException; @@ -164,38 +160,12 @@ public class RedisMapEntityStoreMixin } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - Jedis jedis = pool.getResource(); - try - { - Set<String> keys = jedis.keys( "*" ); - for( String key : keys ) - { - String jsonState = jedis.get( key ); - receiver.receive( new StringReader( jsonState ) ); - } - } - finally - { - pool.returnResource( jedis ); - } - } - } ); - } - }; + Jedis jedis = pool.getResource(); + return jedis.keys( "*" ).stream() + .map( key -> (Reader) new StringReader( jedis.get( key ) ) ) + .onClose( jedis::close ); } private static boolean notFound( String jsonState ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/ee1d1abc/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java ---------------------------------------------------------------------- diff --git a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java index 0160dfa..fcda8fa 100644 --- a/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java +++ b/extensions/entitystore-riak/src/main/java/org/apache/zest/entitystore/riak/RiakMapEntityStoreMixin.java @@ -28,20 +28,6 @@ import com.basho.riak.client.core.RiakNode; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.util.HostAndPort; -import org.apache.zest.api.common.InvalidApplicationException; -import org.apache.zest.api.configuration.Configuration; -import org.apache.zest.api.entity.EntityDescriptor; -import org.apache.zest.api.entity.EntityReference; -import org.apache.zest.api.injection.scope.This; -import org.apache.zest.api.service.ServiceActivation; -import org.apache.zest.io.Input; -import org.apache.zest.io.Output; -import org.apache.zest.io.Receiver; -import org.apache.zest.io.Sender; -import org.apache.zest.spi.entitystore.EntityNotFoundException; -import org.apache.zest.spi.entitystore.EntityStoreException; -import org.apache.zest.spi.entitystore.helpers.MapEntityStore; - import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -57,6 +43,17 @@ import java.security.Security; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.zest.api.common.InvalidApplicationException; +import org.apache.zest.api.configuration.Configuration; +import org.apache.zest.api.entity.EntityDescriptor; +import org.apache.zest.api.entity.EntityReference; +import org.apache.zest.api.injection.scope.This; +import org.apache.zest.api.service.ServiceActivation; +import org.apache.zest.spi.entitystore.EntityNotFoundException; +import org.apache.zest.spi.entitystore.EntityStoreException; +import org.apache.zest.spi.entitystore.helpers.MapEntityStore; /** * Riak Protobuf implementation of MapEntityStore. @@ -334,40 +331,33 @@ public class RiakMapEntityStoreMixin implements ServiceActivation, MapEntityStor } @Override - public Input<Reader, IOException> entityStates() + public Stream<Reader> entityStates() { - return new Input<Reader, IOException>() + try { - @Override - public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output ) - throws IOException, ReceiverThrowableType - { - output.receiveFrom( new Sender<Reader, IOException>() - { - @Override - public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver ) - throws ReceiverThrowableType, IOException - { - try - { - ListKeys listKeys = new ListKeys.Builder( namespace ).build(); - ListKeys.Response listKeysResponse = riakClient.execute( listKeys ); - for( Location location : listKeysResponse ) - { - FetchValue fetch = new FetchValue.Builder( location ).build(); - FetchValue.Response response = riakClient.execute( fetch ); - String jsonState = response.getValue( String.class ); - receiver.receive( new StringReader( jsonState ) ); - } - } - catch( InterruptedException | ExecutionException ex ) - { - throw new EntityStoreException( "Unable to apply entity changes.", ex ); - } - } - } ); - } - }; + ListKeys listKeys = new ListKeys.Builder( namespace ).build(); + ListKeys.Response listKeysResponse = riakClient.execute( listKeys ); + return StreamSupport + .stream( listKeysResponse.spliterator(), false ) + .map( location -> + { + try + { + FetchValue fetch = new FetchValue.Builder( location ).build(); + FetchValue.Response response = riakClient.execute( fetch ); + String jsonState = response.getValue( String.class ); + return new StringReader( jsonState ); + } + catch( InterruptedException | ExecutionException ex ) + { + throw new EntityStoreException( "Unable to get entity states.", ex ); + } + } ); + } + catch( InterruptedException | ExecutionException ex ) + { + throw new EntityStoreException( "Unable to get entity states.", ex ); + } }
