Author: trustin
Date: Wed Nov 7 21:35:34 2007
New Revision: 593015
URL: http://svn.apache.org/viewvc?rev=593015&view=rev
Log:
Related issue: DIRMINA-292 (Shared I/O processors.)
Forgot to check in SimpleIoProcessorPool
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
(with props)
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java?rev=593015&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
Wed Nov 7 21:35:34 2007
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An [EMAIL PROTECTED] IoProcessor} pool that distributes [EMAIL PROTECTED]
IoSession}s into one or more
+ * [EMAIL PROTECTED] IoProcessor}s. Most current transport implementations use
this pool internally
+ * to perform better in a multi-core environment, and therefore, you won't
need to
+ * use this pool directly unless you are running multiple [EMAIL PROTECTED]
IoService}s in the
+ * same JVM.
+ * <p>
+ * If you are running multiple [EMAIL PROTECTED] IoService}s, you could want
to share the pool
+ * among all services. To do so, you can create a new [EMAIL PROTECTED]
SimpleIoProcessorPool}
+ * instance by yourself and provide the pool as a constructor parameter when
you
+ * create the services.
+ * <p>
+ * This pool uses Java reflection API to create multiple [EMAIL PROTECTED]
IoProcessor} instances.
+ * It tries to instantiate the processor in the following order:
+ * <ol>
+ * <li>A public constructor with one [EMAIL PROTECTED] ExecutorService}
parameter.</li>
+ * <li>A public constructor with one [EMAIL PROTECTED] Executor}
parameter.</li>
+ * <li>A public default constructor</li>
+ * </ol>
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ *
+ * @param <T> the type of the [EMAIL PROTECTED] IoSession} to be managed by
the specified
+ * [EMAIL PROTECTED] IoProcessor}.
+ */
+public class SimpleIoProcessorPool<T extends AbstractIoSession> implements
IoProcessor<T> {
+
+ private static final int DEFAULT_SIZE =
Runtime.getRuntime().availableProcessors() + 1;
+ private static final AttributeKey PROCESSOR = new
AttributeKey(SimpleIoProcessorPool.class, "processor");
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final IoProcessor<T>[] pool;
+ private final AtomicInteger processorDistributor = new AtomicInteger();
+ private final Executor executor;
+ private final boolean createdExecutor;
+ private volatile boolean disposed;
+
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>>
processorType) {
+ this(processorType, null, DEFAULT_SIZE);
+ }
+
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>>
processorType, int size) {
+ this(processorType, null, size);
+ }
+
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>>
processorType, Executor executor) {
+ this(processorType, executor, DEFAULT_SIZE);
+ }
+
+ @SuppressWarnings("unchecked")
+ public SimpleIoProcessorPool(Class<? extends IoProcessor<T>>
processorType, Executor executor, int size) {
+ if (processorType == null) {
+ throw new NullPointerException("processorType");
+ }
+ if (size <= 0) {
+ throw new IllegalArgumentException(
+ "size: " + size + " (expected: positive integer)");
+ }
+
+ if (executor == null) {
+ this.executor = executor = Executors.newCachedThreadPool();
+ this.createdExecutor = true;
+ } else {
+ this.executor = executor;
+ this.createdExecutor = false;
+ }
+
+ pool = new IoProcessor[size];
+
+ boolean success = false;
+ try {
+ for (int i = 0; i < pool.length; i ++) {
+ IoProcessor<T> processor = null;
+
+ // Try to create a new processor with a proper constructor.
+ try {
+ try {
+ processor =
processorType.getConstructor(ExecutorService.class).newInstance(executor);
+ } catch (NoSuchMethodException e) {
+ // To the next step...
+ }
+
+ if (processor == null) {
+ try {
+ processor =
processorType.getConstructor(Executor.class).newInstance(executor);
+ } catch (NoSuchMethodException e) {
+ // To the next step...
+ }
+ }
+
+ if (processor == null) {
+ try {
+ processor =
processorType.getConstructor().newInstance();
+ } catch (NoSuchMethodException e) {
+ // To the next step...
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeIoException(
+ "Failed to create a new instance of " +
processorType.getName(), e);
+ }
+
+ // Raise an exception if no proper constructor is found.
+ if (processor == null) {
+ throw new IllegalArgumentException(
+ String.valueOf(processorType) + " must have a
public constructor " +
+ "with one " +
ExecutorService.class.getSimpleName() + " parameter, " +
+ "a public constructor with one " +
Executor.class.getSimpleName() +
+ " parameter or a public default constructor.");
+ }
+
+ pool[i] = processor;
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ dispose();
+ }
+ }
+ }
+
+ public void add(T session) {
+ IoProcessor<T> p = nextProcessor();
+ session.setAttribute(PROCESSOR, p);
+ p.add(session);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void flush(T session) {
+ IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
+ p.flush(session);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void remove(T session) {
+ IoProcessor<T> p = (IoProcessor<T>) session.removeAttribute(PROCESSOR);
+ p.remove(session);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void updateTrafficMask(T session) {
+ IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
+ p.updateTrafficMask(session);
+ }
+
+ public void dispose() {
+ if (disposed) {
+ return;
+ }
+
+ disposed = true;
+ for (int i = pool.length - 1; i >= 0; i --) {
+ if (pool[i] == null) {
+ continue;
+ }
+
+ try {
+ pool[i].dispose();
+ } catch (Exception e) {
+ logger.warn(
+ "Failed to dispose a " +
pool[i].getClass().getSimpleName() +
+ " at index " + i + ".", e);
+ } finally {
+ pool[i] = null;
+ }
+ }
+
+ if (createdExecutor) {
+ ((ExecutorService) executor).shutdown();
+ }
+ }
+
+ private IoProcessor<T> nextProcessor() {
+ checkDisposal();
+ return pool[Math.abs(processorDistributor.getAndIncrement()) %
pool.length];
+ }
+
+ private void checkDisposal() {
+ if (disposed) {
+ throw new IllegalStateException("A disposed processor cannot be
accessed.");
+ }
+ }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleIoProcessorPool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date