I like the idea of using a memoizer pattern.  Perhaps we could add the
memoizer as a utility class and have this class (BufferedWriteFilter)
simply use it.  I can imagine more cases where such a pattern would be
useful.  My 2 cents.
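
To make the suggestion a bit more concrete, here is a rough sketch of what
such a utility could look like.  Every name in it (LazyValueCache, Factory,
the String session keys) is made up for illustration, and java.nio.ByteBuffer
stands in for IoBuffer so that the snippet compiles without MINA.  It is not
the actual filter code, just the shape I have in mind.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical utility: creates the value for a key at most once. */
final class LazyValueCache<K, V> {
    /** Hypothetical callback that builds the value for a key. */
    interface Factory<K, V> {
        V create(K key) throws Exception;
    }

    private final ConcurrentMap<K, Future<V>> cache =
            new ConcurrentHashMap<K, Future<V>>();
    private final Factory<K, V> factory;

    LazyValueCache(Factory<K, V> factory) {
        this.factory = factory;
    }

    V get(final K key) {
        while (true) {
            Future<V> f = cache.get(key);
            if (f == null) {
                FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                    public V call() throws Exception {
                        return factory.create(key);
                    }
                });
                f = cache.putIfAbsent(key, ft);
                if (f == null) {      // we won the race: only we run the factory
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();       // losers wait for the winner's value
            } catch (CancellationException e) {
                cache.remove(key, f); // drop the cancelled task and retry
            } catch (ExecutionException e) {
                cache.remove(key, f); // do not cache a failed creation
                throw new IllegalStateException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Drops the entry for a key, e.g. when a session is closed. */
    void remove(K key) {
        cache.remove(key);
    }
}

final class LazyValueCacheDemo {
    public static void main(String[] args) {
        LazyValueCache<String, ByteBuffer> buffers =
                new LazyValueCache<String, ByteBuffer>(
                        new LazyValueCache.Factory<String, ByteBuffer>() {
                            public ByteBuffer create(String sessionId) {
                                return ByteBuffer.allocate(8192);
                            }
                        });
        ByteBuffer first = buffers.get("session-1");
        ByteBuffer second = buffers.get("session-1");
        System.out.println("same buffer: " + (first == second));
    }
}
```

The filter would then call get(session) in its write path and
remove(session) when the session is closed, with the allocation logic
living in the factory.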

Sangjin

On Wed, Jul 23, 2008 at 1:55 AM, Edouard De Oliveira <[EMAIL PROTECTED]>
wrote:

> You're right, the compound operations cannot be guaranteed to be
> thread-safe, but my idea was that we did not need that.
>
> The first get was used to prevent an allocation on each write, and as far
> as I know:
> time(unsynced get on ConcurrentHashMap) <<< time(allocating possibly large
> IoBuffer)
>
> The last get should have been avoided by using the code you pointed out,
> though I think we must add a newValue.free() in the else branch before
> returning.
>
>
> IoBuffer oldValue = buffersMap.get(session);
> if (oldValue == null) {
>     IoBuffer newValue = IoBuffer.allocate(bufferSize);
>     oldValue = buffersMap.putIfAbsent(session, newValue);
>     if (oldValue == null) {  // means we put a new value
>         oldValue = newValue;
>     } else {  // means we didn't put the new value but got the old value
>         newValue.free();
>     }
> }
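
For what it's worth, the get-then-putIfAbsent idiom above can be tried out
in isolation with something like the sketch below.  The assumptions are
mine: java.nio.ByteBuffer stands in for IoBuffer, sessions are plain String
keys, and the allocation counter exists only to make the wasted allocations
visible.  ByteBuffer has no free(), so a buffer that loses the race is
simply left to the garbage collector here.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the filter's per-session buffer lookup. */
final class PutIfAbsentBuffers {
    private final ConcurrentMap<String, ByteBuffer> buffers =
            new ConcurrentHashMap<String, ByteBuffer>();
    private final AtomicInteger allocations = new AtomicInteger();
    private final int bufferSize;

    PutIfAbsentBuffers(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    ByteBuffer bufferFor(String sessionId) {
        ByteBuffer existing = buffers.get(sessionId);  // cheap fast path
        if (existing != null) {
            return existing;
        }
        allocations.incrementAndGet();                 // may happen in several threads
        ByteBuffer created = ByteBuffer.allocate(bufferSize);
        existing = buffers.putIfAbsent(sessionId, created);
        // null means our buffer was stored; otherwise another thread won
        return existing == null ? created : existing;
    }

    int allocationCount() {
        return allocations.get();
    }

    /** Races threadCount threads on one key and checks they agree. */
    static boolean allThreadsShareOneBuffer(int threadCount) {
        final PutIfAbsentBuffers cache = new PutIfAbsentBuffers(8192);
        final ByteBuffer[] seen = new ByteBuffer[threadCount];
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    seen[index] = cache.bufferFor("session-1");
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (ByteBuffer b : seen) {
            if (b != seen[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("shared: " + allThreadsShareOneBuffer(8));
    }
}
```

Every thread ends up with the same buffer, but under contention the
allocation count can be anywhere between one and the number of racing
threads, which is exactly the waste discussed next.
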
>
> Yet, if large buffers and many writer threads are used, this will be costly
> in terms of memory used and time spent allocating/deallocating unnecessary
> buffers.
>
> Why not use the Memoizer pattern?
>
> /**
>  * Memoizer
>  * <p/>
>  * Final implementation of Memoizer
>  *
>  * @author Brian Goetz and Tim Peierls
>  */
> public class Memoizer <A, V> implements Computable<A, V> {
>    private final ConcurrentMap<A, Future<V>> cache
>            = new ConcurrentHashMap<A, Future<V>>();
>    private final Computable<A, V> c;
>
>    public Memoizer(Computable<A, V> c) {
>        this.c = c;
>    }
>
>    public V compute(final A arg) throws InterruptedException {
>        while (true) {
>            Future<V> f = cache.get(arg);
>            if (f == null) {
>                Callable<V> eval = new Callable<V>() {
>                    public V call() throws InterruptedException {
>                        return c.compute(arg);
>                    }
>                };
>                FutureTask<V> ft = new FutureTask<V>(eval);
>                f = cache.putIfAbsent(arg, ft);
>                if (f == null) {
>                    f = ft;
>                    ft.run();
>                }
>            }
>            try {
>                return f.get();
>            } catch (CancellationException e) {
>                cache.remove(arg, f);
>            } catch (ExecutionException e) {
>                throw LaunderThrowable.launderThrowable(e.getCause());
>            }
>        }
>    }
> }
>
> Using this pattern will prevent allocating unnecessary IoBuffers and only
> uses two calls to the map in the worst case.
> This may seem like overkill, but it could be worth it.
>
> WDYT?
>
> Cordialement, Regards,
> -Edouard De Oliveira-
> http://tedorg.free.fr/en/main.php
>
>
>
> ----- Original Message ----
> From: Sangjin Lee <[EMAIL PROTECTED]>
> To: [email protected]
> Sent: Wednesday, July 23, 2008, 02:25:14
> Subject: Re: svn commit: r678238 - in
> /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> BufferedWriteFilter.java
>
> I'm not sure if BufferedWriteFilter.write() would be thread safe.  It does
> three calls: get(), putIfAbsent(), and get().  Although these calls are
> thread safe individually, the compound operations are not necessarily
> thread safe.
>
> As far as I know, the pattern that always works is as follows:
>
> IoBuffer newValue = IoBuffer.allocate(bufferSize);
> IoBuffer oldValue = buffersMap.putIfAbsent(session, newValue);
> if (oldValue == null) {  // means we put a new value
>    return newValue;
> } else {  // means we didn't put the new value but got the old value
>    return oldValue;
> }
>
> But the obvious drawback of this is that you always have to allocate the
> buffer just to try putting it into the map.  If we are not willing to incur
> that penalty, then we'll still need to do get() calls which may bring back
> the synchronization...  Thoughts?
>
> Thanks,
> Sangjin
>
> On Tue, Jul 22, 2008 at 3:19 PM, Edouard De Oliveira <[EMAIL PROTECTED]>
> wrote:
>
> > So we agree on the use of ConcurrentHashMap: all of the conditions you
> > listed should be met when this filter is used correctly. The tricky
> > parameter is MAX_THREADS. I think there's no magical value, so as an API
> > service provider we should provide a default implementation and let the
> > user provide their own params for the ConcurrentHashMap instantiation, or
> > even their own instance.
> >
> > I think adding the following constructor should close the debate:
> > public BufferedWriteFilter(int bufferSize, final ConcurrentMap<IoSession,
> > IoBuffer> buffersMap);
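
A rough idea of how such a constructor could be used is sketched below.
This is only an illustration: BufferingFilterSketch is a made-up stand-in
for the filter, String keys and java.nio.ByteBuffer replace IoSession and
IoBuffer so the snippet compiles on its own, and I am assuming that
MAX_THREADS corresponds to the concurrencyLevel argument of the
three-argument ConcurrentHashMap constructor.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a filter that accepts a caller-supplied map. */
final class BufferingFilterSketch {
    static final int DEFAULT_BUFFER_SIZE = 8192;

    private final int bufferSize;
    private final ConcurrentMap<String, ByteBuffer> buffersMap;

    /** Default: library-chosen buffer size and a default-tuned map. */
    BufferingFilterSketch() {
        this(DEFAULT_BUFFER_SIZE, new ConcurrentHashMap<String, ByteBuffer>());
    }

    /** Lets the caller decide how the map is sized and tuned. */
    BufferingFilterSketch(int bufferSize,
            ConcurrentMap<String, ByteBuffer> buffersMap) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        if (buffersMap == null) {
            throw new NullPointerException("buffersMap");
        }
        this.bufferSize = bufferSize;
        this.buffersMap = buffersMap;
    }

    int getBufferSize() {
        return bufferSize;
    }

    ConcurrentMap<String, ByteBuffer> getBuffersMap() {
        return buffersMap;
    }

    public static void main(String[] args) {
        // initialCapacity = 64, loadFactor = 0.75, concurrencyLevel = 4
        ConcurrentMap<String, ByteBuffer> tuned =
                new ConcurrentHashMap<String, ByteBuffer>(64, 0.75f, 4);
        BufferingFilterSketch filter = new BufferingFilterSketch(4096, tuned);
        System.out.println("buffer size: " + filter.getBufferSize());
    }
}
```

The no-argument constructor keeps the simple case simple, while callers who
know their expected number of writer threads can pass a map tuned for it.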
> >
> >
> > WDYT ?
> >
> > Cordialement, Regards,
> > -Edouard De Oliveira-
> > http://tedorg.free.fr/en/main.php
> >
> >
> >
> > ----- Original Message ----
> > From: Bogdan Ciprian Pistol <[EMAIL PROTECTED]>
> > To: [email protected]
> > Sent: Tuesday, July 22, 2008, 12:41:46
> > Subject: Re: Re : svn commit: r678238 - in
> > /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> > BufferedWriteFilter.java
> >
> > From my experience ConcurrentHashMap outperforms
> > Collections.synchronizedMap(...) only if:
> > - you know in advance the maximum number of threads (MAX_THREADS) that
> > could simultaneously modify the ConcurrentHashMap
> > - MAX_THREADS is small compared to the number of threads that are
> > accessing the ConcurrentHashMap (reads)
> > - the accesses (reads) vastly outnumber the modifications (writes)
> > ConcurrentHashMap also consumes more memory (it increases proportionally
> > with MAX_THREADS)
> >
> > Bogdan
> >
> > On Tue, Jul 22, 2008 at 2:24 AM, Edouard De Oliveira
> > <[EMAIL PROTECTED]> wrote:
> > > Perfectly legitimate question, Mark:
> > >
> > > Stop me if I'm wrong, but I think it would be a mistake to use
> > > CopyOnWriteMap in this use case: the map grows as the number of
> > > sessions grows, and since buffering is most meaningful for long-lived
> > > sessions, each new session would duplicate a potentially larger map,
> > > hence hitting the bad side of CopyOnWriteMap.
> > >
> > >
> > > So ConcurrentHashMap should be more appropriate.
> > >
> > > WDYT?
> > >
> > > PS: I'm committing the proposed fix right now,
> > > along with a test class on trunk to ease any further review
> > >
> > > Cordialement, Regards,
> > > -Edouard De Oliveira-
> > > http://tedorg.free.fr/en/main.php
> > >
> > >
> > >
> > > ----- Original Message ----
> > > From: Mark Webb <[EMAIL PROTECTED]>
> > > To: [email protected]
> > > Sent: Monday, July 21, 2008, 02:47:54
> > > Subject: Re: svn commit: r678238 - in
> > > /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> > > BufferedWriteFilter.java
> > >
> > > I have a question on this class.
> > >
> > > Why not use ConcurrentHashMap for this instead of synchronizing?  Or
> > > CopyOnWriteMap?
> > >
> > >
> > >
> > >
> > > On Sat, Jul 19, 2008 at 7:19 PM, <[EMAIL PROTECTED]> wrote:
> > >
> > >> Author: edeoliveira
> > >> Date: Sat Jul 19 16:19:14 2008
> > >> New Revision: 678238
> > >>
> > >> URL: http://svn.apache.org/viewvc?rev=678238&view=rev
> > >> Log:
> > >> New IoFilter that implements DIRMINA-519
> > >>
> > >> Added:
> > > >>    mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/
> > > >>    mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java   (with props)
> > > >>
> > > >> Added: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > > >> URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto
> > > >> ==============================================================================
> > > >> --- mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (added)
> > > >> +++ mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java Sat Jul 19 16:19:14 2008
> > >> @@ -0,0 +1,243 @@
> > >> +/*
> > >> + *  Licensed to the Apache Software Foundation (ASF) under one
> > >> + *  or more contributor license agreements.  See the NOTICE file
> > >> + *  distributed with this work for additional information
> > >> + *  regarding copyright ownership.  The ASF licenses this file
> > >> + *  to you under the Apache License, Version 2.0 (the
> > >> + *  "License"); you may not use this file except in compliance
> > >> + *  with the License.  You may obtain a copy of the License at
> > >> + *
> > >> + *    http://www.apache.org/licenses/LICENSE-2.0
> > >> + *
> > >> + *  Unless required by applicable law or agreed to in writing,
> > >> + *  software distributed under the License is distributed on an
> > >> + *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > >> + *  KIND, either express or implied.  See the License for the
> > >> + *  specific language governing permissions and limitations
> > >> + *  under the License.
> > >> + *
> > >> + */
> > >> +package org.apache.mina.filter.buffer;
> > >> +
> > >> +import java.io.BufferedOutputStream;
> > >> +import java.util.HashMap;
> > >> +import java.util.Map;
> > >> +
> > >> +import org.apache.mina.core.buffer.IoBuffer;
> > >> +import org.apache.mina.core.filterchain.IoFilter;
> > >> +import org.apache.mina.core.filterchain.IoFilterAdapter;
> > >> +import org.apache.mina.core.session.IoSession;
> > >> +import org.apache.mina.core.write.DefaultWriteRequest;
> > >> +import org.apache.mina.core.write.WriteRequest;
> > >> +import org.apache.mina.filter.codec.ProtocolCodecFilter;
> > >> +
> > >> +/**
> > > >> + * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost
> > > >> + * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent
> > > >> + * from network latency. It is also useful when a session is generating very small messages
> > > >> + * too frequently and consequently generating unnecessary traffic overhead.
> > > >> + *
> > > >> + * Please note that it should always be placed before the {@link ProtocolCodecFilter}
> > > >> + * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
> > >> + *
> > >> + * @author The Apache MINA Project ([email protected])
> > >> + * @version $Rev: $, $Date: $
> > >> + * @since MINA 2.0.0-M2
> > >> + */
> > >> +public final class BufferedWriteFilter extends IoFilterAdapter {
> > >> +
> > >> +    /**
> > >> +     * Default buffer size value in bytes.
> > >> +     */
> > >> +    public final static int DEFAULT_BUFFER_SIZE = 8192;
> > >> +
> > >> +    /**
> > >> +     * The buffer size allocated for each new session's buffer.
> > >> +     */
> > >> +    private int bufferSize = DEFAULT_BUFFER_SIZE;
> > >> +
> > >> +    /**
> > > >> +     * The map that matches an {@link IoSession} and it's {@link IoBuffer}
> > > >> +     * buffer.
> > > >> +     */
> > > >> +    protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession, IoBuffer>();
> > >> +
> > >> +    /**
> > > >> +     * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
> > >> +     * bytes.
> > >> +     */
> > >> +    public BufferedWriteFilter() {
> > >> +        this(DEFAULT_BUFFER_SIZE);
> > >> +    }
> > >> +
> > >> +    /**
> > >> +     * Constructor which sets buffer size to <code>bufferSize</code>.
> > >> +     *
> > >> +     * @param bufferSize the new buffer size
> > >> +     */
> > >> +    public BufferedWriteFilter(int bufferSize) {
> > >> +        super();
> > >> +        this.bufferSize = bufferSize;
> > >> +    }
> > >> +
> > >> +    /**
> > >> +     * Returns buffer size.
> > >> +     */
> > >> +    public int getBufferSize() {
> > >> +        return bufferSize;
> > >> +    }
> > >> +
> > >> +    /**
> > >> +     * Sets the buffer size but only for the newly created buffers.
> > >> +     *
> > >> +     * @param bufferSize the new buffer size
> > >> +     */
> > >> +    public void setBufferSize(int bufferSize) {
> > >> +        this.bufferSize = bufferSize;
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * {@inheritDoc}
> > > >> +     *
> > > >> +     * @throws Exception if <code>writeRequest.message</code> isn't an
> > > >> +     *                   {@link IoBuffer} instance.
> > >> +     */
> > >> +    @Override
> > >> +    public void filterWrite(NextFilter nextFilter, IoSession session,
> > >> +            WriteRequest writeRequest) throws Exception {
> > >> +
> > >> +        Object data = writeRequest.getMessage();
> > >> +
> > >> +        if (data instanceof IoBuffer) {
> > >> +            write(session, (IoBuffer) data);
> > >> +        } else {
> > > >> +            throw new IllegalArgumentException(
> > > >> +                    "This filter should only buffer IoBuffer objects");
> > >> +        }
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * Writes an {@link IoBuffer} to the session's buffer.
> > >> +     *
> > >> +     * @param session the session to which a write is requested
> > >> +     * @param data the data to buffer
> > >> +     */
> > >> +    private void write(IoSession session, IoBuffer data) {
> > >> +        IoBuffer dest = null;
> > >> +        synchronized (buffersMap) {
> > >> +            dest = buffersMap.get(session);
> > >> +            if (dest == null) {
> > >> +                // Enforce the creation of a non-expandable buffer
> > > >> +                dest = IoBuffer.allocate(bufferSize).setAutoExpand(false);
> > >> +                buffersMap.put(session, dest);
> > >> +            }
> > >> +        }
> > >> +
> > >> +        write(session, data, dest);
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
> > > >> +     * {@link IoBuffer} which buffers write requests for the
> > >> +     * <code>session</code> {@ link IoSession} until buffer is full
> > >> +     * or manually flushed.
> > >> +     *
> > >> +     * @param session the session where buffer will be written
> > >> +     * @param data the data to buffer
> > >> +     * @param buf the buffer where data will be temporarily written
> > >> +     */
> > > >> +    private void write(IoSession session, IoBuffer data, IoBuffer buf) {
> > >> +        synchronized (buf) {
> > >> +            try {
> > >> +                int len = data.remaining();
> > >> +                if (len >= buf.capacity()) {
> > >> +                    /*
> > > >> +                     * If the request length exceeds the size of the output buffer,
> > > >> +                     * flush the output buffer and then write the data directly.
> > >> +                     */
> > >> +                    NextFilter nextFilter = session.getFilterChain()
> > >> +                            .getNextFilter(this);
> > >> +                    internalFlush(nextFilter, session, buf);
> > >> +                    nextFilter.filterWrite(session,
> > >> +                            new DefaultWriteRequest(buf));
> > >> +                    return;
> > >> +                }
> > >> +                if (len > (buf.limit() - buf.position())) {
> > >> +
> > >>  internalFlush(session.getFilterChain().getNextFilter(this),
> > >> +                            session, buf);
> > >> +                }
> > >> +                buf.put(data);
> > >> +            } catch (Throwable e) {
> > >> +                session.getFilterChain().fireExceptionCaught(e);
> > >> +            }
> > >> +        }
> > >> +    }
> > >> +
> > >> +    /**
> > >> +     * Internal method that actually flushes the buffered data.
> > >> +     *
> > > >> +     * @param nextFilter the {@link NextFilter} of this filter
> > >> +     * @param session the session where buffer will be written
> > >> +     * @param data the data to write
> > >> +     * @throws Exception if a write operation fails
> > >> +     */
> > > >> +    private void internalFlush(NextFilter nextFilter, IoSession session,
> > > >> +            IoBuffer data) throws Exception {
> > > >> +        if (data != null) {
> > > >> +            nextFilter.filterWrite(session, new DefaultWriteRequest(data));
> > >> +            data.clear();
> > >> +        }
> > >> +    }
> > >> +
> > >> +    /**
> > >> +     * Flushes the buffered data.
> > >> +     *
> > >> +     * @param session the session where buffer will be written
> > >> +     */
> > >> +    public void flush(IoSession session) {
> > >> +        try {
> > >> +            IoBuffer data = null;
> > >> +            synchronized (session) {
> > >> +                data = buffersMap.get(session);
> > >> +            }
> > >> +
>  internalFlush(session.getFilterChain().getNextFilter(this),
> > >> +                    session, data);
> > >> +
> > >> +        } catch (Throwable e) {
> > >> +            session.getFilterChain().fireExceptionCaught(e);
> > >> +        }
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * Internal method that actually frees the {@link IoBuffer} that contains
> > >> +     * the buffered data that has not been flushed.
> > >> +     *
> > >> +     * @param session the session we operate on
> > >> +     */
> > >> +    private void clear(IoSession session) {
> > >> +        synchronized (session) {
> > >> +            IoBuffer buf = buffersMap.remove(session);
> > >> +            if (buf != null) {
> > >> +                buf.free();
> > >> +            }
> > >> +        }
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * {@inheritDoc}
> > > >> +     */
> > > >> +    @Override
> > > >> +    public void exceptionCaught(NextFilter nextFilter, IoSession session,
> > > >> +            Throwable cause) throws Exception {
> > >> +        clear(session);
> > >> +    }
> > >> +
> > >> +    /**
> > > >> +     * {@inheritDoc}
> > > >> +     */
> > > >> +    @Override
> > > >> +    public void sessionClosed(NextFilter nextFilter, IoSession session)
> > > >> +            throws Exception {
> > >> +        clear(session);
> > >> +    }
> > >> +}
> > >> \ No newline at end of file
> > >>
> > > >> Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > > >> ------------------------------------------------------------------------------
> > > >>    svn:eol-style = native
> > > >>
> > > >> Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > > >> ------------------------------------------------------------------------------
> > > >>    svn:mime-type = text/plain
> > >>
> > >>
> > >>
> > >
> > >
> > >
> > >
> > > _____________________________________________________________________________
> > > Sent with Yahoo! Mail. A smarter mailbox
> > > http://mail.yahoo.fr
> > >
> >
> >
> >
> >
> >
>
>
>
>
