/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pdfbox.io;

import java.io.IOException;
import java.util.ArrayList;

/**
 * This class implements access to multiple {@link RandomAccess} instances using
 * a single {@link RandomAccessFile}. In the underlying file, data is stored in pages
 * of a fixed size. Each {@link RandomAccess} can have its own fixed page size;
 * this can be used to reduce disk usage in case of a large number of small data
 * streams.
 * 
 * @author Stefan Mücke
 */
public class PagedMultiRandomAccessFile {

	public static final int DEFAULT_PAGE_SIZE = 4096;

	private static class Page {
		long address;
		public Page(long address) {
			this.address = address;
		}
	}

	private class PagedRandomAccess implements RandomAccess {

		private int pageSize;
		private long size;
		private ArrayList<Page> pages = new ArrayList<Page>();
		/** The current position relative to this PagedRandomAccess. */
		private long pointer;

		/**
		 * Constructs a <code>PagedRandomAccess</code> with the specified page size. 
		 * 
		 * @param pageSize the size to used when allocating pages
		 */
		public PagedRandomAccess(int pageSize) {
			this.pageSize = pageSize;
		}

		/**
		 * {@inheritDoc}
		 */
		public long length() throws IOException {
			return size;
		}

		/**
		 * {@inheritDoc}
		 */
		public int read() throws IOException {
			if (pointer >= size) {
				return -1;
			}
			int pageIndex = (int) (pointer / pageSize);
			if (pageIndex > pages.size()) {
				return -1;
			}
			Page page = pages.get(pageIndex);
			long offset = pointer % pageSize; // offset into the page
			file.seek(page.address + offset);
			pointer++;
			return file.read();
		}

		/**
		 * {@inheritDoc}
		 */
		public int read(byte[] b, int offset, int length) throws IOException {
			if (pointer >= size) {
				return 0;
			}
			long oldPointer = pointer;
			int toRead = length;
			while (toRead > 0 && pointer < size) {
				int pageIndex = (int) (pointer / pageSize);
				if (pageIndex > pages.size()) {
					break;
				}
				Page page = pages.get(pageIndex);
				long pageOffset = pointer % pageSize; // offset into the page
				int read = (int) Math.min(toRead, pageSize - pageOffset);
				file.seek(page.address + pageOffset);
				file.read(b, offset, read);

				offset += read;
				pointer += read;
				toRead -= read;
			}
			return (int) (pointer - oldPointer);
		}

		/**
		 * {@inheritDoc}
		 */
		public void seek(long position) throws IOException {
			this.pointer = position;
		}

		/**
		 * {@inheritDoc}
		 */
		public void write(int b) throws IOException {
			ensureCapacity(pointer + 1);
			int pageIndex = (int) (pointer / pageSize);
			Page page = pages.get(pageIndex);
			long offset = pointer % pageSize; // offset into the page
			file.seek(page.address + offset);
			file.write(b);
			pointer++;
			size = pointer > size ? pointer : size;
		}

		/**
		 * {@inheritDoc}
		 */
		public void write(byte[] b, int offset, int length) throws IOException {
			ensureCapacity(pointer + length);
			int toWrite = length;
			while (toWrite > 0) {
				int pageIndex = (int) (pointer / pageSize);
				if (pageIndex > pages.size()) {
					break;
				}
				Page page = pages.get(pageIndex);
				long pageOffset = pointer % pageSize; // offset into the page
				int chunkSize = (int) Math.min(toWrite, pageSize - pageOffset);
				file.seek(page.address + pageOffset);
				file.write(b, offset, chunkSize);

				offset += chunkSize;
				pointer += chunkSize;
				toWrite -= chunkSize;
			}
			size = pointer > size ? pointer : size;
		}

		/**
		 * {@inheritDoc}
		 */
		public void close() throws IOException {
			// Underlying file must be closed when done with all 
		}

		private void ensureCapacity(long size) {
			if (size == 0)
				return;
			long required = size / pageSize + 1;
			long toAdd = required - pages.size();
			for (int i = 0; i < toAdd; i++) {
				Page page = allocatePage(pageSize);
				pages.add(page);
			}
		}
	}

	private final RandomAccessFile file;
	private ArrayList<PagedRandomAccess> randomAccessFiles = new ArrayList<PagedRandomAccess>();
	private long nextAddress;

	public PagedMultiRandomAccessFile(RandomAccessFile file) {
		this.file = file;
	}

	/**
	 * Returns the underlying {@link RandomAccessFile}.
	 * 
	 * @return the underlying {@link RandomAccessFile}
	 */
	public RandomAccessFile getFile() {
		return file;
	}

	/**
	 * Creates a new {@link RandomAccess} with the default page size.
	 *  
	 * @return the newly created {@link RandomAccess}
	 */
	public RandomAccess getNewRandomAcess() {
		return getNewRandomAcess(DEFAULT_PAGE_SIZE);
	}

	/**
	 * Creates a new {@link RandomAccess} with the specified page size.
	 *  
	 * @return the newly created {@link RandomAccess}
	 */
	public RandomAccess getNewRandomAcess(int pageSize) {
		PagedRandomAccess access = new PagedRandomAccess(pageSize);
		randomAccessFiles.add(access);
		return access;
	}

	private Page allocatePage(long size) {
		Page page = new Page(nextAddress);
		nextAddress += size;
		return page;
	}

	/**
	 * Closes the underlying {@link RandomAccessFile}.
	 * 
	 * @throws IOException
	 * @see #getFile()
	 */
	public void close() throws IOException {
		file.close();
	}

}
