[
https://issues.apache.org/activemq/browse/CAMEL-125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40027
]
Martin Krasser commented on CAMEL-125:
--------------------------------------
... and thanks for applying the patches :)
> Support for a stream-processing resequencer.
> --------------------------------------------
>
> Key: CAMEL-125
> URL: https://issues.apache.org/activemq/browse/CAMEL-125
> Project: Apache Camel
> Issue Type: Improvement
> Components: camel-core, camel-spring
> Affects Versions: 1.1.0
> Environment: Java 6, Windows XP
> Reporter: Martin Krasser
> Fix For: 1.2.0
>
> Attachments: camel-core-patch.txt, camel-spring-patch.txt
>
>
> Attached is a patch that adds a stream-processing resequencer to Camel. The
> resequencing algorithm is based on the detection of gaps in a message stream
> rather than on a fixed batch size. Gap detection in combination with timeouts
> removes the constraint of having to know the number of messages of a sequence
> in advance (although a capacity parameter prevents the resequencer from
> running out of memory)
> Route builder examples for the stream-processing resequencer:
> {{from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")}}
> is equivalent to:
> {{from("direct:start").resequencer(header("seqnum")).stream(StreamResequencerConfig.getDefault()).to("mock:result")}}
> Custom values for the resequencer's capacity and timeout can be set like in
> this example:
> {{from("direct:start").resequencer(header("seqnum")).stream(new
> StreamResequencerConfig(300, 4000L)).to("mock:result")}}
> The XML configuration looks like:
> {code:xml}
> <camelContext id="camel"
> xmlns="http://activemq.apache.org/camel/schema/spring">
> <route>
> <from uri="direct:start"/>
> <resequencer>
> <simple>in.header.seqnum</simple>
> <to uri="mock:result" />
> <stream-config capacity="300", timeout="4000"/>
> </resequencer>
> </route>
> </camelContext>
> {code}
> The existing batch-processing resequencer can be defined as usual:
> {{from("direct:start").resequencer(header("seqnum")).to("mock:result")}}
> which is now equivalent to
> {{from("direct:start").resequencer(header("seqnum")).batch().to("mock:result")}}
> It is now also possible to define a custom configuration for the existing
> batch-processing resequencer:
> {{from("direct:start").resequencer(header("seqnum")).batch(new
> BatchResequencerConfig(300, 4000L)).to("mock:result")}}
> This set the batchSize to 300 and the batchTimeout to 4000 ms.
> For the stream-processing resequencer to work, messages must contain a
> sequence number for which a predecessor and a successor is known. For example
> a message with the sequence number 3 has a predecessor message with the
> sequence number 2 and a successor message with the sequence number 4. The
> message sequence 2,3,5 has a gap because the sucessor of 3 is missing. The
> resequencer therefore has to retain message 5 until message 4 arrives (or a
> timeout occurs).
> Gap detection is done with strategies that implement the
> SequenceNumberComparator<E> interface. In addition to the
> java.util.Comparator<E>.compare(E, E) operation the
> SequenceNumberComparator<E> interface defines the predecessor(E, E) and
> successor(E, E) operations. The stream resequencer can be configured with
> cutstom SequenceNumberComparator<E> strategies.
> The stream-processing resequencer uses the same algorithm as the one in
> ServiceMix-3.2-SNAPSHOT (servicemix-eip). In order to avoid compile-time
> dependencies to ServiceMix I've copied the ServiceMix-independent
> resequencing engine over to Camel. This redundancy should be removed once
> Camel and servicemix-eip are going to be combined (are they?). I can
> contribute to this task, if needed.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.