This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/strict_storage_connector in repository https://gitbox.apache.org/repos/asf/sis.git
commit 0983ed8bce612d6b374d8bc762ffab6c15ead258 Author: Alexis Manin <[email protected]> AuthorDate: Wed Apr 29 20:44:09 2020 +0200 refactor(Storage): Add an extension of storage connector with fail-fast behaviors [WIP] --- .../apache/sis/storage/StrictStorageConnector.java | 202 +++++++++++++++++++++ .../apache/sis/storage/StorageConnectorTest.java | 68 ++++++- .../sis/storage/StrictStorageConnectorTest.java | 100 ++++++++++ .../apache/sis/test/suite/StorageTestSuite.java | 1 + 4 files changed, 362 insertions(+), 9 deletions(-) diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java new file mode 100644 index 0000000..0ba6ed5 --- /dev/null +++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java @@ -0,0 +1,202 @@ +package org.apache.sis.storage; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.Callable; +import javax.imageio.stream.ImageInputStream; +import javax.sql.DataSource; +import org.apache.sis.util.UnconvertibleObjectException; +import org.apache.sis.util.collection.BackingStoreException; + +/** + * Extension of a storage connector providing strong encapsulation of "views". This allows to: + * <ul> + * <li>Adopt a <em>fail-fast</em> behavior in case storage view is corrupted by a user</li> + * <li> + * Provide easier usage: + * <ul> + * <li>Initial mark/final rewind is performed internally, user do not need to care about it.</li> + * <li>Provide strongly typed operators, to guide user on how to use this object.</li> + * </ul> + * </li> + * </ul> + * The purpose of this class is to be merged in StorageConnector once its principle has been validated. + * + * <em>Guarantees</em>: + * <ul> + * <li>This object is <em>not</em> concurrent, and ensure a <em>fail-fast</em> behavior in such cases.</li> + * <li> + * useAs* methods will enforce following behavior: + * <ul> + * <li>If possible, rewind properly consumed storage view to its initial state</li> + * <li>If above statement is not possible, an error will be immediately propagated, and the connector will be marked as closed.</li> + * </ul> + * </li> + * </ul> + * + * <em>TODO</em>: Ensure this object is properly closeable in any case. + * + * Typical usage: + * <ol> + * <li>Check storage compatibility through `useAs*` methods</li> + * <li>If the storage is compatible, commit our choice by locking a storage view, closing the connector in the process.</li> + * </ol> + * + * Example: + * <pre> + * final StrictStorageConnector c = new StrictStorageConnector(Paths.get("path/to/file"); + * // Use connector automatically reset buffering to check support + * Boolean isSupported = c.useAsBuffer((buffer) -%gt; buffer.get() == SEARCHED_KEY); + * // Once support is validated, acquire real storage connection. At this point, + * // storage life cycle becomes the responsability of the caller + * if (supported) { + * try ( InputStream stream = c.commit( InputStream.class ) ) { + * // read all needed data from aquired stream + * } + * } else c.closeAllExcept(null); // not acceptable input, completely close component. + * </pre> + */ +public class StrictStorageConnector extends StorageConnector { + + private volatile int concurrentFlag; + + public StrictStorageConnector(Object storage) { + super(storage); + } + + @Override + public void closeAllExcept(Object view) throws DataStoreException { + try { + doUnderControl(() -> { + concurrentFlag = -1; + super.closeAllExcept(view); + return null; + }); + } catch (IOException e) { + throw new DataStoreException(e); + } + } + + /** + * Provides an in-memory byte buffer containing first bytes of the source storage. + * To know how many bytes are available, refer to the buffer {@link ByteBuffer#remaining() remaining byte count}. + * User <em>do not</em> need to rewind buffer after use. It is the storage connector responsability. + * + * @param operator User operation to perform against preovided buffer. + * @param <T> + * @return + * @throws DataStoreException + * @throws IOException + */ + public <T> T useAsBuffer(StorageOperatingFunction<ByteBuffer, T> operator) throws DataStoreException, IOException { + return doUnderControl(() -> { + final ByteBuffer buffer = getOrFail(ByteBuffer.class); + try ( Closeable rewindOnceDone = buffer::rewind ) { + return operator.apply(buffer); + } + }); + } + + public <T> T useAsImageInputStream(StorageOperatingFunction<ImageInputStream, T> operator) throws IOException, DataStoreException { + return doUnderControl(() -> { + ImageInputStream stream = getOrFail(ImageInputStream.class); + final long positionCtrl = stream.getStreamPosition(); + stream.mark(); + T result; + try ( Closeable rewindOnceDone = stream::reset ) { + result = operator.apply(stream); + } + if (stream.getStreamPosition() != positionCtrl) { + concurrentFlag = -1; // mark this connector as closed/not valid anymore + throw new DataStoreException("Operator has messed with stream marks"); + } + return result; + }); + } + + /** + * Ensure only one storage operation is running at any time against this storage connector. It allows fail-fast + * behavior if this connector is used in concurrent context. + * + * @param operator The operation to perform once we checked no other operation is running. + * + * @param <V> Type of result value produced by given operator. + * @return The result produced by given operator. + * @throws IOException If anything wrong happens while input operator consumes storage, or we can mark/rewind storage. + * @throws DataStoreException Same reasons as for IOException + can happen if queried storage is of unsupported type. + * @throws IllegalStateException If this connector is already closed. + */ + protected <V> V doUnderControl(StorageCallable<V> operator) throws IOException, DataStoreException { + if (concurrentFlag < 0) throw new IllegalStateException("..."); + if (concurrentFlag != 0) throw new ConcurrentReadException("..."); + concurrentFlag++; + try { + return operator.call(); + } finally { + concurrentFlag--; + } + } + + public Optional<Path> getPath() { return getSilently(Path.class); } + + public Optional<URI> getURI() { return getSilently(URI.class); } + + public Optional<DataSource> getSQLDatasource() { return getSilently(DataSource.class); } + + public Optional<String> getPathAsString() { return getSilently(String.class); } + + /** + * Retrieve storage in the queried form, closing all other opened view in the same time. + * <em>Warning</em>: This method also closes this storage connector, making invalid any more calls on it. + * + * @param target Type of the view to get back / keep opened. + * @param <T> Type of the wanted storage connection. + * @return Underlying storage in the requested form. Never null. + * + * @throws IOException If anything goes wrong while initializing storage access. + * @throws DataStoreException If this connector is used concurrently, or if any problem occurs while initializing view. + * @throws IllegalStateException If this connector is already closed. + */ + public <T> T commit(Class<T> target) throws IOException, DataStoreException { + return doUnderControl(() -> { + final T result = getOrFail(target); + concurrentFlag = -1; //close flag + super.closeAllExcept(result); + return result; + }); + } + + private <T> T getOrFail(Class<T> target) throws DataStoreException { + T view = getStorageAs(target); + if (view == null) throw new UnsupportedStorageException(); + return view; + } + + private <T> Optional<T> getSilently(Class<T> target) { + try { + return Optional.ofNullable(getStorageAs(target)); + } catch (UnconvertibleObjectException e) { + // TODO: log fine + return Optional.empty(); + } catch (DataStoreException e) { + // According to current implementation, that should never happen. + // Moreover, it is not really logic to propagate DataStoreException, as this operation should not involve + // any "storage" logic (only in-memory path/uri conversion if needed). + throw new BackingStoreException(e); + } + } + + private interface StorageCallable<V> extends Callable<V> { + @Override + V call() throws IOException, DataStoreException; + } + + @FunctionalInterface + public interface StorageOperatingFunction<I, O> { + O apply(I storage) throws IOException, DataStoreException; + } +} diff --git a/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java b/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java index a7d33ea..381bec8 100644 --- a/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java +++ b/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java @@ -16,29 +16,40 @@ */ package org.apache.sis.storage; -import java.net.URI; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.IOException; import java.io.InputStream; import java.io.Reader; -import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.nio.channels.ReadableByteChannel; -import javax.imageio.stream.ImageInputStream; -import javax.imageio.ImageIO; +import java.nio.charset.StandardCharsets; import java.sql.Connection; -import org.apache.sis.setup.OptionKey; -import org.apache.sis.util.UnconvertibleObjectException; +import java.util.Random; +import javax.imageio.ImageIO; +import javax.imageio.stream.ImageInputStream; import org.apache.sis.internal.storage.io.ChannelDataInput; import org.apache.sis.internal.storage.io.ChannelImageInputStream; import org.apache.sis.internal.storage.io.InputStreamAdapter; -import org.apache.sis.test.DependsOnMethod; +import org.apache.sis.setup.OptionKey; import org.apache.sis.test.DependsOn; +import org.apache.sis.test.DependsOnMethod; import org.apache.sis.test.TestCase; +import org.apache.sis.util.UnconvertibleObjectException; import org.junit.Test; import static org.junit.Assume.assumeTrue; -import static org.opengis.test.Assert.*; +import static org.opengis.test.Assert.assertArrayEquals; +import static org.opengis.test.Assert.assertEquals; +import static org.opengis.test.Assert.assertFalse; +import static org.opengis.test.Assert.assertInstanceOf; +import static org.opengis.test.Assert.assertNotNull; +import static org.opengis.test.Assert.assertNotSame; +import static org.opengis.test.Assert.assertNull; +import static org.opengis.test.Assert.assertSame; +import static org.opengis.test.Assert.assertTrue; +import static org.opengis.test.Assert.fail; /** @@ -390,4 +401,43 @@ public final strictfp class StorageConnectorTest extends TestCase { assertTrue("channel.isOpen()", channel.isOpen()); channel.close(); } + + @Test + public void getting_buffer_should_not_change_underlying_stream_position() throws Exception { + final StorageConnector con = create(false); + + final ImageInputStream stream = con.getStorageAs(ImageInputStream.class); + + final ByteBuffer buffer = con.getStorageAs(ByteBuffer.class); + assertEquals(0, stream.getStreamPosition()); + + buffer.get(); + assertEquals(0, stream.getStreamPosition()); + + con.closeAllExcept(null); + } + + @Test + public void moving_stream_should_not_impact_buffer() throws Exception { + final byte[] data = new byte[(int) Math.pow(2, 16)]; + new Random().nextBytes(data); + final StorageConnector connector = new StorageConnector(new ByteArrayInputStream(data)); + final ByteBuffer buffer = connector.getStorageAs(ByteBuffer.class); + final int blockSize = buffer.remaining(); + final byte[] ctrl = new byte[blockSize]; + buffer.get(ctrl).rewind(); + + InputStream stream = connector.getStorageAs(InputStream.class); + stream.mark(blockSize *2); + stream.skip(blockSize); + stream.read(new byte[blockSize]); + stream.reset(); + + final byte[] afterValue = new byte[blockSize]; + buffer.get(afterValue).rewind(); + + assertArrayEquals(ctrl, afterValue); + + connector.closeAllExcept(null); + } } diff --git a/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java b/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java new file mode 100644 index 0000000..02cdb7e --- /dev/null +++ b/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java @@ -0,0 +1,100 @@ +package org.apache.sis.storage; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.sis.setup.OptionKey; +import org.apache.sis.test.DependsOn; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@DependsOn(org.apache.sis.storage.StorageConnectorTest.class) +public class StrictStorageConnectorTest { + /** + * Name of the test file, in the same directory than this {@code StorageConnectorTest} file. + */ + private static final String FILENAME = "Any.txt"; + + /** + * Creates the instance to test. This method uses the {@code "test.txt"} ASCII file as + * the resource to test. The resource can be provided either as a URL or as a stream. + */ + private static StrictStorageConnector create(final boolean asStream) { + final Class<?> c = StorageConnectorTest.class; + final Object storage = asStream ? c.getResourceAsStream(FILENAME) : c.getResource(FILENAME); + assertNotNull(storage); + final StrictStorageConnector connector = new StrictStorageConnector(storage); + connector.setOption(OptionKey.ENCODING, StandardCharsets.US_ASCII); + connector.setOption(OptionKey.URL_ENCODING, "UTF-8"); + return connector; + } + + private static byte[] getFileBytes() throws URISyntaxException, IOException { + final Path filePath = Paths.get(StorageConnector.class.getResource(FILENAME).toURI()); + return Files.readAllBytes(filePath); + } + + @Test + public void acquiring_path_works() { + final StrictStorageConnector connector = create(false); + assertTrue(connector.getPath().isPresent()); + assertTrue(connector.getURI().isPresent()); + assertTrue(connector.getPathAsString().isPresent()); + assertFalse(connector.getSQLDatasource().isPresent()); + } + + @Test + public void stream_based_connector_return_empty_path() { + final StrictStorageConnector connector = create(true); + assertFalse(connector.getPath().isPresent()); + assertFalse(connector.getURI().isPresent()); + assertFalse(connector.getPathAsString().isPresent()); + assertFalse(connector.getSQLDatasource().isPresent()); + } + + @Test + public void byte_buffer_is_rewind_after_use() throws Exception { + final byte[] ctrl = getFileBytes(); + final StrictStorageConnector connector = create(false); + // Mess with internal buffer + connector.useAsBuffer(buffer -> { + // mess with it + return buffer.get(new byte[10]); + }); + // ensure it has been properly rewind + connector.useAsBuffer(buffer -> { + assertEquals(0, buffer.position()); + byte[] readValue = new byte[buffer.remaining()]; + buffer.get(readValue); + assertArrayEquals(ctrl, readValue); + return null; + }); + } + + @Test + @Ignore("To implement") + public void no_concurrency_allowed() throws Exception { + + } + + @Test + @Ignore("To implement") + public void commit_close_all_resources_but_chosen() throws Exception { + + } + + @Test + @Ignore("To implement") + public void closing_multiple_times_causes_no_error() throws Exception { + + } +} diff --git a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java index bbabdf6..1d008a5 100644 --- a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java +++ b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java @@ -43,6 +43,7 @@ import org.junit.BeforeClass; org.apache.sis.storage.FeatureNamingTest.class, org.apache.sis.storage.ProbeResultTest.class, org.apache.sis.storage.StorageConnectorTest.class, + org.apache.sis.storage.StrictStorageConnectorTest.class, org.apache.sis.storage.event.StoreListenersTest.class, org.apache.sis.internal.storage.query.SimpleQueryTest.class, org.apache.sis.internal.storage.xml.MimeTypeDetectorTest.class,
