/*
 * Patrick Sauts 2011 - @pathedog
 */

package org.apache.solr.client.solrj.impl;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.httpclient.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * 
 */
public class BinaryStreamingUpdateSolrServer extends CommonsHttpSolrServer {

  /** */
	private static final long serialVersionUID = 8705073440395453595L;

	static final Logger LOG = LoggerFactory.getLogger( BinaryStreamingUpdateSolrServer.class );

  final BlockingQueue<List<SolrInputDocument>> queue;
  final ExecutorService scheduler = Executors.newCachedThreadPool();
  final Queue<Runner> runners;
  volatile CountDownLatch lock = null;  // used to block everything
  final int threadCount;
  final int bufferSize;
  ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
  final static int DEFAULT_DOCUMENT_LIST_SIZE = 20;
  final static UpdateResponse UR = new UpdateResponse();
	static CommonsHttpSolrServer baseServer;
  /**
   * Uses an internal MultiThreadedHttpConnectionManager to manage http connections
   *
   * @param solrServerUrl The Solr server URL
   * @param queueSize     The buffer size before the documents are sent to the server
   * @param threadCount   The number of background threads used to empty the queue
   * @throws MalformedURLException
   */
  public BinaryStreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount) throws MalformedURLException {
    this(solrServerUrl, null, queueSize, threadCount,DEFAULT_DOCUMENT_LIST_SIZE);
    
  }
  /**
   * Uses an internal MultiThreadedHttpConnectionManager to manage http connections
   *
   * @param solrServerUrl The Solr server URL
   * @param queueSize     The buffer size before the documents are sent to the server
   * @param threadCount   The number of background threads used to empty the queue
   * @param bufferSize    The number of docs to send at once
   * @throws MalformedURLException
   */
  public BinaryStreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount, int bufferSize) throws MalformedURLException {
  	this(solrServerUrl, null, queueSize, threadCount,bufferSize);
  }

  /**
   * Uses the supplied HttpClient to send documents to the Solr server, the HttpClient should be instantiated using a
   * MultiThreadedHttpConnectionManager.
   */
  public BinaryStreamingUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, int bufferSize) throws MalformedURLException {
    super(solrServerUrl, client);
    queue = new LinkedBlockingQueue<List<SolrInputDocument>>(queueSize);
    this.threadCount = threadCount;
    runners = new LinkedList<Runner>();
    this.bufferSize=bufferSize;
    docs = new ArrayList<SolrInputDocument>(this.bufferSize);
    baseServer= (CommonsHttpSolrServer)this;
  }
  
  @Override
  public UpdateResponse addBeans(Collection<?> beans ) throws SolrServerException, IOException {
    for (Object bean : beans) {
    	addBean(bean);
    }
    return UR;
  }

  @Override
  public UpdateResponse addBean(Object obj) throws IOException, SolrServerException {
  	this.docs.add(getBinder().toSolrInputDocument(obj));
  	if(docs.size() >= bufferSize){
  		addToQueue(docs);
  		docs = new ArrayList<SolrInputDocument>(this.bufferSize);
  	}
    return UR;
  }

  private void addToQueue(List<SolrInputDocument> docs){
		try {
			if (!queue.offer(docs, 5, TimeUnit.MINUTES)) {
				LOG.error("Timeout inserting documents into queue (5 minutes ) - problem occured - exiting");
				throw new RuntimeException("Timeout inserting documents into queue (5 minutes )");
			}
			if (runners.isEmpty() || ((queue.size() > queue.remainingCapacity())  && runners.size() <  threadCount)) {
				synchronized (runners) {
					Runner r = new Runner();
					scheduler.execute(r);
					runners.add(r);
				}
			}
		} catch (InterruptedException e) {
			LOG.warn("Runner interupted");
		}
  	
  }
	/**
	 * The runners that write the data in solr...
	 */
	class Runner implements Runnable {

		final Lock runnerLock = new ReentrantLock();

		public void run() {
			runnerLock.lock();
			if (LOG.isDebugEnabled()) {
				LOG.debug("starting runner: " + this);
			}
			try {
				while (!queue.isEmpty()) {
					List<SolrInputDocument> docs = queue.poll(250, TimeUnit.MILLISECONDS);
					// between test and information retrieval queue might have been empty by another runner
					if (docs != null) {
						//Launch the update
						BinaryStreamingUpdateSolrServer.baseServer.add(docs);
					}
				}
			} catch (InterruptedException e) {
				LOG.warn("Asynchronous service indexer interupted (possibly data lose)", e);
			} catch (Throwable th) {
				LOG.warn("A problem has occured exiting runner " + this, th);
			}
			// remove it from the list of running things...
			synchronized (runners) {
				runners.remove(this);
			}
			if (LOG.isDebugEnabled()) {
				LOG.debug("stopping runner: " + this);
			}
			runnerLock.unlock();
		}
	}
  public synchronized void blockUntilFinished()
  {
  	flush();
    lock = new CountDownLatch(1);
    try {
      // Wait until no runners are running
      for(;;) {
        Runner runner;
        synchronized(runners) {
          runner = runners.peek();
        }
        if (runner == null) break;
        runner.runnerLock.lock();
        runner.runnerLock.unlock();
      }
    } finally {
      lock.countDown();
      lock=null;
    }
  }

  private void flush(){
  	if (CollectionUtils.isNotEmpty(docs)){
  		addToQueue(docs);
  		docs=null;
  	}
  }
}

