Updated Branches: refs/heads/trunk 3b1034e82 -> c3a9c80ab
FLUME-2275. Improve scalability of MorphlineInterceptor under contention (Wolfgang Hoschek via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c3a9c80a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c3a9c80a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c3a9c80a Branch: refs/heads/trunk Commit: c3a9c80ab431f6ba670142c7ce6813692422f764 Parents: 3b1034e Author: Hari Shreedharan <[email protected]> Authored: Thu Jan 2 18:22:31 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Jan 2 18:23:37 2014 -0800 ---------------------------------------------------------------------- .../solr/morphline/MorphlineInterceptor.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c3a9c80a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java index 8e5e4b3..ef8f716 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java @@ -23,18 +23,18 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.event.EventBuilder; import org.apache.flume.interceptor.Interceptor; - import org.kitesdk.morphline.api.Command; import org.kitesdk.morphline.api.Record; import org.kitesdk.morphline.base.Fields; + import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; @@ -47,7 +47,7 @@ import com.google.common.io.ByteStreams; public class MorphlineInterceptor implements Interceptor { private final Context context; - private final BlockingQueue<LocalMorphlineInterceptor> pool = new LinkedBlockingQueue(); + private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>(); protected MorphlineInterceptor(Context context) { Preconditions.checkNotNull(context); @@ -61,9 +61,8 @@ public class MorphlineInterceptor implements Interceptor { @Override public void close() { - List<LocalMorphlineInterceptor> interceptors = new ArrayList(); - pool.drainTo(interceptors); - for (LocalMorphlineInterceptor interceptor : interceptors) { + LocalMorphlineInterceptor interceptor; + while ((interceptor = pool.poll()) != null) { interceptor.close(); } } @@ -85,11 +84,7 @@ public class MorphlineInterceptor implements Interceptor { } private void returnToPool(LocalMorphlineInterceptor interceptor) { - try { - pool.put(interceptor); - } catch (InterruptedException e) { - throw new FlumeException(e); - } + pool.add(interceptor); } private LocalMorphlineInterceptor borrowFromPool() {
