Hi,
as you know I've been working on this idea of having
a feature iterator that pre-fetches data on a separate
thread, allowing some overlap between data reading and
data processing (be it GML encoding or rendering).
I've done various tests with it and, whilst in some
cases there was a speedup, in most cases the speedup
was negligible or required a very large prefetch queue
(10k elements).
The extra complexity introduced by the prefetching
is probably not worth the pain. Yet, I figure someone
might be interested in those?
I've attached the last working version of these
collections to this mail. Anyone interested in picking
them up?
Worst case, the mailing list archive will back them up in case
someone wants to give them a shot.
Ah, I release them under the same license as GeoTools.
Cheers
Andrea
--
Andrea Aime
OpenGeo - http://opengeo.org
Expert service straight from the developers.
package org.vfny.geoserver.global;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.feature.collection.DecoratingFeatureCollection;
import org.opengis.feature.Feature;
import org.opengis.feature.type.FeatureType;
/**
 * Feature collection decorator whose iterators are {@link PrefetchingIterator}s built over the
 * wrapped collection, so that elements are read ahead by the prefetcher task that
 * {@link PrefetchingIterator} submits to its thread pool.
 *
 * @param <T> the feature type of the collection
 * @param <F> the feature class of the collection
 */
public class PrefetchingFeatureCollection<T extends FeatureType, F extends Feature> extends DecoratingFeatureCollection<T, F> {

    /** Queue depth handed to every iterator created by this collection. */
    private int queueDepth;

    /**
     * Wraps the given collection.
     *
     * @param delegate the collection whose contents will be prefetched
     * @param queueDepth capacity of the prefetch queue used by the iterators
     */
    public PrefetchingFeatureCollection(FeatureCollection<T, F> delegate, int queueDepth) {
        super(delegate);
        this.queueDepth = queueDepth;
    }

    /**
     * Returns a plain prefetching iterator over the wrapped collection.
     *
     * @return a new {@link PrefetchingIterator}
     */
    @Override
    public Iterator iterator() {
        PrefetchingIterator prefetching = new PrefetchingIterator(delegate, queueDepth);
        return prefetching;
    }

    /**
     * Returns a prefetching feature iterator over the wrapped collection.
     *
     * @return a new {@link PrefetchingFeatureIterator}
     */
    @Override
    public FeatureIterator<F> features() {
        FeatureIterator<F> prefetching = new PrefetchingFeatureIterator<F>(delegate, queueDepth);
        return prefetching;
    }

    /**
     * Closes an iterator previously returned by {@link #iterator()}.
     *
     * @param close the iterator to close; it is cast to {@link PrefetchingIterator}
     */
    @Override
    public void close(Iterator<F> close) {
        PrefetchingIterator prefetching = (PrefetchingIterator) close;
        prefetching.close();
    }

    /**
     * Closes the given feature iterator by invoking its own {@code close()} method.
     *
     * @param close the feature iterator to close
     */
    @Override
    public void close(FeatureIterator<F> close) {
        close.close();
    }

    /**
     * {@link FeatureIterator} view over a {@link PrefetchingIterator}; every call is forwarded to
     * the wrapped iterator.
     *
     * @param <F> the feature class returned by {@link #next()}
     */
    static class PrefetchingFeatureIterator<F extends Feature> implements FeatureIterator<F> {

        /** The prefetching iterator doing the actual work. */
        PrefetchingIterator iter;

        /**
         * @param collection the collection to iterate over
         * @param queueDepth capacity of the prefetch queue
         */
        public PrefetchingFeatureIterator(FeatureCollection collection, int queueDepth) {
            this.iter = new PrefetchingIterator(collection, queueDepth);
        }

        /** @return whether the wrapped iterator reports another element */
        public boolean hasNext() {
            return iter.hasNext();
        }

        /**
         * @return the next element of the wrapped iterator, cast to {@code F}
         * @throws NoSuchElementException if the wrapped iterator has no more elements
         */
        public F next() throws NoSuchElementException {
            Object feature = iter.next();
            return (F) feature;
        }

        /** Closes the wrapped iterator. */
        public void close() {
            iter.close();
        }
    }
}
package org.vfny.geoserver.global;
import java.awt.RenderingHints.Key;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.geotools.data.DataAccess;
import org.geotools.data.FeatureListener;
import org.geotools.data.FeatureLock;
import org.geotools.data.FeatureLocking;
import org.geotools.data.FeatureReader;
import org.geotools.data.Query;
import org.geotools.data.QueryCapabilities;
import org.geotools.data.ResourceInfo;
import org.geotools.data.Transaction;
import org.geotools.feature.FeatureCollection;
import org.geotools.geometry.jts.ReferencedEnvelope;
import org.opengis.feature.Feature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.feature.type.FeatureType;
import org.opengis.feature.type.Name;
import org.opengis.filter.Filter;
import org.opengis.filter.identity.FeatureId;
/**
 * {@link FeatureLocking} decorator: the {@code getFeatures} methods wrap the delegate's result
 * in a {@link PrefetchingFeatureCollection}; every other method is forwarded to the delegate
 * unchanged.
 *
 * @param <T> the feature type
 * @param <F> the feature class
 */
public class PrefetchingFeatureLocking<T extends FeatureType, F extends Feature> implements
        FeatureLocking<T, F> {

    /** The wrapped feature locking. */
    FeatureLocking<T, F> delegate;

    /** Queue depth handed to the prefetching collections. */
    int queueDepth;

    /**
     * @param delegate the feature locking to wrap
     * @param queueDepth queue depth used for the returned collections
     */
    public PrefetchingFeatureLocking(FeatureLocking<T, F> delegate, int queueDepth) {
        this.delegate = delegate;
        this.queueDepth = queueDepth;
    }

    /** Wraps a collection obtained from the delegate into a prefetching one. */
    private FeatureCollection<T, F> prefetching(FeatureCollection<T, F> features) {
        return new PrefetchingFeatureCollection<T, F>(features, queueDepth);
    }

    // ----- feature access: results are wrapped ---------------------------------------------

    public FeatureCollection<T, F> getFeatures() throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures();
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Filter filter) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(filter);
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Query query) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(query);
        return prefetching(features);
    }

    // ----- read-only source methods: plain delegation --------------------------------------

    public T getSchema() {
        return delegate.getSchema();
    }

    public Name getName() {
        return delegate.getName();
    }

    public ResourceInfo getInfo() {
        return delegate.getInfo();
    }

    public DataAccess<T, F> getDataStore() {
        return delegate.getDataStore();
    }

    public QueryCapabilities getQueryCapabilities() {
        return delegate.getQueryCapabilities();
    }

    public Set<Key> getSupportedHints() {
        return delegate.getSupportedHints();
    }

    public ReferencedEnvelope getBounds() throws IOException {
        return delegate.getBounds();
    }

    public ReferencedEnvelope getBounds(Query query) throws IOException {
        return delegate.getBounds(query);
    }

    public int getCount(Query query) throws IOException {
        return delegate.getCount(query);
    }

    public void addFeatureListener(FeatureListener listener) {
        delegate.addFeatureListener(listener);
    }

    public void removeFeatureListener(FeatureListener listener) {
        delegate.removeFeatureListener(listener);
    }

    // ----- store methods: plain delegation --------------------------------------------------

    public List<FeatureId> addFeatures(FeatureCollection<T, F> collection) throws IOException {
        return delegate.addFeatures(collection);
    }

    public void removeFeatures(Filter filter) throws IOException {
        delegate.removeFeatures(filter);
    }

    public void modifyFeatures(AttributeDescriptor type, Object value, Filter filter)
            throws IOException {
        delegate.modifyFeatures(type, value, filter);
    }

    public void modifyFeatures(AttributeDescriptor[] type, Object[] value, Filter filter)
            throws IOException {
        delegate.modifyFeatures(type, value, filter);
    }

    public void setFeatures(FeatureReader<T, F> reader) throws IOException {
        delegate.setFeatures(reader);
    }

    public Transaction getTransaction() {
        return delegate.getTransaction();
    }

    public void setTransaction(Transaction transaction) {
        delegate.setTransaction(transaction);
    }

    // ----- locking methods: plain delegation ------------------------------------------------

    public void setFeatureLock(FeatureLock lock) {
        delegate.setFeatureLock(lock);
    }

    public int lockFeatures() throws IOException {
        return delegate.lockFeatures();
    }

    public int lockFeatures(Filter filter) throws IOException {
        return delegate.lockFeatures(filter);
    }

    public int lockFeatures(Query query) throws IOException {
        return delegate.lockFeatures(query);
    }

    public void unLockFeatures() throws IOException {
        delegate.unLockFeatures();
    }

    public void unLockFeatures(Filter filter) throws IOException {
        delegate.unLockFeatures(filter);
    }

    public void unLockFeatures(Query query) throws IOException {
        delegate.unLockFeatures(query);
    }
}
package org.vfny.geoserver.global;
import java.awt.RenderingHints.Key;
import java.io.IOException;
import java.util.Set;
import org.geotools.data.DataAccess;
import org.geotools.data.FeatureListener;
import org.geotools.data.FeatureLocking;
import org.geotools.data.FeatureSource;
import org.geotools.data.FeatureStore;
import org.geotools.data.Query;
import org.geotools.data.QueryCapabilities;
import org.geotools.data.ResourceInfo;
import org.geotools.feature.FeatureCollection;
import org.geotools.geometry.jts.ReferencedEnvelope;
import org.opengis.feature.Feature;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.feature.type.FeatureType;
import org.opengis.feature.type.Name;
import org.opengis.filter.Filter;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
/**
 * {@link FeatureSource} decorator: the {@code getFeatures} methods wrap the delegate's result in
 * a {@link PrefetchingFeatureCollection}; every other method is forwarded to the delegate
 * unchanged. Use {@link #create(FeatureSource, int)} to obtain the wrapper matching the
 * capabilities of a given source.
 *
 * @param <T> the feature type
 * @param <F> the feature class
 */
public class PrefetchingFeatureSource<T extends FeatureType, F extends Feature> implements FeatureSource<T, F> {

    /** The wrapped feature source. */
    FeatureSource<T, F> delegate;

    /** Queue depth handed to the prefetching collections. */
    int queueDepth;

    /**
     * @param delegate the feature source to wrap
     * @param queueDepth queue depth used for the returned collections
     */
    public PrefetchingFeatureSource(FeatureSource<T, F> delegate, int queueDepth) {
        this.delegate = delegate;
        this.queueDepth = queueDepth;
    }

    /**
     * Wraps a feature source in the prefetching decorator that matches its most specific
     * interface: {@link FeatureLocking} is checked first, then {@link FeatureStore}, and a plain
     * source wrapper is used otherwise.
     *
     * @param featureSource the source to wrap
     * @param queueDepth the prefetch queue depth; values of 1 or less disable wrapping
     * @return {@code featureSource} itself when {@code queueDepth <= 1}, a prefetching wrapper
     *         otherwise
     */
    public static <T extends FeatureType, F extends Feature>
            FeatureSource<T, F> create(FeatureSource <T, F> featureSource, int queueDepth) {
        if (queueDepth <= 1) {
            return featureSource;
        }

        if (featureSource instanceof FeatureLocking) {
            FeatureLocking<T, F> locking = (FeatureLocking<T, F>) featureSource;
            return new PrefetchingFeatureLocking<T, F>(locking, queueDepth);
        }
        if (featureSource instanceof FeatureStore) {
            FeatureStore<T, F> store = (FeatureStore<T, F>) featureSource;
            return new PrefetchingFeatureStore<T, F>(store, queueDepth);
        }
        return new PrefetchingFeatureSource<T, F>(featureSource, queueDepth);
    }

    /** Wraps a collection obtained from the delegate into a prefetching one. */
    private FeatureCollection<T, F> prefetching(FeatureCollection<T, F> features) {
        return new PrefetchingFeatureCollection<T, F>(features, queueDepth);
    }

    // ----- feature access: results are wrapped ---------------------------------------------

    public FeatureCollection<T, F> getFeatures() throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures();
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Filter filter) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(filter);
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Query query) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(query);
        return prefetching(features);
    }

    // ----- everything else: plain delegation -----------------------------------------------

    public T getSchema() {
        return delegate.getSchema();
    }

    public Name getName() {
        return delegate.getName();
    }

    public ResourceInfo getInfo() {
        return delegate.getInfo();
    }

    public DataAccess<T, F> getDataStore() {
        return delegate.getDataStore();
    }

    public QueryCapabilities getQueryCapabilities() {
        return delegate.getQueryCapabilities();
    }

    public Set<Key> getSupportedHints() {
        return delegate.getSupportedHints();
    }

    public ReferencedEnvelope getBounds() throws IOException {
        return delegate.getBounds();
    }

    public ReferencedEnvelope getBounds(Query query) throws IOException {
        return delegate.getBounds(query);
    }

    public int getCount(Query query) throws IOException {
        return delegate.getCount(query);
    }

    public void addFeatureListener(FeatureListener listener) {
        delegate.addFeatureListener(listener);
    }

    public void removeFeatureListener(FeatureListener listener) {
        delegate.removeFeatureListener(listener);
    }
}
package org.vfny.geoserver.global;
import java.awt.RenderingHints.Key;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.geotools.data.DataAccess;
import org.geotools.data.FeatureListener;
import org.geotools.data.FeatureReader;
import org.geotools.data.FeatureSource;
import org.geotools.data.FeatureStore;
import org.geotools.data.Query;
import org.geotools.data.QueryCapabilities;
import org.geotools.data.ResourceInfo;
import org.geotools.data.Transaction;
import org.geotools.feature.FeatureCollection;
import org.geotools.geometry.jts.ReferencedEnvelope;
import org.opengis.feature.Feature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.feature.type.FeatureType;
import org.opengis.feature.type.Name;
import org.opengis.filter.Filter;
import org.opengis.filter.identity.FeatureId;
/**
 * {@link FeatureStore} decorator: the {@code getFeatures} methods wrap the delegate's result in
 * a {@link PrefetchingFeatureCollection}; every other method is forwarded to the delegate
 * unchanged.
 *
 * @param <T> the feature type
 * @param <F> the feature class
 */
public class PrefetchingFeatureStore<T extends FeatureType, F extends Feature> implements
        FeatureStore<T, F> {

    /** The wrapped feature store. */
    FeatureStore<T, F> delegate;

    /** Queue depth handed to the prefetching collections. */
    int queueDepth;

    /**
     * @param delegate the feature store to wrap
     * @param queueDepth queue depth used for the returned collections
     */
    public PrefetchingFeatureStore(FeatureStore<T, F> delegate, int queueDepth) {
        this.delegate = delegate;
        this.queueDepth = queueDepth;
    }

    /** Wraps a collection obtained from the delegate into a prefetching one. */
    private FeatureCollection<T, F> prefetching(FeatureCollection<T, F> features) {
        return new PrefetchingFeatureCollection<T, F>(features, queueDepth);
    }

    // ----- feature access: results are wrapped ---------------------------------------------

    public FeatureCollection<T, F> getFeatures() throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures();
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Filter filter) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(filter);
        return prefetching(features);
    }

    public FeatureCollection<T, F> getFeatures(Query query) throws IOException {
        FeatureCollection<T, F> features = delegate.getFeatures(query);
        return prefetching(features);
    }

    // ----- read-only source methods: plain delegation --------------------------------------

    public T getSchema() {
        return delegate.getSchema();
    }

    public Name getName() {
        return delegate.getName();
    }

    public ResourceInfo getInfo() {
        return delegate.getInfo();
    }

    public DataAccess<T, F> getDataStore() {
        return delegate.getDataStore();
    }

    public QueryCapabilities getQueryCapabilities() {
        return delegate.getQueryCapabilities();
    }

    public Set<Key> getSupportedHints() {
        return delegate.getSupportedHints();
    }

    public ReferencedEnvelope getBounds() throws IOException {
        return delegate.getBounds();
    }

    public ReferencedEnvelope getBounds(Query query) throws IOException {
        return delegate.getBounds(query);
    }

    public int getCount(Query query) throws IOException {
        return delegate.getCount(query);
    }

    public void addFeatureListener(FeatureListener listener) {
        delegate.addFeatureListener(listener);
    }

    public void removeFeatureListener(FeatureListener listener) {
        delegate.removeFeatureListener(listener);
    }

    // ----- store methods: plain delegation --------------------------------------------------

    public List<FeatureId> addFeatures(FeatureCollection<T, F> collection) throws IOException {
        return delegate.addFeatures(collection);
    }

    public void removeFeatures(Filter filter) throws IOException {
        delegate.removeFeatures(filter);
    }

    public void modifyFeatures(AttributeDescriptor type, Object value, Filter filter)
            throws IOException {
        delegate.modifyFeatures(type, value, filter);
    }

    public void modifyFeatures(AttributeDescriptor[] type, Object[] value, Filter filter)
            throws IOException {
        delegate.modifyFeatures(type, value, filter);
    }

    public void setFeatures(FeatureReader<T, F> reader) throws IOException {
        delegate.setFeatures(reader);
    }

    public Transaction getTransaction() {
        return delegate.getTransaction();
    }

    public void setTransaction(Transaction transaction) {
        delegate.setTransaction(transaction);
    }
}
package org.vfny.geoserver.global;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.geotools.feature.FeatureCollection;
/**
 * Iterator that reads the elements of a {@link FeatureCollection} ahead of the consumer.
 *
 * <p>On construction a {@link Prefetcher} task is submitted to a shared thread pool; it iterates
 * the collection and pushes each element into a bounded {@link BlockingQueue}. The consumer
 * thread pulls elements from that queue through {@link #hasNext()} / {@link #next()}.
 *
 * <p>The iterator is meant to be consumed by a single thread. {@link #close()} should always be
 * called when the caller is done, so that a prefetcher blocked on a full queue is released.
 */
public class PrefetchingIterator implements Iterator {

    /** Queue depth used when the {@code QUEUE_DEPTH} system property is missing or unusable. */
    private static final int FALLBACK_DEPTH = 10;

    /** Pool running the prefetcher tasks of all iterators. */
    static final ExecutorService pool = Executors.newCachedThreadPool();

    /** Hand-off queue between the prefetcher (producer) and the caller (consumer). */
    final BlockingQueue<ValueWrapper> queue;

    /** The collection being iterated by the prefetcher. */
    final FeatureCollection collection;

    /** Element taken from the queue by {@link #hasNext()} and not yet returned by {@link #next()}. */
    ValueWrapper current;

    /** Handle on the prefetcher task, used to cancel it on {@link #close()}. */
    final Future task;

    /**
     * Set once iteration is over (end marker seen, error seen, or {@link #close()} called).
     * Volatile because it is written by the consumer thread and read by the prefetcher thread.
     */
    volatile boolean end = false;

    /**
     * Returns the queue depth configured through the {@code QUEUE_DEPTH} system property.
     *
     * @return the parsed property value, or 10 when the property is not set, is not an integer,
     *         or is smaller than 1 (a bounded queue cannot have a non-positive capacity)
     */
    static int getDefaultDepth() {
        String depth = System.getProperty("QUEUE_DEPTH");
        if (depth == null) {
            return FALLBACK_DEPTH;
        }
        try {
            int parsed = Integer.parseInt(depth);
            return parsed >= 1 ? parsed : FALLBACK_DEPTH;
        } catch (NumberFormatException e) {
            return FALLBACK_DEPTH;
        }
    }

    /**
     * Creates an iterator using the depth returned by {@link #getDefaultDepth()}.
     *
     * @param collection the collection to iterate over
     */
    public PrefetchingIterator(FeatureCollection collection) {
        this(collection, getDefaultDepth());
    }

    /**
     * Creates an iterator and immediately starts prefetching.
     *
     * @param collection the collection to iterate over
     * @param queueDepth capacity of the prefetch queue
     * @throws IllegalArgumentException if {@code queueDepth} is smaller than 1
     */
    public PrefetchingIterator(FeatureCollection collection, int queueDepth) {
        this.collection = collection;
        // the queue must exist before the prefetcher is submitted, since the task uses it
        this.queue = new ArrayBlockingQueue<ValueWrapper>(queueDepth);
        this.task = pool.submit(new Prefetcher());
    }

    /**
     * Blocks until the prefetcher delivers the next element, the end marker or an error.
     *
     * @return {@code true} if {@link #next()} will return an element, {@code false} once the
     *         iteration is over or the iterator has been closed
     * @throws RuntimeException if the prefetcher failed (the original exception is the cause),
     *         or if the calling thread is interrupted while waiting; in the latter case the
     *         thread's interrupt status is restored
     */
    public boolean hasNext() {
        if (end) {
            return false;
        }
        if (current != null) {
            return true;
        }

        ValueWrapper taken;
        try {
            taken = queue.take();
        } catch (InterruptedException e) {
            // nothing was taken: report the interruption instead of dereferencing a null element
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for prefetched features", e);
        }

        if (taken.end) {
            end = true;
            return false;
        }
        if (taken.exception != null) {
            end = true;
            throw new RuntimeException("Error occurred during feature prefetching", taken.exception);
        }
        current = taken;
        return true;
    }

    /**
     * Returns the next prefetched element.
     *
     * @return the next element of the collection
     * @throws NoSuchElementException if there are no more elements
     */
    public Object next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Object result = current.value;
        current = null;
        return result;
    }

    /**
     * Stops the prefetcher and discards whatever it already queued. Safe to call more than once.
     */
    public void close() {
        end = true;
        task.cancel(true);
        // make sure we unlock the prefetcher in case it's waiting on a full queue; the queue
        // itself is kept so that a still-running prefetcher never sees a null reference
        queue.clear();
        current = null;
    }

    /**
     * Removal is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Producer task: iterates the collection and feeds the queue until the collection is
     * exhausted, an error occurs, the iterator is closed, or the task is interrupted.
     */
    final class Prefetcher implements Runnable {

        public void run() {
            boolean interrupted = false;
            // what the consumer will see last: the end marker, or the failure if one occurs
            ValueWrapper terminal = new ValueWrapper();
            Iterator iterator = null;
            try {
                iterator = collection.iterator();
                while (!interrupted && !end && iterator.hasNext()) {
                    interrupted = !putInQueue(new ValueWrapper(iterator.next()));
                }
            } catch (Exception e) {
                terminal = new ValueWrapper(e);
            } finally {
                try {
                    // iterator is null when collection.iterator() itself failed
                    if (iterator != null) {
                        collection.close(iterator);
                    }
                } catch (RuntimeException e) {
                    // a failing close must not prevent the consumer from being woken up;
                    // an earlier failure, if any, takes precedence
                    if (terminal.exception == null) {
                        terminal = new ValueWrapper(e);
                    }
                }
            }

            // nobody is listening anymore after a close()/cancel, skip the terminal element
            if (!interrupted && !end) {
                interrupted = !putInQueue(terminal);
            }
            if (interrupted) {
                // restored only now so that closing the source iterator ran undisturbed
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Puts an element in the queue, waiting for space if necessary.
         *
         * @return {@code false} if the thread was interrupted while waiting (the element was not
         *         queued and the prefetcher should stop), {@code true} otherwise
         */
        boolean putInQueue(ValueWrapper o) {
            try {
                queue.put(o);
                return true;
            } catch (InterruptedException e) {
                return false;
            }
        }
    }

    /**
     * Queue element: carries either a value, an exception raised by the prefetcher, or the end
     * marker.
     */
    static final class ValueWrapper {
        final Object value;

        final Exception exception;

        final boolean end;

        /** Wraps a failure of the prefetcher. */
        public ValueWrapper(Exception e) {
            this.value = null;
            this.exception = e;
            this.end = false;
        }

        /** Wraps an element of the collection. */
        public ValueWrapper(Object value) {
            this.value = value;
            this.exception = null;
            this.end = false;
        }

        /** Creates the end-of-iteration marker. */
        public ValueWrapper() {
            this.value = null;
            this.exception = null;
            this.end = true;
        }
    }
}
------------------------------------------------------------------------------
Come build with us! The BlackBerry® Developer Conference in SF, CA
is the only developer event you need to attend this year. Jumpstart your
developing skills, take BlackBerry mobile applications to market and stay
ahead of the curve. Join us from November 9-12, 2009. Register now!
http://p.sf.net/sfu/devconf
_______________________________________________
Geotools-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/geotools-devel