eventsourcing: added assembly; bad concern fix; application events test
Project: http://git-wip-us.apache.org/repos/asf/zest-qi4j/repo Commit: http://git-wip-us.apache.org/repos/asf/zest-qi4j/commit/ae8b4700 Tree: http://git-wip-us.apache.org/repos/asf/zest-qi4j/tree/ae8b4700 Diff: http://git-wip-us.apache.org/repos/asf/zest-qi4j/diff/ae8b4700 Branch: refs/heads/develop Commit: ae8b4700bec8960def18a45864ad75e3b8ff434f Parents: 2daf2be Author: tbml <[email protected]> Authored: Sun Jul 19 06:38:37 2015 +0200 Committer: tbml <[email protected]> Committed: Sun Jul 19 06:38:37 2015 +0200 ---------------------------------------------------------------------- .../factory/ApplicationEventFactoryService.java | 6 +- .../MemoryApplicationEventStoreService.java | 124 ++++++++++++ .../bootstrap/EventsourcingAssembler.java | 60 ++++++ .../application/ApplicationEventTest.java | 188 +++++++++++++++++++ .../eventsourcing/domain/DomainEventTest.java | 18 +- 5 files changed, 389 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zest-qi4j/blob/ae8b4700/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/factory/ApplicationEventFactoryService.java ---------------------------------------------------------------------- diff --git a/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/factory/ApplicationEventFactoryService.java b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/factory/ApplicationEventFactoryService.java index 675f6ba..e5c4e4f 100644 --- a/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/factory/ApplicationEventFactoryService.java +++ b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/factory/ApplicationEventFactoryService.java @@ -16,7 +16,6 @@ package org.qi4j.library.eventsourcing.application.factory; -import java.util.Date; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONStringer; @@ -33,13 +32,14 @@ import org.qi4j.api.unitofwork.UnitOfWorkFactory; import org.qi4j.api.value.ValueBuilder; import org.qi4j.api.value.ValueBuilderFactory; import org.qi4j.library.eventsourcing.application.api.ApplicationEvent; -import org.qi4j.library.eventsourcing.domain.factory.UnitOfWorkNotificationConcern; import org.qi4j.library.eventsourcing.domain.spi.CurrentUser; +import java.util.Date; + /** * DomainEventValue factory */ -@Concerns(UnitOfWorkNotificationConcern.class) +@Concerns(TransactionNotificationConcern.class) @Mixins(ApplicationEventFactoryService.Mixin.class) public interface ApplicationEventFactoryService extends ApplicationEventFactory, ServiceComposite http://git-wip-us.apache.org/repos/asf/zest-qi4j/blob/ae8b4700/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/source/memory/MemoryApplicationEventStoreService.java ---------------------------------------------------------------------- diff --git a/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/source/memory/MemoryApplicationEventStoreService.java b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/source/memory/MemoryApplicationEventStoreService.java new file mode 100644 index 0000000..df24e7a --- /dev/null +++ b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/application/source/memory/MemoryApplicationEventStoreService.java @@ -0,0 +1,124 @@ +package org.qi4j.library.eventsourcing.application.source.memory; + +import org.qi4j.api.activation.Activators; +import org.qi4j.api.mixin.Mixins; +import org.qi4j.api.service.ServiceComposite; +import org.qi4j.io.Input; +import org.qi4j.io.Output; +import org.qi4j.io.Receiver; +import org.qi4j.io.Sender; +import org.qi4j.library.eventsourcing.application.api.TransactionApplicationEvents; +import org.qi4j.library.eventsourcing.application.source.*; +import org.qi4j.library.eventsourcing.domain.api.UnitOfWorkDomainEventsValue; + +import java.io.IOException; +import java.util.*; + + +/** + * In-Memory ApplicationEventStore. Mainly used for testing. + */ +@Mixins(MemoryApplicationEventStoreService.MemoryStoreMixin.class) +@Activators( ApplicationEventStoreActivation.Activator.class ) +public interface MemoryApplicationEventStoreService + extends ApplicationEventSource, ApplicationEventStore, ApplicationEventStream, ApplicationEventStoreActivation, ServiceComposite { + + abstract class MemoryStoreMixin + extends AbstractApplicationEventStoreMixin + implements ApplicationEventSource, ApplicationEventStoreActivation + { + + // This list holds all transactions + private LinkedList<TransactionApplicationEvents> store = new LinkedList<TransactionApplicationEvents>(); + + @Override + public Input<TransactionApplicationEvents, IOException> transactionsAfter(final long afterTimestamp, final long maxTransactions) { + return new Input<TransactionApplicationEvents, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super TransactionApplicationEvents, ReceiverThrowableType> output ) throws IOException, ReceiverThrowableType + { + // Lock store first + lock.lock(); + try + { + output.receiveFrom( new Sender<TransactionApplicationEvents, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super TransactionApplicationEvents, ReceiverThrowableType> receiver ) throws ReceiverThrowableType, IOException + { + Iterator<TransactionApplicationEvents> iterator = store.iterator(); + + long count = 0; + + while( iterator.hasNext() && count < maxTransactions ) + { + TransactionApplicationEvents next = iterator.next(); + if( next.timestamp().get() > afterTimestamp) { + receiver.receive(next); + count++; + } + } + } + } ); + } finally + { + lock.unlock(); + } + } + }; + + } + + @Override + public Input<TransactionApplicationEvents, IOException> transactionsBefore(final long beforeTimestamp, final long maxTransactions) { + return new Input<TransactionApplicationEvents, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super TransactionApplicationEvents, ReceiverThrowableType> output ) throws IOException, ReceiverThrowableType + { + // Lock store first + lock.lock(); + try + { + output.receiveFrom( new Sender<TransactionApplicationEvents, IOException>() + { + @Override + public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super TransactionApplicationEvents, ReceiverThrowableType> receiver ) throws ReceiverThrowableType, IOException { + + ListIterator<TransactionApplicationEvents> iterator = store.listIterator(); + + while (iterator.hasNext() ){ + TransactionApplicationEvents next = iterator.next(); + if( next.timestamp().get() >= beforeTimestamp) { + break; + } + } + + long count = 0; + + while( iterator.hasPrevious() && count < maxTransactions ) + { + TransactionApplicationEvents next = iterator.previous(); + receiver.receive(next); + count++; + } + } + } ); + } finally + { + lock.unlock(); + } + } + }; + + } + + @Override + protected void storeEvents(TransactionApplicationEvents transactionDomain) throws IOException { + store.add(transactionDomain); + } + + } + +} http://git-wip-us.apache.org/repos/asf/zest-qi4j/blob/ae8b4700/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/bootstrap/EventsourcingAssembler.java ---------------------------------------------------------------------- diff --git a/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/bootstrap/EventsourcingAssembler.java b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/bootstrap/EventsourcingAssembler.java new file mode 100644 index 0000000..4d76f9c --- /dev/null +++ b/libraries/eventsourcing/src/main/java/org/qi4j/library/eventsourcing/bootstrap/EventsourcingAssembler.java @@ -0,0 +1,60 @@ +package org.qi4j.library.eventsourcing.bootstrap; + +import org.qi4j.bootstrap.Assemblers; +import org.qi4j.bootstrap.AssemblyException; +import org.qi4j.bootstrap.ImportedServiceDeclaration; +import org.qi4j.bootstrap.ModuleAssembly; +import org.qi4j.library.eventsourcing.application.api.ApplicationEvent; +import org.qi4j.library.eventsourcing.application.api.TransactionApplicationEvents; +import org.qi4j.library.eventsourcing.application.factory.ApplicationEventFactoryService; +import org.qi4j.library.eventsourcing.domain.api.DomainEventValue; +import org.qi4j.library.eventsourcing.domain.api.UnitOfWorkDomainEventsValue; +import org.qi4j.library.eventsourcing.domain.factory.CurrentUserUoWPrincipal; +import org.qi4j.library.eventsourcing.domain.factory.DomainEventFactoryService; + + +public class EventsourcingAssembler + extends Assemblers.Visibility<EventsourcingAssembler> { + + + private boolean domainEvents; + private boolean applicationEvents; + + private boolean uowPrincipal; + + public EventsourcingAssembler withDomainEvents() { + domainEvents = true; + return this; + } + + public EventsourcingAssembler withApplicationEvents() { + applicationEvents = true; + return this; + } + + public EventsourcingAssembler withCurrentUserFromUOWPrincipal() { + uowPrincipal = true; + return this; + } + + + @Override + public void assemble(ModuleAssembly module) throws AssemblyException { + + if (domainEvents) { + module.values(DomainEventValue.class, UnitOfWorkDomainEventsValue.class); + module.services(DomainEventFactoryService.class).visibleIn(visibility()); + } + + if (applicationEvents) { + module.values(ApplicationEvent.class, TransactionApplicationEvents.class); + module.services(ApplicationEventFactoryService.class).visibleIn(visibility()); + } + + if (uowPrincipal) { + module.importedServices(CurrentUserUoWPrincipal.class).importedBy(ImportedServiceDeclaration.NEW_OBJECT); + module.objects(CurrentUserUoWPrincipal.class); + } + + } +} http://git-wip-us.apache.org/repos/asf/zest-qi4j/blob/ae8b4700/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/application/ApplicationEventTest.java ---------------------------------------------------------------------- diff --git a/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/application/ApplicationEventTest.java b/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/application/ApplicationEventTest.java new file mode 100644 index 0000000..5aa36dc --- /dev/null +++ b/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/application/ApplicationEventTest.java @@ -0,0 +1,188 @@ +package org.qi4j.library.eventsourcing.application; + +import org.junit.Test; +import org.qi4j.api.common.Optional; +import org.qi4j.api.common.UseDefaults; +import org.qi4j.api.composite.TransientComposite; +import org.qi4j.api.entity.EntityBuilder; +import org.qi4j.api.entity.EntityComposite; +import org.qi4j.api.injection.scope.Service; +import org.qi4j.api.injection.scope.Structure; +import org.qi4j.api.mixin.Mixins; +import org.qi4j.api.property.Property; +import org.qi4j.api.unitofwork.UnitOfWork; +import org.qi4j.api.unitofwork.UnitOfWorkCompletionException; +import org.qi4j.api.unitofwork.UnitOfWorkFactory; +import org.qi4j.api.usecase.UsecaseBuilder; +import org.qi4j.bootstrap.AssemblyException; +import org.qi4j.bootstrap.ModuleAssembly; +import org.qi4j.io.Output; +import org.qi4j.io.Receiver; +import org.qi4j.io.Sender; +import org.qi4j.library.eventsourcing.application.api.ApplicationEvent; +import org.qi4j.library.eventsourcing.application.api.TransactionApplicationEvents; +import org.qi4j.library.eventsourcing.application.factory.ApplicationEventCreationConcern; +import org.qi4j.library.eventsourcing.application.source.ApplicationEventSource; +import org.qi4j.library.eventsourcing.application.source.helper.ApplicationEventParameters; +import org.qi4j.library.eventsourcing.application.source.memory.MemoryApplicationEventStoreService; +import org.qi4j.library.eventsourcing.bootstrap.EventsourcingAssembler; +import org.qi4j.test.AbstractQi4jTest; +import org.qi4j.test.EntityTestAssembler; + +import java.io.IOException; +import java.security.Principal; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * User signup usecase with optional mailing list subscription. + * Subscription is not stored in domain model but is available via application events feed. + */ +public class ApplicationEventTest extends AbstractQi4jTest { + + @Service + ApplicationEventSource eventSource; + + + @Override + public void assemble(ModuleAssembly module) throws AssemblyException { + + // START SNIPPET: assemblyAE + new EventsourcingAssembler() + .withApplicationEvents() + .withCurrentUserFromUOWPrincipal() + .assemble(module); + // END SNIPPET: assemblyAE + + // START SNIPPET: storeAE + module.services(MemoryApplicationEventStoreService.class); + // END SNIPPET: storeAE + + new EntityTestAssembler().assemble(module); + + // START SNIPPET: concernAE + module.transients(Users.class).withConcerns(ApplicationEventCreationConcern.class); + // END SNIPPET: concernAE + + module.entities(UserEntity.class); + + } + + + @Test + public void testApplicationEvent() throws UnitOfWorkCompletionException, IOException { + Users users = module.newTransient(Users.class); + + Principal administratorPrincipal = new Principal() { + public String getName() { + return "administrator"; + } + }; + + UnitOfWork uow1 = module.newUnitOfWork(UsecaseBuilder.newUsecase("User signup")); + uow1.setMetaInfo(administratorPrincipal); + users.signup(null, "user1", Arrays.asList("news1", "news2")); + uow1.complete(); + + UnitOfWork uow2 = module.newUnitOfWork(); + uow2.setMetaInfo(administratorPrincipal); + users.signup(null, "user2", Collections.EMPTY_LIST); + uow2.complete(); + + UnitOfWork uow3 = module.newUnitOfWork(); + uow3.setMetaInfo(administratorPrincipal); + users.signup(null, "user3", Collections.singletonList("news1")); + uow3.complete(); + + + // receive events from uow2 and later forwards + EventsInbox afterInbox = new EventsInbox(); + eventSource.transactionsAfter(uow2.currentTime() - 1, Integer.MAX_VALUE).transferTo(afterInbox); + + assertEquals(2, afterInbox.getEvents().size()); + + ApplicationEvent signupEvent2 = afterInbox.getEvents().get(0).events().get().get(0); + + assertEquals("signup", signupEvent2.name().get()); + assertEquals("user2", ApplicationEventParameters.getParameter(signupEvent2, "param1")); + assertEquals("[]", ApplicationEventParameters.getParameter(signupEvent2, "param2")); + + // receive events from uow2 backwards + EventsInbox beforeInbox = new EventsInbox(); + eventSource.transactionsBefore(uow3.currentTime(), Integer.MAX_VALUE).transferTo(beforeInbox); + + assertEquals(2, beforeInbox.getEvents().size()); + + ApplicationEvent signupEvent1 = beforeInbox.getEvents().get(1).events().get().get(0); + + assertEquals("signup", signupEvent1.name().get()); + assertEquals("user1", ApplicationEventParameters.getParameter(signupEvent1, "param1")); + assertEquals("[\"news1\",\"news2\"]", ApplicationEventParameters.getParameter(signupEvent1, "param2")); + + + } + + static class EventsInbox implements Output<TransactionApplicationEvents, RuntimeException> { + + private List<TransactionApplicationEvents> events = new LinkedList<>(); + + @Override + public <SenderThrowableType extends Throwable> void receiveFrom(Sender<? extends TransactionApplicationEvents, SenderThrowableType> sender) throws RuntimeException, SenderThrowableType { + try { + sender.sendTo(new Receiver<TransactionApplicationEvents, Throwable>() { + @Override + public void receive(TransactionApplicationEvents item) throws Throwable { + events.add(item); + } + }); + + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + + } + + public List<TransactionApplicationEvents> getEvents() { + return events; + } + } + + @Mixins(Users.Mixin.class) + public interface Users extends TransientComposite { + + // START SNIPPET: methodAE + void signup(@Optional ApplicationEvent evt, String username, List<String> mailinglists); + // END SNIPPET: methodAR + + abstract class Mixin implements Users { + + @Structure + UnitOfWorkFactory uowFactory; + + @Override + public void signup(ApplicationEvent evt, String username, List<String> mailinglists) { + if (evt == null) { + UnitOfWork uow = uowFactory.currentUnitOfWork(); + + EntityBuilder<UserEntity> builder = uow.newEntityBuilder(UserEntity.class); + builder.instance().username().set(username); + builder.newInstance(); + + } + } + } + } + + public interface UserEntity + extends EntityComposite { + + @UseDefaults + Property<String> username(); + + } + +} http://git-wip-us.apache.org/repos/asf/zest-qi4j/blob/ae8b4700/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/domain/DomainEventTest.java ---------------------------------------------------------------------- diff --git a/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/domain/DomainEventTest.java b/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/domain/DomainEventTest.java index 7a84999..9d6cdc1 100644 --- a/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/domain/DomainEventTest.java +++ b/libraries/eventsourcing/src/test/java/org/qi4j/library/eventsourcing/domain/DomainEventTest.java @@ -28,6 +28,7 @@ import org.qi4j.bootstrap.ModuleAssembly; import org.qi4j.functional.Function; import org.qi4j.io.Outputs; import org.qi4j.io.Transforms; +import org.qi4j.library.eventsourcing.bootstrap.EventsourcingAssembler; import org.qi4j.library.eventsourcing.domain.api.DomainEvent; import org.qi4j.library.eventsourcing.domain.api.DomainEventValue; import org.qi4j.library.eventsourcing.domain.api.UnitOfWorkDomainEventsValue; @@ -52,13 +53,20 @@ public class DomainEventTest { new EntityTestAssembler( ).assemble( module ); - module.values( DomainEventValue.class, UnitOfWorkDomainEventsValue.class ); + // START SNIPPET: assemblyDE + new EventsourcingAssembler() + .withDomainEvents() + .withCurrentUserFromUOWPrincipal() + .assemble(module); + // START SNIPPET: assemblyDE + + // START SNIPPET: storeDE module.services( MemoryEventStoreService.class ); - module.services( DomainEventFactoryService.class ); - module.importedServices( CurrentUserUoWPrincipal.class ).importedBy( ImportedServiceDeclaration.NEW_OBJECT ); - module.objects( CurrentUserUoWPrincipal.class ); + // START SNIPPET: storeDE + // START SNIPPET: concernDE module.entities( TestEntity.class ).withConcerns(DomainEventCreationConcern.class); + // END SNIPPET: concernDE } @Test @@ -93,6 +101,7 @@ public class DomainEventTest }, Outputs.systemOut() )); } + // START SNIPPET: methodDE @Mixins( TestEntity.Mixin.class ) public interface TestEntity extends EntityComposite @@ -112,4 +121,5 @@ public class DomainEventTest } } } + // END SNIPPET: methodDE }
