Author: woonsan
Date: Sun Nov 4 18:25:18 2007
New Revision: 591869
URL: http://svn.apache.org/viewvc?rev=591869&view=rev
Log:
[JS2-785] Parallel Rendering on Websphere 6.1
Fixed the problem: when using the commonj work manager to render portlets,
the timeout option did not work.
Modified:
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
Modified:
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
(original)
+++
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
Sun Nov 4 18:25:18 2007
@@ -216,27 +216,7 @@
}
// synchronize on completion of all jobs
- iter = parallelJobs.iterator();
- try
- {
- while (iter.hasNext())
- {
- RenderingJob job = (RenderingJob) iter.next();
- PortletContent portletContent = job.getPortletContent();
-
- synchronized (portletContent)
- {
- if (!portletContent.isComplete())
- {
- portletContent.wait();
- }
- }
- }
- }
- catch (Exception e)
- {
- log.error("Exception during synchronizing all portlet rendering
jobs.", e);
- }
+ renderer.waitForRenderingJobs(parallelJobs);
// Now, restore it to non parallel mode for rendering layout portlets.
CurrentWorkerContext.setParallelRenderingMode(false);
Modified:
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
(original)
+++
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
Sun Nov 4 18:25:18 2007
@@ -20,16 +20,22 @@
import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.List;
-import java.util.LinkedList;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.jetspeed.aggregator.RenderingJob;
+import org.apache.jetspeed.aggregator.Worker;
import org.apache.jetspeed.aggregator.WorkerMonitor;
import org.apache.jetspeed.aggregator.PortletContent;
-import org.apache.jetspeed.aggregator.CurrentWorkerContext;
+
+import org.apache.pluto.om.window.PortletWindow;
+import org.apache.pluto.om.common.ObjectID;
import commonj.work.WorkManager;
import commonj.work.Work;
@@ -48,16 +54,36 @@
{
public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR =
AccessControlContext.class.getName();
+ public static final String COMMONJ_WORK_ITEM_ATTR =
WorkItem.class.getName();
+ public static final String WORKER_THREAD_ATTR = Worker.class.getName();
/** CommonJ Work Manamger provided by JavaEE container */
protected WorkManager workManager;
-
- /** Work items to be monitored for timeout checking */
- protected List workItemsMonitored = Collections.synchronizedList(new
LinkedList());
+ /** If true, invoke interrupt() on the worker thread when the job
times out. */
+ protected boolean interruptOnTimeout = true;
+
+ /** Enable rendering job works monitor thread for timeout checking */
+ protected boolean jobWorksMonitorEnabled = true;
+
+ /** Rendering job works to be monitored for timeout checking */
+ protected Map jobWorksMonitored = Collections.synchronizedMap(new
HashMap());
+
public CommonjWorkerMonitorImpl(WorkManager workManager)
{
+ this(workManager, true);
+ }
+
+ public CommonjWorkerMonitorImpl(WorkManager workManager, boolean
jobWorksMonitorEnabled)
+ {
+ this(workManager, jobWorksMonitorEnabled, true);
+ }
+
+ public CommonjWorkerMonitorImpl(WorkManager workManager, boolean
jobWorksMonitorEnabled, boolean interruptOnTimeout)
+ {
this.workManager = workManager;
+ this.jobWorksMonitorEnabled = jobWorksMonitorEnabled;
+ this.interruptOnTimeout = interruptOnTimeout;
}
/** Commons logging */
@@ -68,18 +94,21 @@
public void start()
{
- jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
- jobMonitor.start();
+ if (this.jobWorksMonitorEnabled)
+ {
+ jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
+ jobMonitor.start();
+ }
}
public void stop()
- {
- if (jobMonitor != null)
+ {
+ if (jobMonitor != null)
{
- jobMonitor.endThread();
+ jobMonitor.endThread();
}
-
- jobMonitor = null;
+
+ jobMonitor = null;
}
/**
@@ -95,8 +124,14 @@
try
{
- WorkItem workItem = this.workManager.schedule(new
RenderingJobCommonjWork(job), this);
- this.workItemsMonitored.add(workItem);
+ RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(job);
+ WorkItem workItem = this.workManager.schedule(jobWork, this);
+ job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
+
+ if (this.jobWorksMonitorEnabled)
+ {
+ this.jobWorksMonitored.put(workItem, jobWork);
+ }
}
catch (Throwable t)
{
@@ -110,6 +145,61 @@
}
/**
+ * Wait for all rendering jobs in the collection to finish successfully or
otherwise.
+ * @param renderingJobs the Collection of rendering job objects to wait
for.
+ */
+ public void waitForRenderingJobs(List renderingJobs)
+ {
+ if (this.jobWorksMonitorEnabled)
+ {
+ try
+ {
+ for (Iterator iter = renderingJobs.iterator(); iter.hasNext();
)
+ {
+ RenderingJob job = (RenderingJob) iter.next();
+ PortletContent portletContent = job.getPortletContent();
+
+ synchronized (portletContent)
+ {
+ if (!portletContent.isComplete())
+ {
+ portletContent.wait();
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Exception during synchronizing all portlet
rendering jobs.", e);
+ }
+ }
+ else
+ {
+ // We cannot use WorkManager#waitForAll(workitems, timeout_ms)
for timeout.
+ // The second argument could be either WorkManager.IMMEDIATE or
WorkManager.INDEFINITE.
+
+ try
+ {
+ if (!renderingJobs.isEmpty())
+ {
+ Object lock = new Object();
+ MonitoringJobCommonjWork monitoringWork = new
MonitoringJobCommonjWork(lock, renderingJobs);
+
+ synchronized (lock)
+ {
+ WorkItem monitorWorkItem =
this.workManager.schedule(monitoringWork, this);
+ lock.wait();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Exception during synchronizing all portlet
rendering jobs.", e);
+ }
+ }
+ }
+
+ /**
* Returns a snapshot of the available jobs
* @return available jobs
*/
@@ -123,6 +213,8 @@
return 0;
}
+ // commonj.work.WorkListener implementations
+
public void workAccepted(WorkEvent we)
{
WorkItem workItem = we.getWorkItem();
@@ -133,7 +225,11 @@
{
WorkItem workItem = we.getWorkItem();
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl]
workRejected: " + workItem);
- removeMonitoredWorkItem(workItem);
+
+ if (this.jobWorksMonitorEnabled)
+ {
+ removeMonitoredJobWork(workItem);
+ }
}
public void workStarted(WorkEvent we)
@@ -146,12 +242,16 @@
{
WorkItem workItem = we.getWorkItem();
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl]
workCompleted: " + workItem);
- removeMonitoredWorkItem(workItem);
+
+ if (this.jobWorksMonitorEnabled)
+ {
+ removeMonitoredJobWork(workItem);
+ }
}
- protected boolean removeMonitoredWorkItem(WorkItem workItem)
+ protected Object removeMonitoredJobWork(WorkItem workItem)
{
- return this.workItemsMonitored.remove(workItem);
+ return this.jobWorksMonitored.remove(workItem);
}
class RenderingJobCommonjWork implements Work
@@ -159,10 +259,6 @@
protected RenderingJob job;
- public RenderingJobCommonjWork()
- {
- }
-
public RenderingJobCommonjWork(RenderingJob job)
{
this.job = job;
@@ -175,6 +271,11 @@
public void run()
{
+ if (jobWorksMonitorEnabled || interruptOnTimeout)
+ {
+ this.job.setWorkerAttribute(WORKER_THREAD_ATTR,
Thread.currentThread());
+ }
+
this.job.run();
}
@@ -182,16 +283,106 @@
{
}
- public void setRenderingJob(RenderingJob job)
- {
- this.job = job;
- }
-
public RenderingJob getRenderingJob()
{
return this.job;
}
+ }
+
+ class MonitoringJobCommonjWork implements Work
+ {
+
+ protected Object lock;
+ protected List renderingJobs;
+ public MonitoringJobCommonjWork(Object lock, List jobs)
+ {
+ this.lock = lock;
+ this.renderingJobs = new ArrayList(jobs);
+ }
+
+ public boolean isDaemon()
+ {
+ return false;
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (!this.renderingJobs.isEmpty())
+ {
+ for (Iterator it = this.renderingJobs.iterator();
it.hasNext(); )
+ {
+ RenderingJob job = (RenderingJob) it.next();
+ WorkItem workItem = (WorkItem)
job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
+ int status = WorkEvent.WORK_ACCEPTED;
+
+ if (workItem != null)
+ {
+ status = workItem.getStatus();
+ }
+
+ boolean isTimeout = job.isTimeout();
+
+ if (isTimeout)
+ {
+ PortletContent content = job.getPortletContent();
+
+ if (interruptOnTimeout)
+ {
+ Thread worker = (Thread)
job.getWorkerAttribute(WORKER_THREAD_ATTR);
+
+ if (worker != null)
+ {
+ synchronized (content)
+ {
+ if (!content.isComplete()) {
+ worker.interrupt();
+ content.wait();
+ }
+ }
+ }
+ }
+ else
+ {
+ synchronized (content)
+ {
+ content.completeWithError();
+ }
+ }
+ }
+
+ if (status == WorkEvent.WORK_COMPLETED || status ==
WorkEvent.WORK_REJECTED || isTimeout)
+ {
+ it.remove();
+ }
+ }
+
+ if (!this.renderingJobs.isEmpty())
+ {
+ synchronized (this)
+ {
+ wait(100);
+ }
+ }
+ }
+
+ synchronized (this.lock)
+ {
+ this.lock.notify();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Exceptiong during job timeout monitoring.", e);
+ }
+ }
+
+ public void release()
+ {
+ }
+
}
class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
@@ -224,20 +415,30 @@
while (shouldRun) {
try
{
- synchronized (workItemsMonitored)
+ List timeoutJobWorks = new ArrayList();
+ Collection jobWorks = jobWorksMonitored.values();
+
+ for (Iterator it = jobWorks.iterator(); it.hasNext(); )
{
- for (Iterator it = workItemsMonitored.iterator();
it.hasNext(); )
+ RenderingJobCommonjWork jobWork =
(RenderingJobCommonjWork) it.next();
+ RenderingJob job = jobWork.getRenderingJob();
+
+ if (job.isTimeout())
{
- WorkItem workItem = (WorkItem) it.next();
- int status = workItem.getStatus();
-
- if (status == WorkEvent.WORK_COMPLETED || status
== WorkEvent.WORK_REJECTED)
- {
- it.remove();
- }
- else
- {
- }
+ timeoutJobWorks.add(jobWork);
+ }
+ }
+
+ // Now, we can kill the timeout worker(s).
+ for (Iterator it = timeoutJobWorks.iterator();
it.hasNext(); )
+ {
+ RenderingJobCommonjWork jobWork =
(RenderingJobCommonjWork) it.next();
+ RenderingJob job = jobWork.getRenderingJob();
+
+ // If the job is just completed, then do not kill the
worker.
+ if (job.isTimeout())
+ {
+ killJobWork(jobWork);
}
}
}
@@ -259,5 +460,42 @@
}
}
}
+
+ public void killJobWork(RenderingJobCommonjWork jobWork) {
+ RenderingJob job = jobWork.getRenderingJob();
+
+ try {
+ if (log.isWarnEnabled()) {
+ PortletWindow window = job.getWindow();
+ ObjectID windowId = (null != window ? window.getId() :
null);
+ log.warn("Portlet Rendering job to be interrupted by
timeout (" + job.getTimeout() + "ms): " + windowId);
+ }
+
+ PortletContent content = job.getPortletContent();
+ Thread worker = (Thread)
job.getWorkerAttribute(WORKER_THREAD_ATTR);
+
+ if (worker != null)
+ {
+ synchronized (content)
+ {
+ if (!content.isComplete()) {
+ worker.interrupt();
+ content.wait();
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exceptiong during job killing.", e);
+ } finally {
+ WorkItem workItem = (WorkItem)
job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
+
+ if (workItem != null)
+ {
+ removeMonitoredJobWork(workItem);
+ }
+ }
+ }
+
}
+
}
Modified:
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
(original)
+++
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
Sun Nov 4 18:25:18 2007
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -380,6 +381,15 @@
log.error("render() failed: " + e1.toString(), e1);
fragment.overrideRenderedContent(e1.getLocalizedMessage());
}
+ }
+
+ /**
+ * Wait for all rendering jobs in the collection to finish successfully or
otherwise.
+ * @param renderingJobs the Collection of rendering job objects to wait
for.
+ */
+ public void waitForRenderingJobs(List renderingJobs)
+ {
+ this.workMonitor.waitForRenderingJobs(renderingJobs);
}
/**
Modified:
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
(original)
+++
portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
Sun Nov 4 18:25:18 2007
@@ -204,6 +204,34 @@
}
}
}
+
+ /**
+ * Wait for all rendering jobs in the collection to finish successfully or
otherwise.
+ * @param renderingJobs the Collection of rendering job objects to wait
for.
+ */
+ public void waitForRenderingJobs(List renderingJobs)
+ {
+ try
+ {
+ for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
+ {
+ RenderingJob job = (RenderingJob) iter.next();
+ PortletContent portletContent = job.getPortletContent();
+
+ synchronized (portletContent)
+ {
+ if (!portletContent.isComplete())
+ {
+ portletContent.wait();
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Exception during synchronizing all portlet rendering
jobs.", e);
+ }
+ }
/**
* Put back the worker in the idle queue unless there are pending jobs and
Modified:
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
(original)
+++
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
Sun Nov 4 18:25:18 2007
@@ -16,6 +16,8 @@
*/
package org.apache.jetspeed.aggregator;
+import java.util.List;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -87,6 +89,12 @@
*/
public void processRenderingJob(RenderingJob job);
+ /**
+ * Wait for all rendering jobs in the collection to finish successfully or
otherwise.
+ * @param renderingJobs the Collection of rendering job objects to wait
for.
+ */
+ public void waitForRenderingJobs(List renderingJobs);
+
/**
* Retrieve the ContentDispatcher for the specified request
*/
Modified:
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
URL:
http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
---
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
(original)
+++
portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
Sun Nov 4 18:25:18 2007
@@ -1,9 +1,9 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -16,6 +16,8 @@
*/
package org.apache.jetspeed.aggregator;
+import java.util.List;
+
import org.apache.jetspeed.util.Queue;
/**
@@ -67,4 +69,9 @@
*/
void process(RenderingJob job);
+ /**
+ * Wait for all rendering jobs in the collection to finish successfully or
otherwise.
+ * @param renderingJobs the Collection of rendering job objects to wait
for.
+ */
+ public void waitForRenderingJobs(List renderingJobs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]