You're right the compound operations cannot be guaranted to be thread-safe
but my idea was that we did not need it.
First get was used to prevent allocation on each write and as far as i know :
time(unsynced get on ConcurrentHashMap) <<< time(allocating possibly large
IoBuffer)
Last get should have been avoided using the code you pointed though i think we
must add
a newValue.free() in the else before returning.
IoBuffer oldValue = buffersMap.get(session);
if (oldValue == null) {
IoBuffer newValue = IoBuffer.allocate(bufferSize);
oldValue = buffersMap.putIfAbsent(session, newValue);
if (oldValue == null) { // means we put a new value
oldValue = newValue;
} else { // means we didn't put the new value but got the old value
newValue.free();
}
}
Yet, if large buffers and many writers threads are used, it will be costly in
terms of
memory used and time to allocate/deallocate unnecessary buffers.
Why not using the Memoizer pattern ?
/**
* Memoizer
* <p/>
* Final implementation of Memoizer
*
* @author Brian Goetz and Tim Peierls
*/
public class Memoizer <A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
Using this pattern will prevent allocating unnecessary IoBuffer's and only uses
two calls to the map in the worst case.
This may seem overkill but could be worth it
WDYT ?
Cordialement, Regards,
-Edouard De Oliveira-
http://tedorg.free.fr/en/main.php
----- Message d'origine ----
De : Sangjin Lee <[EMAIL PROTECTED]>
À : [email protected]
Envoyé le : Mercredi, 23 Juillet 2008, 2h25mn 14s
Objet : Re: svn commit: r678238 - in
/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
BufferedWriteFilter.java
I'm not sure if BufferedWriteFilter.write() would be thread safe. It does
three calls: get(), putIfAbsent(), and get(). Although these calls are
thread safe individually, the compound operations are not necessarily thread
safe.
As far as I know, the pattern that always works is as follows:
IoBuffer newValue = IoBuffer.allocate(bufferSize);
IoBuffer oldValue = buffersMap.putIfAbsent(session, newValue);
if (oldValue == null) { // means we put a new value
return newValue;
} else { // means we didn't put the new value but got the old value
return oldValue;
}
But the obvious drawback of this is that you always have to allocate the
buffer just to try putting it into the map. If we are not willing to incur
that penalty, then we'll still need to do get() calls which may bring back
the synchronization... Thoughts?
Thanks,
Sangjin
On Tue, Jul 22, 2008 at 3:19 PM, Edouard De Oliveira <[EMAIL PROTECTED]>
wrote:
> So we agree on ConcurrentHashMap use : all of the conditions you wrote
> should be met
> when correctly using this filter. The uneasy parameter is MAX_THREADS. I
> think there's
> no magical value so as an API service provider we should provide default
> implementation and let
> the user provide its own params for the ConcurrentHashMap instantiation or
> even its own
> instance.
>
> I think adding the following constructor should close the debate :
> public BufferedWriteFilter(int bufferSize, final ConcurrentMap<IoSession,
> IoBuffer> buffersMap);
>
>
> WDYT ?
>
> Cordialement, Regards,
> -Edouard De Oliveira-
> http://tedorg.free.fr/en/main.php
>
>
>
> ----- Message d'origine ----
> De : Bogdan Ciprian Pistol <[EMAIL PROTECTED]>
> À : [email protected]
> Envoyé le : Mardi, 22 Juillet 2008, 12h41mn 46s
> Objet : Re: Re : svn commit: r678238 - in
> /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> BufferedWriteFilter.java
>
> From my experience ConcurrentHashMap outperforms
> Collections.synchronizedMap(...) only if:
> - you know in advance how many maximum threads (MAX_THREADS) could
> simultaneously modify the ConcurrentHashMap
> - MAX_THREADS is small compared to the number of threads that are
> accessing the ConcurrentHashMap (reads)
> - the accesses (reads) vastly outnumbers the modifications (writes)
> ConcurrentHashMap also consumes more memory (increases proportional
> with MAX_THREADS)
>
> Bogdan
>
> On Tue, Jul 22, 2008 at 2:24 AM, Edouard De Oliveira
> <[EMAIL PROTECTED]> wrote:
> > Perfectly legitimate question Mark :
> >
> > Stop me if i'm not correct, but i think it would be an error using
> CopyOnWriteMap in this usecase
> > as the map grows when the number of sessions grows and as buffering takes
> full meaning in long
> > lived sessions, then each new session would duplicate a potentially
> larger map hence
> > hitting the bad side of CopyOnWriteMap.
> >
> >
> > So ConcurrentHashMap should be more appropriate.
> >
> > WDYT ?
> >
> > ps: i'm commiting right now the proposed fix
> > and a test class on trunk to ease any further review
> >
> > Cordialement, Regards,
> > -Edouard De Oliveira-
> > http://tedorg.free.fr/en/main.php
> >
> >
> >
> > ----- Message d'origine ----
> > De : Mark Webb <[EMAIL PROTECTED]>
> > À : [email protected]
> > Envoyé le : Lundi, 21 Juillet 2008, 2h47mn 54s
> > Objet : Re: svn commit: r678238 - in
> /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> BufferedWriteFilter.java
> >
> > I have a question on this class.
> >
> > Why not use ConcurrentHashMap for this instead of synchronizing? Or
> > CopyOnWriteMap?
> >
> >
> >
> >
> > On Sat, Jul 19, 2008 at 7:19 PM, <[EMAIL PROTECTED]> wrote:
> >
> >> Author: edeoliveira
> >> Date: Sat Jul 19 16:19:14 2008
> >> New Revision: 678238
> >>
> >> URL: http://svn.apache.org/viewvc?rev=678238&view=rev
> >> Log:
> >> New IoFilter that implements DIRMINA-519
> >>
> >> Added:
> >> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/
> >>
> >>
>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >> (with props)
> >>
> >> Added:
> >>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >> URL:
> >>
> http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto
> >>
> >>
> ==============================================================================
> >> ---
> >>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >> (added)
> >> +++
> >>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >> Sat Jul 19 16:19:14 2008
> >> @@ -0,0 +1,243 @@
> >> +/*
> >> + * Licensed to the Apache Software Foundation (ASF) under one
> >> + * or more contributor license agreements. See the NOTICE file
> >> + * distributed with this work for additional information
> >> + * regarding copyright ownership. The ASF licenses this file
> >> + * to you under the Apache License, Version 2.0 (the
> >> + * "License"); you may not use this file except in compliance
> >> + * with the License. You may obtain a copy of the License at
> >> + *
> >> + * http://www.apache.org/licenses/LICENSE-2.0
> >> + *
> >> + * Unless required by applicable law or agreed to in writing,
> >> + * software distributed under the License is distributed on an
> >> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> + * KIND, either express or implied. See the License for the
> >> + * specific language governing permissions and limitations
> >> + * under the License.
> >> + *
> >> + */
> >> +package org.apache.mina.filter.buffer;
> >> +
> >> +import java.io.BufferedOutputStream;
> >> +import java.util.HashMap;
> >> +import java.util.Map;
> >> +
> >> +import org.apache.mina.core.buffer.IoBuffer;
> >> +import org.apache.mina.core.filterchain.IoFilter;
> >> +import org.apache.mina.core.filterchain.IoFilterAdapter;
> >> +import org.apache.mina.core.session.IoSession;
> >> +import org.apache.mina.core.write.DefaultWriteRequest;
> >> +import org.apache.mina.core.write.WriteRequest;
> >> +import org.apache.mina.filter.codec.ProtocolCodecFilter;
> >> +
> >> +/**
> >> + * An [EMAIL PROTECTED] IoFilter} implementation used to buffer outgoing
> >> [EMAIL PROTECTED]
> >> WriteRequest} almost
> >> + * like what [EMAIL PROTECTED] BufferedOutputStream} does. Using this
> >> filter
> allows
> >> to be less dependent
> >> + * from network latency. It is also useful when a session is generating
> >> very small messages
> >> + * too frequently and consequently generating unnecessary traffic
> >> overhead.
> >> + *
> >> + * Please note that it should always be placed before the [EMAIL
> >> PROTECTED]
> >> ProtocolCodecFilter}
> >> + * as it only handles [EMAIL PROTECTED] WriteRequest}'s carrying [EMAIL
> >> PROTECTED] IoBuffer}
> >> objects.
> >> + *
> >> + * @author The Apache MINA Project ([email protected])
> >> + * @version $Rev: $, $Date: $
> >> + * @since MINA 2.0.0-M2
> >> + */
> >> +public final class BufferedWriteFilter extends IoFilterAdapter {
> >> +
> >> + /**
> >> + * Default buffer size value in bytes.
> >> + */
> >> + public final static int DEFAULT_BUFFER_SIZE = 8192;
> >> +
> >> + /**
> >> + * The buffer size allocated for each new session's buffer.
> >> + */
> >> + private int bufferSize = DEFAULT_BUFFER_SIZE;
> >> +
> >> + /**
> >> + * The map that matches an [EMAIL PROTECTED] IoSession} and it's
> >> [EMAIL PROTECTED]
> IoBuffer}
> >> + * buffer.
> >> + */
> >> + protected Map<IoSession, IoBuffer> buffersMap = new
> HashMap<IoSession,
> >> IoBuffer>();
> >> +
> >> + /**
> >> + * Default constructor. Sets buffer size to [EMAIL PROTECTED]
> >> #DEFAULT_BUFFER_SIZE}
> >> + * bytes.
> >> + */
> >> + public BufferedWriteFilter() {
> >> + this(DEFAULT_BUFFER_SIZE);
> >> + }
> >> +
> >> + /**
> >> + * Constructor which sets buffer size to <code>bufferSize</code>.
> >> + *
> >> + * @param bufferSize the new buffer size
> >> + */
> >> + public BufferedWriteFilter(int bufferSize) {
> >> + super();
> >> + this.bufferSize = bufferSize;
> >> + }
> >> +
> >> + /**
> >> + * Returns buffer size.
> >> + */
> >> + public int getBufferSize() {
> >> + return bufferSize;
> >> + }
> >> +
> >> + /**
> >> + * Sets the buffer size but only for the newly created buffers.
> >> + *
> >> + * @param bufferSize the new buffer size
> >> + */
> >> + public void setBufferSize(int bufferSize) {
> >> + this.bufferSize = bufferSize;
> >> + }
> >> +
> >> + /**
> >> + * [EMAIL PROTECTED]
> >> + *
> >> + * @throws Exception if <code>writeRequest.message</code> isn't an
> >> + * [EMAIL PROTECTED] IoBuffer} instance.
> >> + */
> >> + @Override
> >> + public void filterWrite(NextFilter nextFilter, IoSession session,
> >> + WriteRequest writeRequest) throws Exception {
> >> +
> >> + Object data = writeRequest.getMessage();
> >> +
> >> + if (data instanceof IoBuffer) {
> >> + write(session, (IoBuffer) data);
> >> + } else {
> >> + throw new IllegalArgumentException(
> >> + "This filter should only buffer IoBuffer objects");
> >> + }
> >> + }
> >> +
> >> + /**
> >> + * Writes an [EMAIL PROTECTED] IoBuffer} to the session's buffer.
> >> + *
> >> + * @param session the session to which a write is requested
> >> + * @param data the data to buffer
> >> + */
> >> + private void write(IoSession session, IoBuffer data) {
> >> + IoBuffer dest = null;
> >> + synchronized (buffersMap) {
> >> + dest = buffersMap.get(session);
> >> + if (dest == null) {
> >> + // Enforce the creation of a non-expandable buffer
> >> + dest =
> IoBuffer.allocate(bufferSize).setAutoExpand(false);
> >> + buffersMap.put(session, dest);
> >> + }
> >> + }
> >> +
> >> + write(session, data, dest);
> >> + }
> >> +
> >> + /**
> >> + * Writes <code>data</code> [EMAIL PROTECTED] IoBuffer} to the
> <code>buf</code>
> >> + * [EMAIL PROTECTED] IoBuffer} which buffers write requests for the
> >> + * <code>session</code> {@ link IoSession} until buffer is full
> >> + * or manually flushed.
> >> + *
> >> + * @param session the session where buffer will be written
> >> + * @param data the data to buffer
> >> + * @param buf the buffer where data will be temporarily written
> >> + */
> >> + private void write(IoSession session, IoBuffer data, IoBuffer buf)
> {
> >> + synchronized (buf) {
> >> + try {
> >> + int len = data.remaining();
> >> + if (len >= buf.capacity()) {
> >> + /*
> >> + * If the request length exceeds the size of the
> >> output buffer,
> >> + * flush the output buffer and then write the data
> >> directly.
> >> + */
> >> + NextFilter nextFilter = session.getFilterChain()
> >> + .getNextFilter(this);
> >> + internalFlush(nextFilter, session, buf);
> >> + nextFilter.filterWrite(session,
> >> + new DefaultWriteRequest(buf));
> >> + return;
> >> + }
> >> + if (len > (buf.limit() - buf.position())) {
> >> +
> >> internalFlush(session.getFilterChain().getNextFilter(this),
> >> + session, buf);
> >> + }
> >> + buf.put(data);
> >> + } catch (Throwable e) {
> >> + session.getFilterChain().fireExceptionCaught(e);
> >> + }
> >> + }
> >> + }
> >> +
> >> + /**
> >> + * Internal method that actually flushes the buffered data.
> >> + *
> >> + * @param nextFilter the [EMAIL PROTECTED] NextFilter} of this filter
> >> + * @param session the session where buffer will be written
> >> + * @param data the data to write
> >> + * @throws Exception if a write operation fails
> >> + */
> >> + private void internalFlush(NextFilter nextFilter, IoSession
> session,
> >> + IoBuffer data) throws Exception {
> >> + if (data != null) {
> >> + nextFilter.filterWrite(session, new
> >> DefaultWriteRequest(data));
> >> + data.clear();
> >> + }
> >> + }
> >> +
> >> + /**
> >> + * Flushes the buffered data.
> >> + *
> >> + * @param session the session where buffer will be written
> >> + */
> >> + public void flush(IoSession session) {
> >> + try {
> >> + IoBuffer data = null;
> >> + synchronized (session) {
> >> + data = buffersMap.get(session);
> >> + }
> >> + internalFlush(session.getFilterChain().getNextFilter(this),
> >> + session, data);
> >> +
> >> + } catch (Throwable e) {
> >> + session.getFilterChain().fireExceptionCaught(e);
> >> + }
> >> + }
> >> +
> >> + /**
> >> + * Internal method that actually frees the [EMAIL PROTECTED]
> >> IoBuffer} that
> >> contains
> >> + * the buffered data that has not been flushed.
> >> + *
> >> + * @param session the session we operate on
> >> + */
> >> + private void clear(IoSession session) {
> >> + synchronized (session) {
> >> + IoBuffer buf = buffersMap.remove(session);
> >> + if (buf != null) {
> >> + buf.free();
> >> + }
> >> + }
> >> + }
> >> +
> >> + /**
> >> + * [EMAIL PROTECTED]
> >> + */
> >> + @Override
> >> + public void exceptionCaught(NextFilter nextFilter, IoSession
> session,
> >> + Throwable cause) throws Exception {
> >> + clear(session);
> >> + }
> >> +
> >> + /**
> >> + * [EMAIL PROTECTED]
> >> + */
> >> + @Override
> >> + public void sessionClosed(NextFilter nextFilter, IoSession session)
> >> + throws Exception {
> >> + clear(session);
> >> + }
> >> +}
> >> \ No newline at end of file
> >>
> >> Propchange:
> >>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >>
> >>
> ------------------------------------------------------------------------------
> >> svn:eol-style = native
> >>
> >> Propchange:
> >>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> >>
> >>
> ------------------------------------------------------------------------------
> >> svn:mime-type = text/plain
> >>
> >>
> >>
> >
> >
> >
> >
> _____________________________________________________________________________
> > Envoyez avec Yahoo! Mail. Une boite mail plus intelligente
> http://mail.yahoo.fr
> >
>
>
>
>
> _____________________________________________________________________________
> Envoyez avec Yahoo! Mail. Une boite mail plus intelligente
> http://mail.yahoo.fr
>
_____________________________________________________________________________
Envoyez avec Yahoo! Mail. Une boite mail plus intelligente http://mail.yahoo.fr