Author: desruisseaux Date: Fri Sep 1 04:51:04 2017 New Revision: 1806885 URL: http://svn.apache.org/viewvc?rev=1806885&view=rev Log: Change the internal working of StorageConnector in a way that track better the relationships between wrappers (e.g. InputStreamReader as a wrapper around InputStream).
Modified: sis/branches/JDK8/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java Modified: sis/branches/JDK8/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java URL: http://svn.apache.org/viewvc/sis/branches/JDK8/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java?rev=1806885&r1=1806884&r2=1806885&view=diff ============================================================================== --- sis/branches/JDK8/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java [UTF-8] (original) +++ sis/branches/JDK8/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java [UTF-8] Fri Sep 1 04:51:04 2017 @@ -17,12 +17,9 @@ package org.apache.sis.storage; import java.util.Map; -import java.util.Queue; import java.util.Iterator; import java.util.Collections; -import java.util.LinkedList; import java.util.IdentityHashMap; -import java.util.ConcurrentModificationException; import java.io.Reader; import java.io.DataInput; import java.io.InputStream; @@ -43,18 +40,15 @@ import org.apache.sis.util.Classes; import org.apache.sis.util.ArgumentChecks; import org.apache.sis.util.ObjectConverters; import org.apache.sis.util.resources.Errors; -import org.apache.sis.util.CorruptedObjectException; import org.apache.sis.internal.storage.Resources; import org.apache.sis.internal.storage.io.IOUtilities; import org.apache.sis.internal.storage.io.ChannelFactory; import org.apache.sis.internal.storage.io.ChannelDataInput; import org.apache.sis.internal.storage.io.ChannelImageInputStream; import org.apache.sis.internal.storage.io.InputStreamAdapter; +import org.apache.sis.internal.util.Utilities; import org.apache.sis.setup.OptionKey; -// Branch-dependent imports -import java.util.function.Consumer; - /** * Information for creating a connection to a {@link DataStore} in read and/or write mode. @@ -114,7 +108,8 @@ public class StorageConnector implements private interface Opener<T> { /** * Invoked when first needed for creating an input of the requested type. - * The enclosing {@link StorageConnector} is responsible for caching the result. + * This method should invoke {@link #addView(Class, Object, Class, boolean, boolean)} + * for caching the result before to return the view. */ T open(StorageConnector c) throws Exception; } @@ -128,7 +123,7 @@ public class StorageConnector implements * List of types recognized by {@link #getStorageAs(Class)}, associated to the methods for opening stream * of those types. This map shall contain every types documented in {@link #getStorageAs(Class)} javadoc. */ - private static final Map<Class<?>, Opener<?>> OPENERS = new IdentityHashMap<>(8); + private static final Map<Class<?>, Opener<?>> OPENERS = new IdentityHashMap<>(12); static { add(String.class, StorageConnector::createString); add(ByteBuffer.class, StorageConnector::createByteBuffer); @@ -138,6 +133,13 @@ public class StorageConnector implements add(Reader.class, StorageConnector::createReader); add(Connection.class, StorageConnector::createConnection); add(ChannelDataInput.class, (s) -> s.createChannelDataInput(false)); // Undocumented case (SIS internal) + add(ChannelFactory.class, (s) -> null); // Undocumented. Shall not cache. + /* + * ChannelFactory may have been created as a side effect of creating a ReadableByteChannel. + * Caller should have asked for another type (e.g. InputStream) before to ask for that type. + * Consequently null value for ChannelFactory shall not be cached since the actual value may + * be computed later. + */ } /** @@ -168,77 +170,271 @@ public class StorageConnector implements private Map<OptionKey<?>, Object> options; /** - * Views of {@link #storage} as some of the following supported types: - * + * Views of {@link #storage} as instances of different types than the type of the object given to the constructor. + * The {@code null} reference can appear in various places: * <ul> - * <li>{@link ByteBuffer}: - * A read-only view of the buffer over the first bytes of the stream.</li> - * - * <li>{@link DataInput}: - * The input as a data input stream. Unless the {@link #storage} is already an instance of {@link DataInput}, - * this entry will be given an instance of {@link ChannelImageInputStream} if possible rather than an arbitrary - * stream. In particular, we invoke the {@link ImageIO#createImageInputStream(Object)} factory method only in - * last resort because some SIS data stores will want to access the channel and buffer directly.</li> - * - * <li>{@link ImageInputStream}: - * Same as {@code DataInput} if it can be casted, or {@code null} otherwise.</li> - * - * <li>{@link InputStream}: - * If not explicitely provided, this is a wrapper around the above {@link ImageInputStream}.</li> - * - * <li>{@link Reader}: - * If not explicitely provided, this is a wrapper around the above {@link InputStream}.</li> - * - * <li>{@link Connection}: - * The storage object as a JDBC connection.</li> + * <li>A non-existent entry (equivalent to an entry associated to the {@code null} value) means that the value + * has not yet been computed.</li> + * <li>A {@linkplain Coupled#isValid valid entry} with {@link Coupled#view} set to {@code null} means the value + * has been computed and we have determined that {@link #getStorageAs(Class)} shall return {@code null} for + * that type.</li> + * <li>By convention, the {@code null} key is associated to the {@link #storage} value.</li> * </ul> * - * A non-existent entry means that the value has not yet been computed. A {@link Void#TYPE} value means the value - * has been computed and we have determined that {@link #getStorageAs(Class)} shall returns {@code null} for that - * type. - * + * @see #addView(Class, Object, Class, boolean, boolean) + * @see #getView(Class) * @see #getStorageAs(Class) */ - private transient Map<Class<?>, Object> views; + private transient Map<Class<?>, Coupled> views; /** - * The views that need to be synchronized if {@link #storage} is used independently. They are views - * that may advance {@code storage} position, but not necessarily in same time than the view position - * (typically because the view reads some bytes in advance and stores them in a buffer). This map may - * be non-null when the storage is an {@link InputStream}, {@link java.io.OutputStream} or a - * {@link java.nio.channels.Channel}. Those views can be: + * Wraps an instance of @link InputStream}, {@link DataInput}, {@link Reader}, <i>etc.</i> together with additional + * information about other objects that are coupled with the wrapped object. For example if a {@link Reader} is a + * wrapper around the user-supplied {@link InputStream}, then those two objects will be wrapped in {@code Coupled} + * instances together with information about how they are related + * + * One purpose of {@code Coupled} information is to keep trace of objects which will need to be closed by the + * {@link StorageConnector#closeAllExcept(Object)} method (for example an {@link InputStreamReader} wrapping + * an {@link InputStream}). + * + * Another purpose is to determine which views need to be synchronized if {@link StorageConnector#storage} is + * used independently. They are views that may advance {@code storage} position, but not in same time than the + * {@link #view} position (typically because the view reads some bytes in advance and stores them in a buffer). + * Such coupling may occur when the storage is an {@link InputStream}, an {@link java.io.OutputStream} or a + * {@link java.nio.channels.Channel}. The coupled {@link #view} can be: * * <ul> * <li>{@link Reader} that are wrappers around {@code InputStream}.</li> * <li>{@link ChannelDataInput} when the channel come from an {@code InputStream}.</li> * <li>{@link ChannelDataInput} when the channel has been explicitely given to the constructor.</li> * </ul> - * - * Note that we do <strong>not</strong> include {@link InputStreamAdapter} because it does not use buffer; - * {@code InputStreamAdapter} positions are synchronized with wrapped {@link ImageInputStream} positions. - * - * <p>Values are cleanup actions to execute after the {@link #storage} has been reseted to its original position. - * A {@code null} value means that the view can not be synchronized and consequently should be discarded.</p> */ - private transient Map<Class<?>, Consumer<StorageConnector>> viewsToSync; + private static final class Coupled { + /** + * The {@link StorageConnector#storage} viewed as another kind of object. + * Supported types are: + * + * <ul> + * <li>{@link ByteBuffer}: + * A read-only view of the buffer over the first bytes of the stream.</li> + * + * <li>{@link DataInput}: + * The input as a data input stream. Unless the {@link #storage} is already an instance of {@link DataInput}, + * this entry will be given an instance of {@link ChannelImageInputStream} if possible rather than an arbitrary + * stream. In particular, we invoke the {@link ImageIO#createImageInputStream(Object)} factory method only in + * last resort because some SIS data stores will want to access the channel and buffer directly.</li> + * + * <li>{@link ImageInputStream}: + * Same as {@code DataInput} if it can be casted, or {@code null} otherwise.</li> + * + * <li>{@link InputStream}: + * If not explicitely provided, this is a wrapper around the above {@link ImageInputStream}.</li> + * + * <li>{@link Reader}: + * If not explicitely provided, this is a wrapper around the above {@link InputStream}.</li> + * + * <li>{@link Connection}: + * The storage object as a JDBC connection.</li> + * </ul> + */ + Object view; - /** - * Objects which will need to be closed by the {@link #closeAllExcept(Object)} method. - * For each (<var>key</var>, <var>value</var>) entry, if the object to close (the key) - * is a wrapper around an other object (e.g. an {@link InputStreamReader} wrapping an - * {@link InputStream}), then the value is the other object. - * - * @see #addViewToClose(Object, Object) - * @see #closeAllExcept(Object) - */ - private transient Map<Object, Object> viewsToClose; + /** + * The object that {@link #view} is wrapping. For example if {@code view} is an {@link InputStreamReader}, + * then {@code wrapperFor.view} is an {@link InputStream}. This field is {@code null} if {@link #view} == + * {@link StorageConnector#storage}. + */ + final Coupled wrapperFor; - /** - * The view returned by the last call to {@link #getStorageAs(Class)}. - * - * @see #storage - */ - private transient Object lastView; + /** + * The other views that are consuming {@link #view}, or {@code null} if none. For each element in this array, + * {@code wrappedBy[i].wrapperFor == this}. + */ + private Coupled[] wrappedBy; + + /** + * {@code true} if calls to {@link #reset()} should cascade to {@link #wrapperFor}. + * This field can be {@code false} if any change in the position of {@link #view} + * is immediately reflected in the position of {@link #wrapperFor}, and vis-versa. + */ + final boolean cascadeOnReset; + + /** + * {@code true} if after closing the {@link #view}, we need to also close the {@link #wrapperFor}. + * This field should be {@code true} when the view is an {@link ImageInputStream} because Java I/O + * {@link javax.imageio.stream.FileCacheImageInputStream#close()} does not close the underlying stream. + * For most other kinds of view, this field should be {@code false}. + */ + final boolean cascadeOnClose; + + /** + * {@code true} if the position of {@link #view} is synchronized with the position of {@link #wrapperFor}. + */ + boolean isValid; + + /** + * Creates a wrapper for {@link StorageConnector#storage}. This constructor is used when we need to create + * a {@code Coupled} instance for another view wrapping {@code storage}. + */ + Coupled(final Object storage) { + view = storage; + wrapperFor = null; + cascadeOnReset = false; + cascadeOnClose = false; + isValid = true; + } + + /** + * Creates a wrapper for a view wrapping the given {@code Coupled} instance. + * Caller is responsible to set the {@link #view} field after this constructor call. + * + * @param wrapperFor the object that {@link #view} will wrap, or {@code null} if none. + * @param cascadeOnReset whether calls to {@link #reset()} shall cascade. + * @param cascadeOnClose whether calls to {@link AutoCloseable#close()} shall cascade. + */ + @SuppressWarnings("ThisEscapedInObjectConstruction") + Coupled(final Coupled wrapperFor, final boolean cascadeOnReset, final boolean cascadeOnClose) { + this.wrapperFor = wrapperFor; + this.cascadeOnReset = cascadeOnReset; + this.cascadeOnClose = cascadeOnClose; + if (wrapperFor != null) { + final Coupled[] w = wrapperFor.wrappedBy; + final int n = (w != null) ? w.length : 0; + final Coupled[] e = new Coupled[n + 1]; + if (n != 0) System.arraycopy(w, 0, e, 0, n); + e[n] = this; + wrapperFor.wrappedBy = e; + } + } + + /** + * Declares as invalid all unsynchronized {@code Coupled} instances which are used, directly or indirectly, + * by this instance. This method is invoked before {@link StorageConnector#getStorageAs(Class)} returns a + * view, in order to remember which views would need to be resynchronized if they are requested. + */ + final void invalidateSources() { + boolean sync = cascadeOnReset; + for (Coupled c = wrapperFor; sync; c = c.wrapperFor) { + c.isValid = false; + sync = c.cascadeOnReset; + } + } + + /** + * Declares as invalid all unsynchronized {@code Coupled} instances which are using, directly or indirectly, + * this instance. This method is invoked before {@link StorageConnector#getStorageAs(Class)} returns a view, + * in order to remember which views would need to be resynchronized if they are requested. + */ + final void invalidateUsages() { + if (wrappedBy != null) { + for (final Coupled c : wrappedBy) { + if (c.cascadeOnReset) { + c.isValid = false; + c.invalidateUsages(); + } + } + } + } + + /** + * Identifies the other views to <strong>not</strong> close if we don't want to close the {@link #view} + * wrapped by this {@code Coupled}. This method identifies only the views that <em>use</em> this view; + * it does not identify the views <em>used</em> by this view. + * + * This method is for {@link StorageConnector#closeAllExcept(Object)} internal usage. + * + * @param toClose the map where to write the list of views to not close. + */ + final void protect(final Map<AutoCloseable,Boolean> toClose) { + if (wrappedBy != null) { + for (final Coupled c : wrappedBy) { + if (!c.cascadeOnClose) { + if (c.view instanceof AutoCloseable) { + toClose.put((AutoCloseable) c.view, Boolean.FALSE); + } + c.protect(toClose); + } + } + } + } + + /** + * Resets the position of all sources of the {@link #view}, then the view itself. + * + * @return {@code true} if some kind of reset has been performed. + * Note that it does means that the view {@link #isValid} is {@code true}. + */ + final boolean reset() throws Exception { + if (isValid) { + return false; + } + isValid = true; // Set first as a safety against infinite recursivity. + /* + * We need to reset the sources before to reset the view of this Coupled instance. + * For example if this Coupled instance contains a ChannelDataInput, we need to + * reset the underlying InputStream before to reset the ChannelDataInput. + */ + if (cascadeOnReset) { + wrapperFor.reset(); + } + if (view instanceof InputStream) { + /* + * Note on InputStream.reset() behavior documented in java.io: + * + * - It does not discard the mark, so it is okay if reset() is invoked twice. + * - If mark is unsupported, may either throw IOException or reset the stream + * to an implementation-dependent fixed state. + */ + ((InputStream) view).reset(); + } else if (view instanceof Reader) { + ((Reader) view).reset(); + } else if (view instanceof SeekableByteChannel) { + /* + * This case should be rare. If it happen anyway, searches for a ChannelDataInput wrapping + * the channel, because it contains the original position (note: StorageConnector tries to + * instantiate ChannelDataInput in priority to all other types). If we don't find any, the + * original position is assumed to be zero (which is the case most of the time). + */ + long p = 0; + if (wrappedBy != null) { + for (Coupled c : wrappedBy) { + if (c.view instanceof ChannelDataInput) { + p = ((ChannelDataInput) c.view).channelOffset; + break; + } + } + } + ((SeekableByteChannel) view).position(p); + } else if (view instanceof ChannelDataInput) { + /* + * ChannelDataInput can be recycled without the need to discard and recreate them. + */ + final ChannelDataInput input = (ChannelDataInput) view; + input.buffer.limit(0); // Must be after stream.reset(). + input.setStreamPosition(0); // Must be after buffer.limit(0). + } else { + /* + * For any other kind of object, we don't know how to recycle them. Current implementation + * does nothing on the assumption that the object can be reused (example: NetcdfFile). + */ + } + return true; + } + + /** + * Returns a string representation for debugging purpose. + */ + @Debug + @Override + public String toString() { + return Utilities.toString(getClass(), + "view", Classes.getShortClassName(view), + "wrapperFor", (wrapperFor != null) ? Classes.getShortClassName(wrapperFor.view) : null, + "cascadeOnReset", cascadeOnReset, + "cascadeOnClose", cascadeOnClose, + "isValid", isValid); + } + } /** * Creates a new data store connection wrapping the given input/output object. @@ -287,15 +483,12 @@ public class StorageConnector implements * The object can be of any type, but the class javadoc lists the most typical ones. * * @return the input/output object as a URL, file, image input stream, <i>etc.</i>. - * @throws DataStoreException if an error occurred while reseting the input stream or channel to its original position. + * @throws DataStoreException if the storage object has already been used and can not be reused. * * @see #getStorageAs(Class) */ public Object getStorage() throws DataStoreException { - if (viewsToSync != null && storage != lastView) { - resetStorage(); - } - lastView = storage; + reset(); return storage; } @@ -448,75 +641,83 @@ public class StorageConnector implements ArgumentChecks.ensureNonNull("type", type); /* * Verify if the cache contains an instance created by a previous invocation of this method. - * Note that InputStream may need to be reset if it has been used indirectly by other kind - * of stream (for example a java.io.Reader); this will be done at the end of this method. + * Note that InputStream may need to be reseted if it has been used indirectly by other kind + * of stream (for example a java.io.Reader). Example: + * + * 1) The storage specified at construction time is a java.nio.file.Path. + * 2) getStorageAs(InputStream.class) opens an InputStream. Caller rewinds it after use. + * 3) getStorageAs(Reader.class) wraps the InputStream. Caller rewinds the Reader after use, + * but invoking BufferedReader.reset() has no effect on the underlying InputStream. + * 4) getStorageAs(InputStream.class) needs to rewind the InputStream itself since it was + * not done at step 3. However doing so invalidate the Reader, so we need to discard it. */ - Object value; - final boolean cache; - if (views != null && (value = views.get(type)) != null) { - if (value == Void.TYPE) return null; - cache = false; - } else { - cache = true; - value = storage; - if (!type.isInstance(value)) { - /* - * No instance has been created previously for the requested type. Open the stream now, - * then cache it for future reuse. Note that we may cache 'null' value if no stream of - * the given type can be created. - */ - final Opener<?> method = OPENERS.get(type); - if (method != null) try { - value = method.open(this); - } catch (RuntimeException | DataStoreException e) { - throw e; - } catch (Exception e) { - throw new DataStoreException(Errors.format(Errors.Keys.CanNotOpen_1, getStorageName()), e); - } else if (type == ChannelFactory.class) { // Undocumented case (SIS internal). - /* - * ChannelFactory may have been created as a side effect of creating a ReadableByteChannel. - * Caller should have asked for another type (e.g. InputStream) before to ask for this type. - */ - return null; // Do not cache since the instance may be created later. - } else { - /* - * If the type is not one of the types listed in OPENERS, we delegate to ObjectConverter. - * It will throw UnconvertibleObjectException (an IllegalArgumentException subtype) if - * the given type is unrecognized. - */ - value = ObjectConverters.convert(storage, type); - } - } + Coupled value = getView(type); + if (reset(value)) { + return type.cast(value.view); // null is a valid result. } /* - * If the user asked an InputStream, we may return the storage as-is if it was already an InputStream. - * However before doing so, we may need to reset the InputStream position if the stream has been used - * by a ChannelDataInput or an InputStreamReader. + * If the storage is already an instance of the requested type, returns the storage as-is. + * We check if the storage needs to be reseted in the same way than in getStorage() method. */ - final T view = type.cast(value); - if (viewsToSync != null && view != lastView && (view == storage || viewsToSync.containsKey(type))) { - resetStorage(); + if (type.isInstance(storage)) { + @SuppressWarnings("unchecked") + final T view = (T) storage; + reset(); + addView(type, view); + return view; } - if (cache) { - addView(type, view); // Shall be after 'resetStorage()'. + /* + * If the type is not one of the types listed in OPENERS, we delegate to ObjectConverter. + * It may throw UnconvertibleObjectException (an IllegalArgumentException subtype) if the + * given type is unrecognized. So the IllegalArgumentException documented in method javadoc + * happen (indirectly) here. + */ + final Opener<?> method = OPENERS.get(type); + if (method == null) { + final T view = ObjectConverters.convert(storage, type); + addView(type, view); + return view; } - lastView = view; - return view; + /* + * No instance has been created previously for the requested type. Open the stream now. + * Some types will need to reset the InputStream or Channel, but the decision of doing + * so or not is left to openers. Result will be cached by the 'createFoo()' method. + * Note that it may cache 'null' value if no stream of the given type can be created. + */ + final Object view; + try { + view = method.open(this); + } catch (DataStoreException e) { + throw e; + } catch (Exception e) { + throw new DataStoreException(Errors.format(Errors.Keys.CanNotOpen_1, getStorageName()), e); + } + return type.cast(view); } /** - * Mark the storage position before to create a view that may be a wrapper around that storage. + * Marks the current position of the given view. This method should be invoked at the beginning of + * {@code createFoo()}} methods that may wrap {@link InputStream} in {@link InputStreamReader} or + * other objects. It is caller's responsibility to {@link #reset(Coupled)} first, if needed. */ - private void markStorage() { - if (storage instanceof InputStream) { - ((InputStream) storage).mark(DEFAULT_BUFFER_SIZE); + private static void mark(final Object view) throws IOException { + if (view instanceof InputStream) { + ((InputStream) view).mark(DEFAULT_BUFFER_SIZE); + } else if (view instanceof Reader) { + ((Reader) view).mark(DEFAULT_BUFFER_SIZE); + } else if (view instanceof ReadableByteChannel) { + // TODO } } /** - * Assuming that {@link #storage} is an instance of {@link InputStream}, {@link ReadableByteChannel} or other - * objects that may be affected by views operations, resets the storage position. This method is the converse - * of {@link #markStorage()}. + * Resets the given view. If the view is an instance of {@link InputStream}, {@link ReadableByteChannel} or + * other objects that may be affected by views operations, this method will reset the storage position. + * The view must have been previously marked by {@link #mark(Object)}. + * + * <p>This method is <strong>not</strong> a substitute for the requirement that users leave the + * {@link #getStorageAs(Class)} return value in the same state as they found it. This method is + * only for handling the cases where using a view has an indirect impact on another view.</p> * * <div class="note"><b>Rational:</b> * {@link DataStoreProvider#probeContent(StorageConnector)} contract requires that implementors reset the @@ -524,79 +725,40 @@ public class StorageConnector implements * then the user performed a call to {@link ChannelDataInput#reset()} (for instance), which did not reseted * the underlying input stream. So we need to perform the missing {@link InputStream#reset()} here, then * synchronize the {@code ChannelDataInput} position accordingly.</div> + * + * @param c container of the view to reset, or {@code null} if none. + * @return {@code true} if the given view, after reset, is valid. + * Note that {@link Coupled#view} may be null and valid. */ - private <T> void resetStorage() throws DataStoreException { - if (lastView != null) { - /* - * We must reset InputStream or ReadableChannel position before to run cleanup code. - * Note on InputStream.reset() behavior documented in java.io: - * - * - It does not discard the mark, so it is okay if reset() is invoked twice. - * - If mark is unsupported, may either throw IOException or reset the stream - * to an implementation-dependent fixed state. - */ - boolean isReset = false; - IOException cause = null; - try { - if (storage instanceof InputStream) { - ((InputStream) storage).reset(); - isReset = true; - } else if (storage instanceof SeekableByteChannel) { - ((SeekableByteChannel) storage).position(getView(ChannelDataInput.class).channelOffset); - isReset = true; - } - } catch (IOException e) { - cause = e; - } - if (!isReset) { - throw new ForwardOnlyStorageException(Resources.format( - Resources.Keys.StreamIsReadOnce_1, getStorageName()), cause); - } - /* - * At this point the InputStream or ReadableChannel has been reset. - * Now reset or remove any view that depend on it. - */ - for (final Map.Entry<Class<?>, Consumer<StorageConnector>> entry : viewsToSync.entrySet()) { - final Consumer<StorageConnector> sync = entry.getValue(); - if (sync != null) { - sync.accept(this); - } else { - removeView(entry.getKey()); // Reader will need to be recreated from scratch. - } - } + private boolean reset(final Coupled c) throws DataStoreException { + final boolean done; + if (c == null) { + return false; + } else try { + done = c.reset(); + } catch (DataStoreException e) { + throw e; + } catch (Exception e) { + throw new ForwardOnlyStorageException(Resources.format( + Resources.Keys.StreamIsReadOnce_1, getStorageName()), e); + } + if (done) { + c.invalidateSources(); + c.invalidateUsages(); } + return c.isValid; } /** - * Resets {@link ChannelDataInput} after the {@link InputStream} has been reseted. - * This method is registered in {@link #viewsToSync} when a {@link ChannelDataInput} is created. + * Resets the root {@link #storage} object. + * + * @throws DataStoreException if the storage can not be reseted. */ - private void resetChannelDataInput() { - ChannelDataInput channel = getView(ChannelDataInput.class); // Should never be null. - channel.buffer.limit(0); // Must be after storage.reset(). - channel.setStreamPosition(0); // Must be after buffer.limit(0). - } - - /** - * Gets or creates a view for the input as a {@link ChannelDataInput} if possible. If {@code ChannelDataInput} - * instance already exists, then this method returns it as-is (this method does <strong>not</strong> verify if - * the {@code ChannelDataInput} instance is an image input stream). Otherwise a new {@code ChannelDataInput} - * is created (if possible), cached and returned. - * - * @param asImageInputStream whether new {@code ChannelDataInput} should be {@link ChannelImageInputStream}. - * This argument is ignored if a {@code ChannelDataInput} instance already exists. - * @return the existing or new {@code ChannelDataInput}, or {@code null} if none can be created. - */ - private ChannelDataInput getChannelDataInput(final boolean asImageInputStream) throws IOException, DataStoreException { - if (views != null) { - final Object view = views.get(ChannelDataInput.class); - if (view != null) { - return (view != Void.TYPE) ? (ChannelDataInput) view : null; - } + private void reset() throws DataStoreException { + if (views != null && !reset(views.get(null))) { + throw new ForwardOnlyStorageException(Resources.format( + Resources.Keys.StreamIsReadOnce_1, getStorageName())); } - final ChannelDataInput view = createChannelDataInput(asImageInputStream); // May be null. - addView(ChannelDataInput.class, view); // Cache even if null. - return view; } /** @@ -613,12 +775,13 @@ public class StorageConnector implements * Before to try to wrap an InputStream, mark its position so we can rewind if the user asks for * the InputStream directly. We need to reset because ChannelDataInput may have read some bytes. * Note that if mark is unsupported, the default InputStream.mark() implementation does nothing. - * See above 'resetStorage()' method. */ - markStorage(); + reset(); + mark(storage); /* * Following method call recognizes ReadableByteChannel, InputStream (with special case for FileInputStream), * URL, URI, File, Path or other types that may be added in future Apache SIS versions. + * If the given storage is already a ReadableByteChannel, then the factory will return it as-is. */ final ChannelFactory factory = ChannelFactory.prepare(storage, getOption(OptionKey.URL_ENCODING), false, getOption(OptionKey.OPEN_OPTIONS)); @@ -627,11 +790,11 @@ public class StorageConnector implements } /* * ChannelDataInput depends on ReadableByteChannel, which itself depends on storage - * (potentially an InputStream). We need to remember this chain in 'viewsToClose' map. + * (potentially an InputStream). We need to remember this chain in 'Coupled' objects. */ final String name = getStorageName(); final ReadableByteChannel channel = factory.reader(name); - addViewToClose(channel, storage); + addView(ReadableByteChannel.class, channel, null, factory.isCoupled(), false); ByteBuffer buffer = getOption(OptionKey.BYTE_BUFFER); // User-supplied buffer. if (buffer == null) { buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); // Default buffer if user did not specified any. @@ -642,7 +805,7 @@ public class StorageConnector implements } else { asDataInput = new ChannelDataInput(name, channel, buffer, false); } - addViewToClose(asDataInput, channel); + addView(ChannelDataInput.class, asDataInput, ReadableByteChannel.class, true, false); /* * Following is an undocumented mechanism for allowing some Apache SIS implementations of DataStore * to re-open the same channel or input stream another time, typically for re-reading the same data. @@ -650,14 +813,6 @@ public class StorageConnector implements if (factory.canOpen()) { addView(ChannelFactory.class, factory); } - /* - * If the channels to be created by ChannelFactory are wrappers around InputStream or any other object - * that may be affected when read operations will occur, we need to remember that fact in order to keep - * the storage synchronized with the view. - */ - if (factory.isCoupled()) { - addViewToSync(ChannelDataInput.class, StorageConnector::resetChannelDataInput); - } return asDataInput; } @@ -674,15 +829,31 @@ public class StorageConnector implements */ private DataInput createDataInput() throws IOException, DataStoreException { /* - * Creates a ChannelImageInputStream instance. We really need that specific type because some - * SIS data stores will want to access directly the channel and the buffer. We will fallback - * on the ImageIO.createImageInputStream(Object) method only in last resort. + * Gets or creates a ChannelImageInputStream instance if possible. We really need that specific + * type because some SIS data stores will want to access directly the channel and the buffer. + * We will fallback on the ImageIO.createImageInputStream(Object) method only in last resort. */ - final ChannelDataInput c = getChannelDataInput(true); + Coupled c = getView(ChannelDataInput.class); + final ChannelDataInput in; + if (reset(c)) { + in = (ChannelDataInput) c.view; + } else { + in = createChannelDataInput(true); // May be null. + } final DataInput asDataInput; - if (c == null) { + if (in != null) { + c = getView(ChannelDataInput.class); // May have been added by createChannelDataInput(…). + if (in instanceof DataInput) { + asDataInput = (DataInput) in; + } else { + asDataInput = new ChannelImageInputStream(in); // Upgrade existing instance. + c.view = asDataInput; + } + views.put(DataInput.class, c); // Share the same Coupled instance. + } else { + reset(); asDataInput = ImageIO.createImageInputStream(storage); - addViewToClose(asDataInput, storage); + addView(DataInput.class, asDataInput, null, true, true); /* * Note: Java Image I/O wrappers for Input/OutputStream do NOT close the underlying streams. * This is a complication for us. We could mitigate the problem by subclassing the standard @@ -690,18 +861,6 @@ public class StorageConnector implements * code should never be executed for InputStream storage. Instead getChannelDataInput(true) * should have created a ChannelImageInputStream or ChannelDataInput. */ - } else if (c instanceof DataInput) { - asDataInput = (DataInput) c; - // No call to 'addViewToClose' because it has already be done by createChannelDataInput(…). - } else { - asDataInput = new ChannelImageInputStream(c); // Upgrade existing instance. - if (views.put(ChannelDataInput.class, asDataInput) != c) { // Replace the previous instance. - throw new ConcurrentModificationException(); - } - addViewToClose(asDataInput, c.channel); - if (viewsToClose.remove(c) != c.channel) { // Shall be after 'addViewToClose'. - throw new CorruptedObjectException(); - } } return asDataInput; } @@ -722,30 +881,32 @@ public class StorageConnector implements * First, try to create the ChannelDataInput if it does not already exists. * If successful, this will create a ByteBuffer companion as a side effect. */ - final ChannelDataInput c = getChannelDataInput(false); + final ChannelDataInput c = getStorageAs(ChannelDataInput.class); + ByteBuffer asByteBuffer = null; if (c != null) { - return c.buffer.asReadOnlyBuffer(); - } - /* - * If no ChannelDataInput has been create by the above code, get the input as an ImageInputStream and - * read an arbitrary amount of bytes. Read only a small amount of bytes because, at the contrary of the - * buffer created in createChannelDataInput(boolean), the buffer created here is unlikely to be used for - * the reading process after the recognition of the file format. - */ - final ImageInputStream in = getStorageAs(ImageInputStream.class); - if (in != null) { - in.mark(); - final byte[] buffer = new byte[MINIMAL_BUFFER_SIZE]; - final int n = in.read(buffer); - in.reset(); - if (n >= 1) { - final ByteBuffer asByteBuffer = ByteBuffer.wrap(buffer).order(in.getByteOrder()); - asByteBuffer.limit(n); - return asByteBuffer; - // Can not invoke asReadOnly() because 'prefetch()' need to be able to write in it. + asByteBuffer = c.buffer.asReadOnlyBuffer(); + } else { + /* + * If no ChannelDataInput has been created by the above code, get the input as an ImageInputStream and + * read an arbitrary amount of bytes. Read only a small amount of bytes because, at the contrary of the + * buffer created in createChannelDataInput(boolean), the buffer created here is unlikely to be used for + * the reading process after the recognition of the file format. + */ + final ImageInputStream in = getStorageAs(ImageInputStream.class); + if (in != null) { + in.mark(); + final byte[] buffer = new byte[MINIMAL_BUFFER_SIZE]; + final int n = in.read(buffer); + in.reset(); + if (n >= 1) { + // Can not invoke asReadOnly() because 'prefetch()' need to be able to write in it. + asByteBuffer = ByteBuffer.wrap(buffer).order(in.getByteOrder()); + asByteBuffer.limit(n); + } } } - return null; + addView(ByteBuffer.class, asByteBuffer); + return asByteBuffer; } /** @@ -761,19 +922,29 @@ public class StorageConnector implements */ final boolean prefetch() throws DataStoreException { try { - final ChannelDataInput c = getView(ChannelDataInput.class); + /* + * In most Apache SIS data store implementations, we use ChannelDataInput. If the object wrapped + * by ChannelDataInput has not been used directly, then Coupled.isValid should be true. In such + * case, reset(c) does nothing and ChannelDataInput.prefetch() will read new bytes from current + * channel position. Otherwise, a new read operation from the beginning will be required and we + * can only hope that it will read more bytes than last time. + */ + Coupled c = getView(ChannelDataInput.class); if (c != null) { - return c.prefetch() > 0; + reset(c); // Does nothing is c.isValid is true. + return c.isValid && ((ChannelDataInput) c.view).prefetch() > 0; } /* * The above code is the usual case. The code below this point is the fallback used when only * an ImageInputStream was available. In such case, the ByteBuffer can only be the one created * by the above createByteBuffer() method, which is known to be backed by a writable array. */ - final ImageInputStream input = getView(ImageInputStream.class); - if (input != null) { - final ByteBuffer buffer = getView(ByteBuffer.class); - if (buffer != null) { + c = getView(ImageInputStream.class); + if (reset(c)) { + final ImageInputStream input = (ImageInputStream) c.view; + c = getView(ByteBuffer.class); + if (reset(c)) { // reset(c) as a matter of principle, but (c != null) would have worked. + final ByteBuffer buffer = (ByteBuffer) c.view; final int p = buffer.limit(); final long mark = input.getStreamPosition(); input.seek(Math.addExact(mark, p)); @@ -793,15 +964,22 @@ public class StorageConnector implements /** * Creates an {@link ImageInputStream} from the {@link DataInput} if possible. This method simply - * casts {@code DataInput} is such cast is allowed. Since {@link #createDataInput()} instantiates + * casts {@code DataInput} if such cast is allowed. Since {@link #createDataInput()} instantiates * {@link ChannelImageInputStream}, this cast is usually possible. * * <p>This method is one of the {@link #OPENERS} methods and should be invoked at most once per * {@code StorageConnector} instance.</p> */ private ImageInputStream createImageInputStream() throws DataStoreException { - final DataInput input = getStorageAs(DataInput.class); - return (input instanceof ImageInputStream) ? (ImageInputStream) input : null; + final Class<DataInput> source = DataInput.class; + final DataInput input = getStorageAs(source); + if (input instanceof ImageInputStream) { + views.put(ImageInputStream.class, views.get(source)); // Share the same Coupled instance. + return (ImageInputStream) input; + } else { + addView(ImageInputStream.class, null); // Remember that there is no view. + return null; + } } /** @@ -812,16 +990,19 @@ public class StorageConnector implements * {@code StorageConnector} instance.</p> */ private InputStream createInputStream() throws IOException, DataStoreException { - final DataInput input = getStorageAs(DataInput.class); + final Class<DataInput> source = DataInput.class; + final DataInput input = getStorageAs(source); if (input instanceof InputStream) { + views.put(InputStream.class, views.get(source)); // Share the same Coupled instance. return (InputStream) input; + } else if (input instanceof ImageInputStream) { + final InputStream in = new InputStreamAdapter((ImageInputStream) input); + addView(InputStream.class, in, source, true, false); + return in; + } else { + addView(InputStream.class, null); // Remember that there is no view. + return null; } - if (input instanceof ImageInputStream) { - final InputStream c = new InputStreamAdapter((ImageInputStream) input); - addViewToClose(c, input); - return c; - } - return null; } /** @@ -830,15 +1011,16 @@ public class StorageConnector implements * <p>This method is one of the {@link #OPENERS} methods and should be invoked at most once per * {@code StorageConnector} instance.</p> */ - private Reader createReader() throws DataStoreException { + private Reader createReader() throws IOException, DataStoreException { final InputStream input = getStorageAs(InputStream.class); if (input == null) { + addView(Reader.class, null); // Remember that there is no view. return null; } - markStorage(); + mark(input); final Charset encoding = getOption(OptionKey.ENCODING); - final Reader c = (encoding != null) ? new InputStreamReader(input, encoding) - : new InputStreamReader(input); + final Reader in = (encoding != null) ? new InputStreamReader(input, encoding) + : new InputStreamReader(input); /* * Current implementation does not wrap the above Reader in a BufferedReader because: * @@ -850,9 +1032,8 @@ public class StorageConnector implements * but we may need to provide our own subclass of BufferedReader in a future SIS version * if mark/reset support is needed here. */ - addViewToClose(c, input); - addViewToSync(Reader.class, null); - return c; + addView(Reader.class, in, InputStream.class, true, false); + return in; } /** @@ -864,7 +1045,7 @@ public class StorageConnector implements private Connection createConnection() throws SQLException { if (storage instanceof DataSource) { final Connection c = ((DataSource) storage).getConnection(); - addViewToClose(c, storage); + addView(Connection.class, c, null, true, false); return c; } return null; @@ -881,81 +1062,67 @@ public class StorageConnector implements } /** - * Adds the given view in the cache. + * Adds the given view in the cache, without dependencies. * * @param <T> the compile-time type of the {@code type} argument. * @param type the view type. * @param view the view, or {@code null} if none. */ private <T> void addView(final Class<T> type, final T view) { - if (views == null) { - views = new IdentityHashMap<>(); - } - if (views.put(type, (view != null) ? view : Void.TYPE) != null) { - // Should never happen, unless someone used this StorageConnector in another thread. - throw new ConcurrentModificationException(); - } - } - - /** - * Returns the view for the given type from the cache. - * - * @param <T> the compile-time type of the {@code type} argument. - * @param type the view type. - * @return the view, or {@code null} if none. - */ - private <T> T getView(final Class<T> type) { - // Note: this method is always invoked in a context where 'views' can not be null. - final Object view = views.get(type); - return (view != Void.TYPE) ? type.cast(view) : null; - } - - /** - * Removes the given view from the cache. - * This method is invoked for forcing the view to be recreated if requested again. - * - * @param type the view type to remove. - */ - private void removeView(final Class<?> type) { - if (views.remove(type) != null) { - viewsToClose.remove(type); - } + addView(type, view, null, false, false); } /** - * Declares that the view of the given type is coupled with {@link #storage}. - * A change of view position will change storage position, and vis-versa. - * See {@link #viewsToSync} for more information. + * Adds the given view in the cache together with information about its dependency. + * For example {@link InputStreamReader} is a wrapper for a {@link InputStream}: read operations + * from the later may change position of the former, and closing the later also close the former. * - * @param sync action to execute after {@link #storage} has been reset, - * or {@code null} if the view should be removed. + * @param <T> the compile-time type of the {@code type} argument. + * @param type the view type. + * @param view the view, or {@code null} if none. + * @param source the type of input that {@code view} is wrapping, or {@code null} for {@link #storage}. + * @param cascadeOnReset whether calls to {@link #reset(Coupled)} shall cascade. + * @param cascadeOnClose whether calls to {@link AutoCloseable#close()} shall cascade. */ - private void addViewToSync(final Class<?> type, final Consumer<StorageConnector> sync) { - if (viewsToSync == null) { - viewsToSync = new IdentityHashMap<>(4); - } - if (viewsToSync.put(type, sync) != null) { - // Should never happen, unless someone used this StorageConnector in another thread. - throw new ConcurrentModificationException(); + private <T> void addView(final Class<T> type, final T view, final Class<?> source, + final boolean cascadeOnReset, final boolean cascadeOnClose) + { + if (views == null) { + views = new IdentityHashMap<>(); + views.put(null, new Coupled(storage)); } + Coupled c = views.get(type); + if (c == null) { + if (view == storage) { + c = views.get(null); + c.invalidateUsages(); + } else { + c = new Coupled((cascadeOnReset | cascadeOnClose) ? views.get(source) : null, cascadeOnReset, cascadeOnClose); + // Newly created objects are not yet used by anyone, so no need to invoke c.invalidateUsages(). + } + views.put(type, c); + } else { + assert c.view == null || c.view == view : c; + assert c.cascadeOnReset == cascadeOnReset && c.cascadeOnClose == cascadeOnClose : type; + assert c.wrapperFor == ((cascadeOnReset | cascadeOnClose) ? views.get(source) : null) : c; + c.invalidateUsages(); + } + c.view = view; + c.isValid = true; + c.invalidateSources(); } /** - * Declares that the given {@code input} will need to be closed by the {@link #closeAllExcept(Object)} method. - * The {@code input} argument is always a new instance wrapping, directly or indirectly, the {@link #storage}. - * Callers must specify the wrapped object in the {@code delegate} argument. + * Returns the view for the given type from the cache. + * This method does <strong>not</strong> {@linkplain #reset(Coupled) reset} the view. * - * @param input the newly created object which will need to be closed. - * @param delegate the object wrapped by the given {@code input}. + * @param type the view type, or {@code null} for the {@link #storage} container. + * @return information associated to the given type. May be {@code null} if the view has never been + * requested before. {@link Coupled#view} may be {@code null} if the view has been requested + * and we determined that none can be created. */ - private void addViewToClose(final Object input, final Object delegate) { - if (viewsToClose == null) { - viewsToClose = new IdentityHashMap<>(4); - } - if (viewsToClose.put(input, delegate) != null) { - // Should never happen, unless someone used this StorageConnector in another thread. - throw new ConcurrentModificationException(); - } + private Coupled getView(final Class<?> type) { + return (views != null) ? views.get(type) : null; } /** @@ -977,18 +1144,54 @@ public class StorageConnector implements * @see DataStoreProvider#open(StorageConnector) */ public void closeAllExcept(final Object view) throws DataStoreException { - final Map<Object,Object> toClose = viewsToClose; - viewsToClose = Collections.emptyMap(); - views = Collections.emptyMap(); - if (toClose == null) { + if (views == null) { + views = Collections.emptyMap(); // For blocking future usage of this StorageConnector instance. if (storage != view && storage instanceof AutoCloseable) try { ((AutoCloseable) storage).close(); + } catch (DataStoreException e) { + throw e; } catch (Exception e) { throw new DataStoreException(e); } return; } /* + * Create a list of all views to close. The boolean value is TRUE if the view should be closed, or FALSE + * if the view should be protected (not closed). FALSE values shall have precedence over TRUE values. + */ + final Map<AutoCloseable,Boolean> toClose = new IdentityHashMap<>(views.size()); + for (Coupled c : views.values()) { + @SuppressWarnings("null") + Object v = c.view; + if (v != view) { + if (v instanceof AutoCloseable) { + toClose.putIfAbsent((AutoCloseable) v, Boolean.TRUE); // Mark 'v' as needing to be closed. + } + } else { + /* + * If there is a view to not close, search for all views that are wrapper for the given view. + * Those wrappers shall not be closed. For example if the caller does not want to close the + * InputStream view, then we shall not close the InputStreamReader wrapper neither. + */ + c.protect(toClose); + do { + v = c.view; + if (v instanceof AutoCloseable) { + toClose.put((AutoCloseable) v, Boolean.FALSE); // Protect 'v' against closing. + } + c = c.wrapperFor; + } while (c != null); + } + } + /* + * Trim the map in order to keep only the views to close. + */ + for (final Iterator<Boolean> it = toClose.values().iterator(); it.hasNext();) { + if (Boolean.FALSE.equals(it.next())) { + it.remove(); + } + } + /* * The "AutoCloseable.close() is not indempotent" problem * ------------------------------------------------------ * We will need a set of objects to close without duplicated values. For example the values associated to the @@ -998,58 +1201,31 @@ public class StorageConnector implements * * Generally speaking, all AutoCloseable instances are not guaranteed to be indempotent because this is not * required by the interface contract. Consequently we must be careful to not invoke the close() method on - * the same instance twice (indirectly or indirectly). - * - * The set of objects to close will be the keys of the 'viewsToClose' map. It can not be the values of the - * 'views' map. + * the same instance twice (indirectly or indirectly). An exception to this rule is ImageInputStream, which + * does not close its underlying stream. Those exceptions are identified by 'cascadeOnClose' set to 'true'. */ - toClose.put(storage, null); - if (view != null) { - /* - * If there is a view to not close, search for all views that are wrapper for the given view. - * Those wrappers shall not be closed. For example if the caller does not want to close the - * InputStream view, then we shall not close the InputStreamReader wrapper neither. - */ - final Queue<Object> deferred = new LinkedList<>(); - Object doNotClose = view; - do { - final Iterator<Map.Entry<Object,Object>> it = toClose.entrySet().iterator(); - while (it.hasNext()) { - final Map.Entry<Object,Object> entry = it.next(); - if (entry.getValue() == doNotClose) { - deferred.add(entry.getKey()); - it.remove(); + if (!toClose.isEmpty()) { + for (Coupled c : views.values()) { + if (!c.cascadeOnClose && toClose.containsKey(c.view)) { // Keep (do not remove) the "top level" view. + while ((c = c.wrapperFor) != null) { + toClose.remove(c.view); // Remove all views below the "top level" one. + if (c.cascadeOnClose) break; } } - doNotClose = deferred.poll(); - } while (doNotClose != null); - } - /* - * Remove the view to not close. If that view is a wrapper for an other object, do not close the - * wrapped object neither. Proceed the dependency chain up to the original 'storage' object. - */ - for (Object doNotClose = view; doNotClose != null;) { - doNotClose = toClose.remove(doNotClose); - } - /* - * Remove all wrapped objects. After this loop, only the "top level" objects should remain - * (typically only one object). This block is needed because of the "AutoCloseable.close() - * is not idempotent" issue, otherwise we could have omitted it. - */ - for (final Object delegate : toClose.values().toArray()) { // 'toArray()' is for avoiding ConcurrentModificationException. - toClose.remove(delegate); + } } + views = Collections.emptyMap(); // For blocking future usage of this StorageConnector instance. /* - * Now close all remaining items. If an exception occurs, we will propagate it only after we are - * done closing all items. + * Now close all remaining items. Typically (but not necessarily) there is only one remaining item. + * If an exception occurs, we will propagate it only after we are done closing all items. */ DataStoreException failure = null; - for (final Object c : toClose.keySet()) { - if (c instanceof AutoCloseable) try { - ((AutoCloseable) c).close(); + for (final AutoCloseable c : toClose.keySet()) { + try { + c.close(); } catch (Exception e) { if (failure == null) { - failure = new DataStoreException(e); + failure = (e instanceof DataStoreException) ? (DataStoreException) e : new DataStoreException(e); } else { failure.addSuppressed(e); }