Author: dain
Date: Sat Aug 16 15:13:51 2008
New Revision: 686571

URL: http://svn.apache.org/viewvc?rev=686571&view=rev
Log:
Added SimpleWorkManager to support in bound resource adapters when running 
without a GeronimoTransactionManager.
SimpleWorkManager can handle all work manager requests except for importing an 
XID.

Added:
    
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleBootstrapContext.java
    
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleWorkManager.java
Modified:
    
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java

Modified: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java?rev=686571&r1=686570&r2=686571&view=diff
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
 (original)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
 Sat Aug 16 15:13:51 2008
@@ -48,11 +48,11 @@
 import javax.resource.spi.ManagedConnectionFactory;
 import javax.resource.spi.ResourceAdapter;
 import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.XATerminator;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.TransactionManager;
 import javax.transaction.TransactionSynchronizationRegistry;
 
-import org.apache.geronimo.connector.GeronimoBootstrapContext;
 import org.apache.geronimo.connector.work.GeronimoWorkManager;
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.openejb.BeanType;
@@ -70,6 +70,8 @@
 import org.apache.openejb.core.CoreContainerSystem;
 import org.apache.openejb.core.CoreDeploymentInfo;
 import org.apache.openejb.core.SimpleTransactionSynchronizationRegistry;
+import org.apache.openejb.core.transaction.SimpleWorkManager;
+import org.apache.openejb.core.transaction.SimpleBootstrapContext;
 import org.apache.openejb.core.ivm.naming.IvmContext;
 import org.apache.openejb.core.timer.EjbTimerServiceImpl;
 import org.apache.openejb.core.timer.NullEjbTimerServiceImpl;
@@ -183,7 +185,7 @@
         SystemInstance system = SystemInstance.get();
 
         system.setComponent(Assembler.class, this);
-
+        
         containerSystem = new CoreContainerSystem();
         system.setComponent(ContainerSystem.class, containerSystem);
 
@@ -225,7 +227,7 @@
 
     public static void installNaming() {
         if (System.getProperty(DUCT_TAPE_PROPERTY) != null) return;
-
+        
         /* Add IntraVM JNDI service /////////////////////*/
         Properties systemProperties = System.getProperties();
         synchronized (systemProperties) {
@@ -436,8 +438,8 @@
 
         logger.info("createApplication.start", appInfo.jarPath);
 
-        // To start out, ensure we don't already have any beans deployed with 
duplicate IDs.  This
-        // is a conflict we can't handle.
+        // To start out, ensure we don't already have any beans deployed with 
duplicate IDs.  This 
+        // is a conflict we can't handle. 
         List<String> used = new ArrayList<String>();
         for (EjbJarInfo ejbJarInfo : appInfo.ejbJars) {
             for (EnterpriseBeanInfo beanInfo : ejbJarInfo.enterpriseBeans) {
@@ -872,7 +874,7 @@
         // MDB container has a resource adapter string name that
         // must be replaced with the real resource adapter instance
         replaceResourceAdapterProperty(serviceRecipe);
-
+        
         Object service = serviceRecipe.create();
 
         logUnusedProperties(serviceRecipe, serviceInfo);
@@ -988,22 +990,32 @@
         if (service instanceof ResourceAdapter) {
             ResourceAdapter resourceAdapter = (ResourceAdapter) service;
 
-            // resource adapters only work with a geronimo transaction manager
-            if (!(transactionManager instanceof GeronimoTransactionManager)) {
-                throw new 
OpenEJBException(messages.format("assembler.requiresGeronimoTX"));
+            // Create a thead pool for work manager
+            int threadPoolSize = getIntProperty(serviceInfo.properties, 
"threadPoolSize", 30);
+            Executor threadPool;
+            if (threadPoolSize <= 0) {
+                threadPool = Executors.newCachedThreadPool(new 
ResourceAdapterThreadFactory(serviceInfo.id));
+            } else {
+                threadPool = Executors.newFixedThreadPool(threadPoolSize, new 
ResourceAdapterThreadFactory(serviceInfo.id));
             }
-            GeronimoTransactionManager geronimoTransactionManager = 
(GeronimoTransactionManager) transactionManager;
 
-            // create a thead pool
-            int threadPoolSize = getIntProperty(serviceInfo.properties, 
"threadPoolSize", 30);
-            if (threadPoolSize <= 0) throw new 
IllegalArgumentException("threadPoolSizes <= 0: " + threadPoolSize);
-            Executor threadPool = Executors.newFixedThreadPool(threadPoolSize, 
new ResourceAdapterThreadFactory(serviceInfo.id));
+            // WorkManager: the resource adapter can use this to dispatch 
messages or perform tasks
+            WorkManager workManager;
+            if (transactionManager instanceof GeronimoTransactionManager) {
+                GeronimoTransactionManager geronimoTransactionManager = 
(GeronimoTransactionManager) transactionManager;
+                workManager = new GeronimoWorkManager(threadPool, threadPool, 
threadPool, geronimoTransactionManager);
+            } else {
+                workManager = new SimpleWorkManager(threadPool);
+            }
 
-            // create a work manager which the resource adapter can use to 
dispatch messages or perform tasks
-            WorkManager workManager = new GeronimoWorkManager(threadPool, 
threadPool, threadPool, geronimoTransactionManager);
 
-            // wrap the work mananger and transaction manager in a bootstrap 
context (connector spec thing)
-            BootstrapContext bootstrapContext = new 
GeronimoBootstrapContext(workManager, geronimoTransactionManager);
+            // BootstrapContext: wraps the WorkMananger and XATerminator
+            BootstrapContext bootstrapContext;
+            if (transactionManager instanceof XATerminator) {
+                bootstrapContext = new SimpleBootstrapContext(workManager, 
(XATerminator) transactionManager);
+            } else {
+                bootstrapContext = new SimpleBootstrapContext(workManager);
+            }
 
             // start the resource adapter
             try {

Added: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleBootstrapContext.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleBootstrapContext.java?rev=686571&view=auto
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleBootstrapContext.java
 (added)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleBootstrapContext.java
 Sat Aug 16 15:13:51 2008
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.core.transaction;
+
+import java.util.Timer;
+import javax.resource.spi.work.WorkManager;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.BootstrapContext;
+
+public class SimpleBootstrapContext implements BootstrapContext {
+    private final WorkManager workManager;
+    private final XATerminator xaTerminator;
+
+    public SimpleBootstrapContext(WorkManager workManager) {
+        this.workManager = workManager;
+        xaTerminator = null;
+    }
+
+    public SimpleBootstrapContext(WorkManager workManager, XATerminator 
xaTerminator) {
+        this.workManager = workManager;
+        this.xaTerminator = xaTerminator;
+    }
+
+    public WorkManager getWorkManager() {
+        return workManager;
+    }
+
+    public XATerminator getXATerminator() {
+        return xaTerminator;
+    }
+
+    public Timer createTimer() {
+        return new Timer(true);
+    }
+}

Added: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleWorkManager.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleWorkManager.java?rev=686571&view=auto
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleWorkManager.java
 (added)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/core/transaction/SimpleWorkManager.java
 Sat Aug 16 15:13:51 2008
@@ -0,0 +1,251 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.core.transaction;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkAdapter;
+import javax.resource.spi.work.WorkCompletedException;
+import javax.resource.spi.work.WorkEvent;
+import static javax.resource.spi.work.WorkEvent.WORK_ACCEPTED;
+import static javax.resource.spi.work.WorkEvent.WORK_COMPLETED;
+import static javax.resource.spi.work.WorkEvent.WORK_REJECTED;
+import static javax.resource.spi.work.WorkEvent.WORK_STARTED;
+import javax.resource.spi.work.WorkException;
+import static javax.resource.spi.work.WorkException.INTERNAL;
+import static javax.resource.spi.work.WorkException.START_TIMED_OUT;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import javax.resource.spi.work.WorkRejectedException;
+
+import static 
org.apache.openejb.core.transaction.SimpleWorkManager.WorkType.DO;
+import static 
org.apache.openejb.core.transaction.SimpleWorkManager.WorkType.SCHEDULE;
+import static 
org.apache.openejb.core.transaction.SimpleWorkManager.WorkType.START;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
+
+public class SimpleWorkManager implements WorkManager {
+    public enum WorkType {
+        DO, START, SCHEDULE
+    }
+
+    private static final Logger logger = 
Logger.getInstance(LogCategory.OPENEJB, SimpleWorkManager.class);
+
+    /**
+     * All work is performed by this executor
+     */
+    private Executor executor;
+
+    public SimpleWorkManager(Executor executor) {
+        if (executor == null) throw new NullPointerException("executor is 
null");
+        this.executor = executor;
+    }
+
+    public void doWork(Work work) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        doWork(work, INDEFINITE, null, null);
+    }
+
+    public void doWork(Work work, long startTimeout, ExecutionContext 
executionContext, WorkListener workListener) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        executeWork(DO, work, startTimeout, executionContext, workListener);
+    }
+
+    public long startWork(Work work) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        return startWork(work, INDEFINITE, null, null);
+    }
+
+    public long startWork(Work work, long startTimeout, ExecutionContext 
executionContext, WorkListener workListener) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        return executeWork(START, work, startTimeout, executionContext, 
workListener);
+    }
+
+    public void scheduleWork(Work work) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        scheduleWork(work, INDEFINITE, null, null);
+    }
+
+    public void scheduleWork(Work work, long startTimeout, ExecutionContext 
executionContext, WorkListener workListener) throws WorkException {
+        if (work == null) throw new NullPointerException("work is null");
+        executeWork(SCHEDULE, work, startTimeout, executionContext, 
workListener);
+    }
+
+    private long executeWork(WorkType workType, Work work, long startTimeout, 
ExecutionContext executionContext, WorkListener workListener) throws 
WorkException {
+        // assure we have a work listener
+        if (workListener == null) workListener = new 
LoggingWorkListener(workType);
+
+        // reject work with an XID
+        if (executionContext != null && executionContext.getXid() != null) {
+            WorkRejectedException workRejectedException = new 
WorkRejectedException("SimpleWorkManager can not import an XID", 
WorkException.TX_RECREATE_FAILED);
+            workListener.workRejected(new WorkEvent(this, WORK_REJECTED, work, 
workRejectedException));
+            throw workRejectedException;
+        }
+
+        // accecpt all other work
+        workListener.workAccepted(new WorkEvent(this, WORK_ACCEPTED, work, 
null));
+
+        // execute work
+        Worker worker = new Worker(work, workListener, startTimeout);
+        executor.execute(worker);
+
+        if (workType == DO) {
+            // wait for completion
+            try {
+                worker.waitForCompletion();
+            } catch (InterruptedException e) {
+                WorkException workException = new WorkException("Work 
submission thread was interrupted", e);
+                workException.setErrorCode(INTERNAL);
+                throw workException;
+            }
+
+            // if work threw an exception, rethrow it
+            WorkException workCompletedException = worker.getWorkException();
+            if (workCompletedException != null) {
+                throw workCompletedException;
+            }
+        } else if (workType == START) {
+            // wait for work to start
+            try {
+                worker.waitForStart();
+            } catch (InterruptedException e) {
+                WorkException workException = new WorkException("Work 
submission thread was interrupted", e);
+                workException.setErrorCode(INTERNAL);
+                throw workException;
+            }
+
+            // if work threw a rejection exception, rethrow it (it is the 
exception for timeout) 
+            WorkException workCompletedException = worker.getWorkException();
+            if (workCompletedException instanceof WorkRejectedException) {
+                throw workCompletedException;
+            }
+        }
+
+        return worker.getStartDelay();
+    }
+
+    private class Worker implements Runnable {
+        private final Work work;
+        private final WorkListener workListener;
+        private final long startTimeout;
+        private final long created = System.currentTimeMillis();
+        private final CountDownLatch started = new CountDownLatch(1);
+        private final CountDownLatch completed = new CountDownLatch(1);
+        private long startDelay = UNKNOWN;
+        private WorkException workException;
+
+        public Worker(Work work, WorkListener workListener, long startTimeout) 
{
+            this.work = work;
+            this.workListener = workListener;
+            if (startTimeout <= 0) {
+                this.startTimeout = INDEFINITE;
+            } else {
+                this.startTimeout = startTimeout;
+            }
+        }
+
+        public void run() {
+            try {
+                // check if we have started within the specified limit
+                startDelay = System.currentTimeMillis() - created;
+                if (startDelay > startTimeout) {
+                    workException = new WorkRejectedException("Work not 
started within specified timeout " + startTimeout + "ms", START_TIMED_OUT);
+                    workListener.workRejected(new WorkEvent(this, 
WORK_REJECTED, work, workException, startTimeout));
+                    return;
+                }
+
+                // notify listener that work has been started
+                workListener.workStarted(new WorkEvent(SimpleWorkManager.this, 
WORK_STARTED, work, null));
+
+                // officially started
+                started.countDown();
+
+                // execute the real work
+                workException = null;
+                try {
+                    work.run();
+                } catch (Throwable e) {
+                    workException = new WorkCompletedException(e);
+                } finally {
+                    // notify listener that work completed (with an optional 
exception)
+                    workListener.workCompleted(new 
WorkEvent(SimpleWorkManager.this, WORK_COMPLETED, work, workException));
+                }
+            } finally {
+                // assure that threads waiting for start are released
+                started.countDown();
+
+                // Done
+                completed.countDown();
+            }
+        }
+
+        public long getStartDelay() {
+            return startDelay;
+        }
+
+        public WorkException getWorkException() {
+            return workException;
+        }
+
+        public void waitForStart() throws InterruptedException {
+            started.await();
+        }
+
+        public void waitForCompletion() throws InterruptedException {
+            completed.await();
+        }
+    }
+
+    private static class LoggingWorkListener extends WorkAdapter {
+        private final WorkType workType;
+
+        private LoggingWorkListener(WorkType workType) {
+            this.workType = workType;
+        }
+
+        public void workRejected(WorkEvent event) {
+            // Don't log doWork or startWork since exception is propagated to 
caller
+            if (workType == DO || workType == START) {
+                return;
+            }
+            WorkException exception = event.getException();
+            if (exception != null) {
+                if 
(WorkException.START_TIMED_OUT.equals(exception.getErrorCode())) {
+                    logger.error(exception.getMessage());
+                }
+            }
+        }
+
+        public void workCompleted(WorkEvent event) {
+            // Don't log doWork since exception is propagated to caller
+            if (workType == DO) {
+                return;
+            }
+
+            Throwable cause = event.getException();
+            if (cause != null && cause.getCause() != null) {
+                cause = cause.getCause();
+            }
+            if (cause != null) {
+                logger.error(event.getWork().toString(), cause);
+            }
+        }
+    }
+}


Reply via email to