package org.vfny.geoserver.global;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.geotools.feature.FeatureCollection;

/**
 * Iterator over a {@link FeatureCollection} whose contents are read ahead of
 * the consumer by a background task.
 * <p>
 * A {@link Prefetcher} submitted to a shared thread pool walks the collection
 * and pushes every element into a bounded queue; {@link #hasNext()} and
 * {@link #next()} pull from that queue. A failure raised while reading the
 * collection is handed over through the queue and rethrown by
 * {@link #hasNext()} wrapped in a {@link RuntimeException}.
 * <p>
 * Callers that stop iterating early should invoke {@link #close()} so the
 * background task is cancelled. Nothing in this class synchronizes concurrent
 * callers of {@code hasNext()}/{@code next()}; they are written for use from a
 * single consumer thread.
 */
public class PrefetchingIterator implements Iterator {
    /** Pool shared by all instances; runs one {@link Prefetcher} per iterator. */
    static final ExecutorService pool = Executors.newCachedThreadPool();

    /** Queue depth used when the {@code QUEUE_DEPTH} system property is missing or unusable. */
    private static final int DEFAULT_QUEUE_DEPTH = 10;

    /** Bounded hand-off buffer between the prefetcher and the consumer; never reassigned. */
    BlockingQueue<ValueWrapper> queue;

    /** Collection being iterated; also used to release the underlying iterator. */
    FeatureCollection collection;

    /** Element taken from the queue but not yet returned by {@link #next()}, or {@code null}. */
    ValueWrapper current;

    /** Handle on the submitted prefetcher, used by {@link #close()} to cancel it. */
    Future<?> task;

    /**
     * Set once iteration is over (exhausted, failed or closed). Volatile
     * because it is written by the consumer thread and polled by the
     * prefetcher thread to know when to stop.
     */
    volatile boolean end = false;

    /**
     * Reads the prefetch queue depth from the {@code QUEUE_DEPTH} system
     * property.
     *
     * @return the configured depth, or 10 when the property is unset, is not
     *         a decimal integer, or is not strictly positive (a bounded queue
     *         cannot be created with a capacity below 1)
     */
    static int getDefaultDepth() {
        String depth = System.getProperty("QUEUE_DEPTH");
        if (depth != null) {
            try {
                int parsed = Integer.parseInt(depth);
                if (parsed > 0) {
                    return parsed;
                }
            } catch (NumberFormatException e) {
                // not a number: fall back to the default below
            }
        }
        return DEFAULT_QUEUE_DEPTH;
    }

    /**
     * Creates an iterator using the depth returned by {@link #getDefaultDepth()}.
     *
     * @param collection the collection to iterate
     */
    public PrefetchingIterator(FeatureCollection collection) {
        this(collection, getDefaultDepth());
    }

    /**
     * Creates an iterator and immediately starts prefetching in the background.
     *
     * @param collection the collection to iterate
     * @param queueDepth maximum number of prefetched elements held at once
     * @throws IllegalArgumentException if {@code queueDepth} is less than 1
     *         (raised by the underlying {@link ArrayBlockingQueue})
     */
    public PrefetchingIterator(FeatureCollection collection, int queueDepth) {
        this.collection = collection;
        queue = new ArrayBlockingQueue<ValueWrapper>(queueDepth);
        task = pool.submit(new Prefetcher());
    }

    /**
     * Tells whether another element is available, blocking until the
     * prefetcher has produced one or signalled the end of the data.
     *
     * @return {@code true} if {@link #next()} will return an element
     * @throws RuntimeException if the prefetcher failed while reading the
     *         collection (the failure is the cause), or if the calling thread
     *         is interrupted while waiting; in the latter case the thread's
     *         interrupt status is restored
     */
    public boolean hasNext() {
        if (end) {
            return false;
        }
        if (current != null) {
            // an element was already fetched by a previous hasNext() call
            return true;
        }

        ValueWrapper taken;
        try {
            taken = queue.take();
        } catch (InterruptedException e) {
            // Restore the flag for the caller and fail explicitly instead of
            // dereferencing a null element.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for prefetched features", e);
        }

        if (taken.end) {
            end = true;
            return false;
        }
        if (taken.exception != null) {
            end = true;
            throw new RuntimeException("Error occurred during feature prefetching", taken.exception);
        }
        current = taken;
        return true;
    }

    /**
     * Returns the next prefetched element.
     *
     * @return the next element of the collection
     * @throws NoSuchElementException if no element is left or the iterator
     *         has been closed
     * @throws RuntimeException under the same conditions as {@link #hasNext()}
     */
    public Object next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        Object result = current.value;
        current = null;
        return result;
    }

    /**
     * Stops the iteration: cancels the background prefetcher and discards
     * whatever it had buffered. Safe to call more than once.
     */
    public void close() {
        end = true;
        task.cancel(true);
        // Drop buffered elements; the queue reference itself is kept because
        // the prefetcher thread may still be using it while it winds down.
        queue.clear();
        current = null;
    }

    /**
     * Always unsupported.
     *
     * @throws UnsupportedOperationException always
     */
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Background task that walks the collection and feeds the queue, ending
     * with either an end-of-data marker or the failure that stopped it.
     */
    final class Prefetcher implements Runnable {

        public void run() {
            ValueWrapper outcome = prefetchAll();
            if (outcome != null) {
                putInQueue(outcome);
            }
        }

        /**
         * Pushes every element of the collection into the queue and releases
         * the collection iterator.
         *
         * @return the terminal wrapper to publish (end marker or failure), or
         *         {@code null} when prefetching was abandoned because the
         *         iterator was closed or the task was interrupted
         */
        private ValueWrapper prefetchAll() {
            ValueWrapper outcome = new ValueWrapper();
            Iterator iterator = null;
            try {
                iterator = collection.iterator();
                while (iterator.hasNext()) {
                    if (end || !putInQueue(new ValueWrapper(iterator.next()))) {
                        outcome = null;
                        break;
                    }
                }
            } catch (Exception e) {
                outcome = new ValueWrapper(e);
            } finally {
                // iterator is still null if collection.iterator() itself failed
                if (iterator != null) {
                    try {
                        collection.close(iterator);
                    } catch (RuntimeException e) {
                        // Report the failure to the consumer rather than
                        // leaving it blocked; an earlier failure takes
                        // precedence and an abandoned run stays silent.
                        if (outcome != null && outcome.end) {
                            outcome = new ValueWrapper(e);
                        }
                    }
                }
            }
            return outcome;
        }

        /**
         * Queues the given wrapper, waiting for free space if needed.
         *
         * @param o the wrapper to publish
         * @return {@code true} if the wrapper was queued, {@code false} if the
         *         iterator is already finished or the wait was interrupted
         */
        boolean putInQueue(ValueWrapper o) {
            if (end) {
                return false;
            }
            try {
                queue.put(o);
                return true;
            } catch (InterruptedException ignored) {
                // Interruption is how close() cancels this task. The flag is
                // deliberately not re-asserted: the caller stops right away
                // and the remaining cleanup should not run with a pending
                // interrupt.
                return false;
            }
        }

    }

    /**
     * Message exchanged through the queue: carries an element, a failure, or
     * the end-of-data marker.
     */
    static final class ValueWrapper {
        /** Element read from the collection (may be {@code null}). */
        Object value;
        /** Failure raised while reading the collection, or {@code null}. */
        Exception exception;
        /** {@code true} only for the end-of-data marker. */
        boolean end;

        /** Wraps a failure. */
        public ValueWrapper(Exception e) {
            this.exception = e;
        }

        /** Wraps an element. */
        public ValueWrapper(Object value) {
            this.value = value;
        }

        /** Creates the end-of-data marker. */
        public ValueWrapper() {
            this.end = true;
        }
    }

}
