I like the idea of using a memoizer pattern. Perhaps one can add the
memoizer pattern as a utility class, and have this class
(BufferedWriterFilter) simply use it. I can imagine more cases where such a
pattern can be used. My 2 cents.
Sangjin
On Wed, Jul 23, 2008 at 1:55 AM, Edouard De Oliveira <[EMAIL PROTECTED]>
wrote:
> You're right the compound operations cannot be guaranted to be thread-safe
> but my idea was that we did not need it.
>
> First get was used to prevent allocation on each write and as far as i know
> :
> time(unsynced get on ConcurrentHashMap) <<< time(allocating possibly large
> IoBuffer)
>
> Last get should have been avoided using the code you pointed though i think
> we must add
> a newValue.free() in the else before returning.
>
>
> IoBuffer oldValue = buffersMap.get(session);
> if (oldValue == null) {
> IoBuffer newValue = IoBuffer.allocate(bufferSize);
> oldValue = buffersMap.putIfAbsent(session, newValue);
> if (oldValue == null) { // means we put a new value
> oldValue = newValue;
> } else { // means we didn't put the new value but got the old value
> newValue.free();
> }
> }
>
> Yet, if large buffers and many writers threads are used, it will be costly
> in terms of
> memory used and time to allocate/deallocate unnecessary buffers.
>
> Why not using the Memoizer pattern ?
>
> /**
> * Memoizer
> * <p/>
> * Final implementation of Memoizer
> *
> * @author Brian Goetz and Tim Peierls
> */
> public class Memoizer <A, V> implements Computable<A, V> {
> private final ConcurrentMap<A, Future<V>> cache
> = new ConcurrentHashMap<A, Future<V>>();
> private final Computable<A, V> c;
>
> public Memoizer(Computable<A, V> c) {
> this.c = c;
> }
>
> public V compute(final A arg) throws InterruptedException {
> while (true) {
> Future<V> f = cache.get(arg);
> if (f == null) {
> Callable<V> eval = new Callable<V>() {
> public V call() throws InterruptedException {
> return c.compute(arg);
> }
> };
> FutureTask<V> ft = new FutureTask<V>(eval);
> f = cache.putIfAbsent(arg, ft);
> if (f == null) {
> f = ft;
> ft.run();
> }
> }
> try {
> return f.get();
> } catch (CancellationException e) {
> cache.remove(arg, f);
> } catch (ExecutionException e) {
> throw LaunderThrowable.launderThrowable(e.getCause());
> }
> }
> }
> }
>
> Using this pattern will prevent allocating unnecessary IoBuffer's and only
> uses two calls to the map in the worst case.
> This may seem overkill but could be worth it
>
> WDYT ?
>
> Cordialement, Regards,
> -Edouard De Oliveira-
> http://tedorg.free.fr/en/main.php
>
>
>
> ----- Message d'origine ----
> De : Sangjin Lee <[EMAIL PROTECTED]>
> À : [email protected]
> Envoyé le : Mercredi, 23 Juillet 2008, 2h25mn 14s
> Objet : Re: svn commit: r678238 - in
> /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> BufferedWriteFilter.java
>
> I'm not sure if BufferedWriteFilter.write() would be thread safe. It does
> three calls: get(), putIfAbsent(), and get(). Although these calls are
> thread safe individually, the compound operations are not necessarily
> thread
> safe.
>
> As far as I know, the pattern that always works is as follows:
>
> IoBuffer newValue = IoBuffer.allocate(bufferSize);
> IoBuffer oldValue = buffersMap.putIfAbsent(session, newValue);
> if (oldValue == null) { // means we put a new value
> return newValue;
> } else { // means we didn't put the new value but got the old value
> return oldValue;
> }
>
> But the obvious drawback of this is that you always have to allocate the
> buffer just to try putting it into the map. If we are not willing to incur
> that penalty, then we'll still need to do get() calls which may bring back
> the synchronization... Thoughts?
>
> Thanks,
> Sangjin
>
> On Tue, Jul 22, 2008 at 3:19 PM, Edouard De Oliveira <[EMAIL PROTECTED]>
> wrote:
>
> > So we agree on ConcurrentHashMap use : all of the conditions you wrote
> > should be met
> > when correctly using this filter. The uneasy parameter is MAX_THREADS. I
> > think there's
> > no magical value so as an API service provider we should provide default
> > implementation and let
> > the user provide its own params for the ConcurrentHashMap instantiation
> or
> > even its own
> > instance.
> >
> > I think adding the following constructor should close the debate :
> > public BufferedWriteFilter(int bufferSize, final ConcurrentMap<IoSession,
> > IoBuffer> buffersMap);
> >
> >
> > WDYT ?
> >
> > Cordialement, Regards,
> > -Edouard De Oliveira-
> > http://tedorg.free.fr/en/main.php
> >
> >
> >
> > ----- Message d'origine ----
> > De : Bogdan Ciprian Pistol <[EMAIL PROTECTED]>
> > À : [email protected]
> > Envoyé le : Mardi, 22 Juillet 2008, 12h41mn 46s
> > Objet : Re: Re : svn commit: r678238 - in
> > /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> > BufferedWriteFilter.java
> >
> > From my experience ConcurrentHashMap outperforms
> > Collections.synchronizedMap(...) only if:
> > - you know in advance how many maximum threads (MAX_THREADS) could
> > simultaneously modify the ConcurrentHashMap
> > - MAX_THREADS is small compared to the number of threads that are
> > accessing the ConcurrentHashMap (reads)
> > - the accesses (reads) vastly outnumbers the modifications (writes)
> > ConcurrentHashMap also consumes more memory (increases proportional
> > with MAX_THREADS)
> >
> > Bogdan
> >
> > On Tue, Jul 22, 2008 at 2:24 AM, Edouard De Oliveira
> > <[EMAIL PROTECTED]> wrote:
> > > Perfectly legitimate question Mark :
> > >
> > > Stop me if i'm not correct, but i think it would be an error using
> > CopyOnWriteMap in this usecase
> > > as the map grows when the number of sessions grows and as buffering
> takes
> > full meaning in long
> > > lived sessions, then each new session would duplicate a potentially
> > larger map hence
> > > hitting the bad side of CopyOnWriteMap.
> > >
> > >
> > > So ConcurrentHashMap should be more appropriate.
> > >
> > > WDYT ?
> > >
> > > ps: i'm commiting right now the proposed fix
> > > and a test class on trunk to ease any further review
> > >
> > > Cordialement, Regards,
> > > -Edouard De Oliveira-
> > > http://tedorg.free.fr/en/main.php
> > >
> > >
> > >
> > > ----- Message d'origine ----
> > > De : Mark Webb <[EMAIL PROTECTED]>
> > > À : [email protected]
> > > Envoyé le : Lundi, 21 Juillet 2008, 2h47mn 54s
> > > Objet : Re: svn commit: r678238 - in
> > /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer: ./
> > BufferedWriteFilter.java
> > >
> > > I have a question on this class.
> > >
> > > Why not use ConcurrentHashMap for this instead of synchronizing? Or
> > > CopyOnWriteMap?
> > >
> > >
> > >
> > >
> > > On Sat, Jul 19, 2008 at 7:19 PM, <[EMAIL PROTECTED]> wrote:
> > >
> > >> Author: edeoliveira
> > >> Date: Sat Jul 19 16:19:14 2008
> > >> New Revision: 678238
> > >>
> > >> URL: http://svn.apache.org/viewvc?rev=678238&view=rev
> > >> Log:
> > >> New IoFilter that implements DIRMINA-519
> > >>
> > >> Added:
> > >> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/
> > >>
> > >>
> >
>
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >> (with props)
> > >>
> > >> Added:
> > >>
> >
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >> URL:
> > >>
> >
> http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto
> > >>
> > >>
> >
> ==============================================================================
> > >> ---
> > >>
> >
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >> (added)
> > >> +++
> > >>
> >
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >> Sat Jul 19 16:19:14 2008
> > >> @@ -0,0 +1,243 @@
> > >> +/*
> > >> + * Licensed to the Apache Software Foundation (ASF) under one
> > >> + * or more contributor license agreements. See the NOTICE file
> > >> + * distributed with this work for additional information
> > >> + * regarding copyright ownership. The ASF licenses this file
> > >> + * to you under the Apache License, Version 2.0 (the
> > >> + * "License"); you may not use this file except in compliance
> > >> + * with the License. You may obtain a copy of the License at
> > >> + *
> > >> + * http://www.apache.org/licenses/LICENSE-2.0
> > >> + *
> > >> + * Unless required by applicable law or agreed to in writing,
> > >> + * software distributed under the License is distributed on an
> > >> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > >> + * KIND, either express or implied. See the License for the
> > >> + * specific language governing permissions and limitations
> > >> + * under the License.
> > >> + *
> > >> + */
> > >> +package org.apache.mina.filter.buffer;
> > >> +
> > >> +import java.io.BufferedOutputStream;
> > >> +import java.util.HashMap;
> > >> +import java.util.Map;
> > >> +
> > >> +import org.apache.mina.core.buffer.IoBuffer;
> > >> +import org.apache.mina.core.filterchain.IoFilter;
> > >> +import org.apache.mina.core.filterchain.IoFilterAdapter;
> > >> +import org.apache.mina.core.session.IoSession;
> > >> +import org.apache.mina.core.write.DefaultWriteRequest;
> > >> +import org.apache.mina.core.write.WriteRequest;
> > >> +import org.apache.mina.filter.codec.ProtocolCodecFilter;
> > >> +
> > >> +/**
> > >> + * An [EMAIL PROTECTED] IoFilter} implementation used to buffer
> > >> outgoing [EMAIL PROTECTED]
> > >> WriteRequest} almost
> > >> + * like what [EMAIL PROTECTED] BufferedOutputStream} does. Using this
> > >> filter
> > allows
> > >> to be less dependent
> > >> + * from network latency. It is also useful when a session is
> generating
> > >> very small messages
> > >> + * too frequently and consequently generating unnecessary traffic
> > >> overhead.
> > >> + *
> > >> + * Please note that it should always be placed before the [EMAIL
> > >> PROTECTED]
> > >> ProtocolCodecFilter}
> > >> + * as it only handles [EMAIL PROTECTED] WriteRequest}'s carrying [EMAIL
> > >> PROTECTED]
> IoBuffer}
> > >> objects.
> > >> + *
> > >> + * @author The Apache MINA Project ([email protected])
> > >> + * @version $Rev: $, $Date: $
> > >> + * @since MINA 2.0.0-M2
> > >> + */
> > >> +public final class BufferedWriteFilter extends IoFilterAdapter {
> > >> +
> > >> + /**
> > >> + * Default buffer size value in bytes.
> > >> + */
> > >> + public final static int DEFAULT_BUFFER_SIZE = 8192;
> > >> +
> > >> + /**
> > >> + * The buffer size allocated for each new session's buffer.
> > >> + */
> > >> + private int bufferSize = DEFAULT_BUFFER_SIZE;
> > >> +
> > >> + /**
> > >> + * The map that matches an [EMAIL PROTECTED] IoSession} and it's
> > >> [EMAIL PROTECTED]
> > IoBuffer}
> > >> + * buffer.
> > >> + */
> > >> + protected Map<IoSession, IoBuffer> buffersMap = new
> > HashMap<IoSession,
> > >> IoBuffer>();
> > >> +
> > >> + /**
> > >> + * Default constructor. Sets buffer size to [EMAIL PROTECTED]
> > >> #DEFAULT_BUFFER_SIZE}
> > >> + * bytes.
> > >> + */
> > >> + public BufferedWriteFilter() {
> > >> + this(DEFAULT_BUFFER_SIZE);
> > >> + }
> > >> +
> > >> + /**
> > >> + * Constructor which sets buffer size to <code>bufferSize</code>.
> > >> + *
> > >> + * @param bufferSize the new buffer size
> > >> + */
> > >> + public BufferedWriteFilter(int bufferSize) {
> > >> + super();
> > >> + this.bufferSize = bufferSize;
> > >> + }
> > >> +
> > >> + /**
> > >> + * Returns buffer size.
> > >> + */
> > >> + public int getBufferSize() {
> > >> + return bufferSize;
> > >> + }
> > >> +
> > >> + /**
> > >> + * Sets the buffer size but only for the newly created buffers.
> > >> + *
> > >> + * @param bufferSize the new buffer size
> > >> + */
> > >> + public void setBufferSize(int bufferSize) {
> > >> + this.bufferSize = bufferSize;
> > >> + }
> > >> +
> > >> + /**
> > >> + * [EMAIL PROTECTED]
> > >> + *
> > >> + * @throws Exception if <code>writeRequest.message</code> isn't
> an
> > >> + * [EMAIL PROTECTED] IoBuffer} instance.
> > >> + */
> > >> + @Override
> > >> + public void filterWrite(NextFilter nextFilter, IoSession session,
> > >> + WriteRequest writeRequest) throws Exception {
> > >> +
> > >> + Object data = writeRequest.getMessage();
> > >> +
> > >> + if (data instanceof IoBuffer) {
> > >> + write(session, (IoBuffer) data);
> > >> + } else {
> > >> + throw new IllegalArgumentException(
> > >> + "This filter should only buffer IoBuffer
> objects");
> > >> + }
> > >> + }
> > >> +
> > >> + /**
> > >> + * Writes an [EMAIL PROTECTED] IoBuffer} to the session's buffer.
> > >> + *
> > >> + * @param session the session to which a write is requested
> > >> + * @param data the data to buffer
> > >> + */
> > >> + private void write(IoSession session, IoBuffer data) {
> > >> + IoBuffer dest = null;
> > >> + synchronized (buffersMap) {
> > >> + dest = buffersMap.get(session);
> > >> + if (dest == null) {
> > >> + // Enforce the creation of a non-expandable buffer
> > >> + dest =
> > IoBuffer.allocate(bufferSize).setAutoExpand(false);
> > >> + buffersMap.put(session, dest);
> > >> + }
> > >> + }
> > >> +
> > >> + write(session, data, dest);
> > >> + }
> > >> +
> > >> + /**
> > >> + * Writes <code>data</code> [EMAIL PROTECTED] IoBuffer} to the
> > <code>buf</code>
> > >> + * [EMAIL PROTECTED] IoBuffer} which buffers write requests for the
> > >> + * <code>session</code> {@ link IoSession} until buffer is full
> > >> + * or manually flushed.
> > >> + *
> > >> + * @param session the session where buffer will be written
> > >> + * @param data the data to buffer
> > >> + * @param buf the buffer where data will be temporarily written
> > >> + */
> > >> + private void write(IoSession session, IoBuffer data, IoBuffer
> buf)
> > {
> > >> + synchronized (buf) {
> > >> + try {
> > >> + int len = data.remaining();
> > >> + if (len >= buf.capacity()) {
> > >> + /*
> > >> + * If the request length exceeds the size of the
> > >> output buffer,
> > >> + * flush the output buffer and then write the
> data
> > >> directly.
> > >> + */
> > >> + NextFilter nextFilter = session.getFilterChain()
> > >> + .getNextFilter(this);
> > >> + internalFlush(nextFilter, session, buf);
> > >> + nextFilter.filterWrite(session,
> > >> + new DefaultWriteRequest(buf));
> > >> + return;
> > >> + }
> > >> + if (len > (buf.limit() - buf.position())) {
> > >> +
> > >> internalFlush(session.getFilterChain().getNextFilter(this),
> > >> + session, buf);
> > >> + }
> > >> + buf.put(data);
> > >> + } catch (Throwable e) {
> > >> + session.getFilterChain().fireExceptionCaught(e);
> > >> + }
> > >> + }
> > >> + }
> > >> +
> > >> + /**
> > >> + * Internal method that actually flushes the buffered data.
> > >> + *
> > >> + * @param nextFilter the [EMAIL PROTECTED] NextFilter} of this
> > >> filter
> > >> + * @param session the session where buffer will be written
> > >> + * @param data the data to write
> > >> + * @throws Exception if a write operation fails
> > >> + */
> > >> + private void internalFlush(NextFilter nextFilter, IoSession
> > session,
> > >> + IoBuffer data) throws Exception {
> > >> + if (data != null) {
> > >> + nextFilter.filterWrite(session, new
> > >> DefaultWriteRequest(data));
> > >> + data.clear();
> > >> + }
> > >> + }
> > >> +
> > >> + /**
> > >> + * Flushes the buffered data.
> > >> + *
> > >> + * @param session the session where buffer will be written
> > >> + */
> > >> + public void flush(IoSession session) {
> > >> + try {
> > >> + IoBuffer data = null;
> > >> + synchronized (session) {
> > >> + data = buffersMap.get(session);
> > >> + }
> > >> +
> internalFlush(session.getFilterChain().getNextFilter(this),
> > >> + session, data);
> > >> +
> > >> + } catch (Throwable e) {
> > >> + session.getFilterChain().fireExceptionCaught(e);
> > >> + }
> > >> + }
> > >> +
> > >> + /**
> > >> + * Internal method that actually frees the [EMAIL PROTECTED]
> > >> IoBuffer} that
> > >> contains
> > >> + * the buffered data that has not been flushed.
> > >> + *
> > >> + * @param session the session we operate on
> > >> + */
> > >> + private void clear(IoSession session) {
> > >> + synchronized (session) {
> > >> + IoBuffer buf = buffersMap.remove(session);
> > >> + if (buf != null) {
> > >> + buf.free();
> > >> + }
> > >> + }
> > >> + }
> > >> +
> > >> + /**
> > >> + * [EMAIL PROTECTED]
> > >> + */
> > >> + @Override
> > >> + public void exceptionCaught(NextFilter nextFilter, IoSession
> > session,
> > >> + Throwable cause) throws Exception {
> > >> + clear(session);
> > >> + }
> > >> +
> > >> + /**
> > >> + * [EMAIL PROTECTED]
> > >> + */
> > >> + @Override
> > >> + public void sessionClosed(NextFilter nextFilter, IoSession
> session)
> > >> + throws Exception {
> > >> + clear(session);
> > >> + }
> > >> +}
> > >> \ No newline at end of file
> > >>
> > >> Propchange:
> > >>
> >
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >>
> > >>
> >
> ------------------------------------------------------------------------------
> > >> svn:eol-style = native
> > >>
> > >> Propchange:
> > >>
> >
> mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
> > >>
> > >>
> >
> ------------------------------------------------------------------------------
> > >> svn:mime-type = text/plain
> > >>
> > >>
> > >>
> > >
> > >
> > >
> > >
> >
> _____________________________________________________________________________
> > > Envoyez avec Yahoo! Mail. Une boite mail plus intelligente
> > http://mail.yahoo.fr
> > >
> >
> >
> >
> >
> >
> _____________________________________________________________________________
> > Envoyez avec Yahoo! Mail. Une boite mail plus intelligente
> > http://mail.yahoo.fr
> >
>
>
>
>
> _____________________________________________________________________________
> Envoyez avec Yahoo! Mail. Une boite mail plus intelligente
> http://mail.yahoo.fr
>