Hi Azeez,
Please find the sample code below.
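import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Package name assumed from the carbon-messaging module; adjust to the version in use.
import org.wso2.carbon.messaging.CarbonMessage;
import org.wso2.carbon.messaging.DefaultCarbonMessage;

public class Test {

    public static void main(String[] args) {
        Thread t = new Thread(new ContentFiller());
        t.start();
    }

    /** Producer: adds 1000 chunks to the message, then marks the end of the message. */
    static class ContentFiller implements Runnable {
        @Override
        public void run() {
            CarbonMessage carbonMessage = new DefaultCarbonMessage();
            byte[] array = "This is a test".getBytes(StandardCharsets.UTF_8);
            // Start the reader first so that it consumes while content is still being added.
            Thread t = new Thread(new ContentReader(carbonMessage));
            t.start();
            for (int i = 0; i < 1000; i++) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(array.length);
                byteBuffer.put(array);
                // Reset the position so that consumers reading via get() see the content.
                byteBuffer.flip();
                System.out.println("Adding content");
                carbonMessage.addMessageBody(byteBuffer);
            }
            carbonMessage.setEndOfMsgAdded(true);
        }
    }

    /** Consumer: reads chunks until the queue is drained and the end of the message is marked. */
    static class ContentReader implements Runnable {
        private final CarbonMessage carbonMessage;

        ContentReader(CarbonMessage carbonMessage) {
            this.carbonMessage = carbonMessage;
        }

        @Override
        public void run() {
            while (true) {
                // Checking isEndOfMsgAdded() alone is not enough; the queue must be empty too.
                if (carbonMessage.isEmpty() && carbonMessage.isEndOfMsgAdded()) {
                    break;
                }
                ByteBuffer byteBuffer = carbonMessage.getMessageBody();
                System.out.println("Content Reading "
                        + new String(byteBuffer.array(), StandardCharsets.UTF_8));
            }
        }
    }
}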
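
One caveat about the reader loop above: if getMessageBody() blocks on an empty queue (an assumption here, based on the body being a blocking queue), the reader can pass the isEmpty()/isEndOfMsgAdded() check just before the producer sets the end flag and then wait for a chunk that never arrives. The following is only a rough sketch of one way around that, written against plain JDK classes rather than CarbonMessage. ChunkedBody and its methods are hypothetical stand-ins, not part of the transport API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class DrainSketch {

    /** Hypothetical stand-in for a message body backed by a blocking queue. */
    static final class ChunkedBody {
        private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
        private volatile boolean endOfMsgAdded;

        void add(ByteBuffer chunk) {
            queue.add(chunk);
        }

        void setEndOfMsgAdded() {
            endOfMsgAdded = true;
        }

        /** Returns the next chunk, or null once the producer is done and the queue is drained. */
        ByteBuffer next() throws InterruptedException {
            while (true) {
                // Read the flag BEFORE polling: if it was already set and the poll
                // still finds nothing, no further chunk can arrive.
                boolean ended = endOfMsgAdded;
                ByteBuffer chunk = queue.poll(50, TimeUnit.MILLISECONDS);
                if (chunk != null) {
                    return chunk;
                }
                if (ended) {
                    return null;
                }
            }
        }
    }

    /** Adds the given number of chunks on a producer thread and counts what the caller reads. */
    static int run(int chunks) {
        ChunkedBody body = new ChunkedBody();
        byte[] payload = "This is a test".getBytes(StandardCharsets.UTF_8);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < chunks; i++) {
                body.add(ByteBuffer.wrap(payload));
            }
            body.setEndOfMsgAdded();
        });
        producer.start();
        int read = 0;
        try {
            while (body.next() != null) {
                read++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
        return read;
    }

    public static void main(String[] args) {
        System.out.println("Chunks read: " + run(1000));
    }
}
```

The timed poll means the reader never waits indefinitely on a queue that will stay empty. An explicit end-of-stream marker chunk placed on the queue would be another option.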
On Thu, Jun 2, 2016 at 9:55 PM, Afkham Azeez <[email protected]> wrote:
> This CarbonMessage API is not intuitive. Please share code samples that
> demonstrate streaming input handling with CarbonMessage (request).
>
> On Thu, Jun 2, 2016 at 9:52 PM, Samiyuru Senarathne <[email protected]>
> wrote:
>
>> I assume isEndOfMsgAdded only tells us that all the chunks have been added
>> to the blocking queue, so we still have to drain the blocking queue
>> completely.
>>
>> On Thu, Jun 2, 2016 at 9:50 PM, Afkham Azeez <[email protected]> wrote:
>>
>>> while (!carbonMsg.isEndOfMsgAdded()) {
>>>     ByteBuffer chunk = carbonMsg.getMessageBody();
>>>
>>> }
>>>
>>> In the above segment, carbonMsg is the request CarbonMessage and the
>>> above code segment doesn't work. Code inside the while loop never executes.
>>>
>>>
>>> On Thu, Jun 2, 2016 at 9:47 PM, Isuru Ranawaka <[email protected]> wrote:
>>>
>>>> Hi Azeez,
>>>> Yes, that should work. However, content needs to be added to carbonMsg
>>>> from one thread and read from another thread.
>>>>
>>>> thanks
>>>>
>>>> On Thu, Jun 2, 2016 at 9:23 PM, Afkham Azeez <[email protected]> wrote:
>>>>
>>>>> Is the following code segment the way to read all the chunks?
>>>>>
>>>>> while (!carbonMsg.isEndOfMsgAdded()) {
>>>>>     ByteBuffer chunk = carbonMsg.getMessageBody();
>>>>>
>>>>> }
>>>>>
>>>>> On Thu, Jun 2, 2016 at 9:21 PM, Afkham Azeez <[email protected]> wrote:
>>>>>
>>>>>> Looks like we require changes for input streaming as well.
>>>>>>
>>>>>> Does CarbonMessage.getMessageBody() block until the next chunk is
>>>>>> received?
>>>>>>
>>>>>> On Thu, Jun 2, 2016 at 9:18 PM, Afkham Azeez <[email protected]> wrote:
>>>>>>
>>>>>>> Samiyuru/Isuru,
>>>>>>> Does this mean that the input streaming in MSF4J is currently
>>>>>>> working without any issue and only output streaming requires some work?
>>>>>>>
>>>>>>> On Tue, May 3, 2016 at 2:29 PM, Isuru Ranawaka <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Following are the details of streaming support in the carbon
>>>>>>>> transport. We can look at streaming support in the request path and
>>>>>>>> the response path separately.
>>>>>>>>
>>>>>>>> Request Path (Streaming is working)
>>>>>>>>
>>>>>>>> [image: requestpath.png]
>>>>>>>>
>>>>>>>> According to the above diagram:
>>>>>>>>
>>>>>>>> - The carbon message holds a blocking queue that keeps the content.
>>>>>>>>   When headers are received on a Netty worker thread, it creates a
>>>>>>>>   CarbonMessage with a blocking queue and publishes the carbon
>>>>>>>>   message to an engine level thread.
>>>>>>>> - A reference to the blocking queue is cached in the connection, and
>>>>>>>>   when content is received the Netty worker thread adds it to that
>>>>>>>>   queue.
>>>>>>>> - Meanwhile, engine level threads can consume content from the queue
>>>>>>>>   (while the IO worker is filling it) and can write it directly to
>>>>>>>>   the file system or use a sender to send messages to an external
>>>>>>>>   service.
>>>>>>>> - Accordingly, streaming should work in the request path in
>>>>>>>>   Integration Server or MSF4J without any problem.
>>>>>>>>
>>>>>>>>
>>>>>>>> Response Path (Streaming working scenario)
>>>>>>>>
>>>>>>>> [image: responsepathworking.png]
>>>>>>>>
>>>>>>>>
>>>>>>>> In the response path, the basic difference is that we handle
>>>>>>>> responses through callbacks.
>>>>>>>>
>>>>>>>> - Similar to the request path architecture, the sender side IO
>>>>>>>>   thread creates a CarbonMessage (CM) when response headers are
>>>>>>>>   received and publishes it to an engine level thread.
>>>>>>>> - The engine level thread calls carbonCallback.done() and waits on
>>>>>>>>   the queue for content.
>>>>>>>> - Meanwhile, the IO thread writes the content to the queue.
>>>>>>>> - Writing to the queue and reading from it therefore happen in
>>>>>>>>   parallel, and streaming should work properly.
>>>>>>>> - So streaming works fine with Integration Server in this kind of
>>>>>>>>   scenario.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Response Path (Streaming not working scenario)
>>>>>>>>
>>>>>>>> [image: responsepathnotworking.png]
>>>>>>>>
>>>>>>>> This scenario arises when, for example, we have an Echo mediator, or
>>>>>>>> MSF4J has a service that reads a file in chunks and writes it back
>>>>>>>> to the client.
>>>>>>>>
>>>>>>>> - The basic difference from the previous scenario is that reading a
>>>>>>>>   file chunk or stream and writing it to the queue happens within
>>>>>>>>   the same engine level thread that also reads from the queue and
>>>>>>>>   writes to the listener side Netty worker threads. (The same thread
>>>>>>>>   producing and consuming causes a deadlock.)
>>>>>>>> - In the previous example, reading from the stream and filling the
>>>>>>>>   queue was done by the sender side IO thread, while consuming the
>>>>>>>>   queue and writing to the listener side IO thread was done by the
>>>>>>>>   engine level thread. (Two different threads produce and consume.)
>>>>>>>> - Streaming does not work in this kind of scenario because we cannot
>>>>>>>>   write to the queue and keep polling it within a single thread.
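
To make the single-thread problem concrete, here is a minimal sketch using a plain JDK ArrayBlockingQueue. The bounded capacity, the chunk size, and all names are assumptions for illustration and do not reflect the transport's actual queue. With only one thread, nobody drains the queue while it is being filled, so once the queue is full the producer cannot make progress and a blocking put() would never return. The sketch uses offer() with a timeout so that it terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class SingleThreadStallSketch {

    /**
     * Tries to enqueue the given number of chunks from the calling thread only.
     * Returns how many chunks were accepted before the producer stalled.
     */
    static int fillFromSingleThread(int capacity, int chunks) {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < chunks; i++) {
                // A plain put() would block forever once the queue is full,
                // because the only thread that could consume is this one.
                if (!queue.offer(new byte[16], 100, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("Accepted before stalling: " + fillFromSingleThread(2, 5));
    }
}
```

With an unbounded queue the thread would not stall, but it would have to buffer the whole content before it could start consuming, so nothing would actually be streamed.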
>>>>>>>>
>>>>>>>>
>>>>>>>> We can consider the following solutions and decide which is the most
>>>>>>>> suitable one.
>>>>>>>>
>>>>>>>> - Introduce another thread to run the callback logic instead of
>>>>>>>>   running it on the calling thread. This will solve the above
>>>>>>>>   streaming problem, but for the general message flow it adds
>>>>>>>>   another level of threads that is not actually needed.
>>>>>>>> - Introduce another method in ResponseCallback that supports
>>>>>>>>   streaming without queuing the content. It will directly call the
>>>>>>>>   IO threads and write content to IO as chunks are received. This
>>>>>>>>   can be used only in the single-thread scenario (file reading and
>>>>>>>>   writing).
>>>>>>>> - Introduce another thread to read the file stream, or invoke the
>>>>>>>>   callback from a thread other than the reading thread, at the MSF4J
>>>>>>>>   level. It will then be equivalent to how we use it in Integration
>>>>>>>>   Server with the Sender.
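
As a rough illustration of the second option above (streaming without a queue), the sketch below pushes each chunk straight to a callback on the reading thread. StreamingCallback, onChunk() and onComplete() are hypothetical names invented for this example; they are not existing methods of ResponseCallback, and a ByteArrayOutputStream stands in for the client connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class DirectWriteSketch {

    /** Hypothetical streaming callback: each chunk is written out as soon as it is handed over. */
    interface StreamingCallback {
        void onChunk(ByteBuffer chunk);

        void onComplete();
    }

    /** Reads the source in chunks and pushes each one to the callback on the calling thread. */
    static int stream(InputStream source, int chunkSize, StreamingCallback callback)
            throws IOException {
        byte[] buffer = new byte[chunkSize];
        int chunks = 0;
        int n;
        while ((n = source.read(buffer)) != -1) {
            // Copy the bytes so the callback may keep the chunk after the buffer is reused.
            callback.onChunk(ByteBuffer.wrap(Arrays.copyOf(buffer, n)));
            chunks++;
        }
        callback.onComplete();
        return chunks;
    }

    /** Echoes the text through the callback; no queue and no second thread are involved. */
    static String echo(String text, int chunkSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the client
        StreamingCallback callback = new StreamingCallback() {
            @Override
            public void onChunk(ByteBuffer chunk) {
                sink.write(chunk.array(), chunk.position(), chunk.remaining());
            }

            @Override
            public void onComplete() {
                // Nothing to flush for an in-memory sink.
            }
        };
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            stream(new ByteArrayInputStream(bytes), chunkSize, callback);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(echo("This is a test", 4));
    }
}
```

Because the producer hands each chunk directly to the writer, the same thread can read and write without waiting on a queue, which is why this option suits the file reading and writing case.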
>>>>>>>>
>>>>>>>> Thank you
>>>>>>>>
>>>>>>>> IsuruR
>>>>>>>>
>>>>>>>> On Tue, May 3, 2016 at 12:09 PM, Kasun Indrasiri <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Had a chat with Ranawaka on this. It seems we have a couple of
>>>>>>>>> ways to handle this. He will share the details.
>>>>>>>>>
>>>>>>>>> On Mon, May 2, 2016 at 6:13 AM, Afkham Azeez <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The problem is, we can't do the next MSF4J release because we
>>>>>>>>>> can't lose a feature in a release. This is a blocker for us. We
>>>>>>>>>> have plans to release in the next 2 weeks.
>>>>>>>>>>
>>>>>>>>>> On Mon, May 2, 2016 at 4:56 PM, Isuru Ranawaka <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Azeez,
>>>>>>>>>>>
>>>>>>>>>>> Currently streaming works only if both the sender side and the
>>>>>>>>>>> listener side are used. Since MSF4J uses only the listener side,
>>>>>>>>>>> it will not work unless we spawn a separate thread at the engine
>>>>>>>>>>> level for writing the response, because request reading and
>>>>>>>>>>> response writing happen on the same thread. We will fix that in
>>>>>>>>>>> the next release. We are currently finalizing the removal of the
>>>>>>>>>>> engine level thread model from the transport.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>> IsuruR
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 2, 2016 at 3:50 PM, Afkham Azeez <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is this working after moving to the new transport framework?
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Afkham Azeez
>>>>>>>>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com
>>>>>>>>>>>> Member; Apache Software Foundation; http://www.apache.org/
>>>>>>>>>>>> email: [email protected]
>>>>>>>>>>>> cell: +94 77 3320919
>>>>>>>>>>>> blog: http://blog.afkham.org
>>>>>>>>>>>> twitter: http://twitter.com/afkham_azeez
>>>>>>>>>>>> linked-in: http://lk.linkedin.com/in/afkhamazeez
>>>>>>>>>>>>
>>>>>>>>>>>> Lean . Enterprise . Middleware
>>>>>>>>>>>>
>>>>>>>>>>>> _______________________________________________
>>>>>>>>>>>> Dev mailing list
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>> Isuru Ranawaka
>>>>>>>>>>> M: +94714629880
>>>>>>>>>>> Blog : http://isurur.blogspot.com/
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kasun Indrasiri
>>>>>>>>> Software Architect
>>>>>>>>> WSO2, Inc.; http://wso2.com
>>>>>>>>> lean.enterprise.middleware
>>>>>>>>>
>>>>>>>>> cell: +94 77 556 5206
>>>>>>>>> Blog : http://kasunpanorama.blogspot.com/
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Samiyuru Senarathne
>> *Software Engineer*
>> Mobile : +94 (0) 71 134 6087
>> [email protected]
>>
>
>
>
>
--
Best Regards
Isuru Ranawaka
M: +94714629880
Blog : http://isurur.blogspot.com/