This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4619 in repository https://gitbox.apache.org/repos/asf/tika.git
commit d1365e9010311dc2a02f97ad4726ae16e438faf6 Author: tallison <[email protected]> AuthorDate: Mon Jan 12 12:40:31 2026 -0500 TIKA-4619 - builds without tests, WIP do not commit --- .../org/apache/tika/io/CachingInputStream.java | 191 +++++ .../org/apache/tika/io/InputStreamFactory.java | 34 - .../apache/tika/io/ReadOnceTikaInputStream.java | 325 ++++++++ .../main/java/org/apache/tika/io/StreamCache.java | 279 +++++++ .../java/org/apache/tika/io/TikaInputStream.java | 842 +++++---------------- .../parser/multiple/AbstractMultipleParser.java | 8 +- .../java/org/apache/tika/utils/ParserUtils.java | 43 -- .../apache/tika/utils/RereadableInputStream.java | 308 -------- .../org/apache/tika/TestRereadableInputStream.java | 204 ----- .../tika/io/ReadOnceTikaInputStreamTest.java | 272 +++++++ .../org/apache/tika/io/TikaInputStreamTest.java | 562 +++++++++++++- .../org/apache/tika/parser/crypto/TSDParser.java | 23 +- .../html/StandardHtmlEncodingDetectorTest.java | 2 +- .../detect/microsoft/POIFSContainerDetector.java | 9 +- .../apache/tika/detect/ole/MiscOLEDetector.java | 6 +- .../tika/parser/wordperfect/WPInputStreamTest.java | 3 +- .../org/apache/tika/zip/utils/ZipSalvager.java | 15 +- 17 files changed, 1838 insertions(+), 1288 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/io/CachingInputStream.java b/tika-core/src/main/java/org/apache/tika/io/CachingInputStream.java new file mode 100644 index 0000000000..570b262631 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/io/CachingInputStream.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; + +/** + * Package-private InputStream wrapper that caches all bytes read, + * allowing seeking back to any previously-read position. + * <p> + * Bytes are cached in a {@link StreamCache} which stores them in memory + * up to a threshold, then spills to a temporary file. + */ +class CachingInputStream extends InputStream { + + private final InputStream source; + private final StreamCache cache; + + private long position; // Current logical position in the stream + private boolean sourceExhausted; + + CachingInputStream(InputStream source, StreamCache cache) { + this.source = source; + this.cache = cache; + this.position = 0; + this.sourceExhausted = false; + } + + @Override + public int read() throws IOException { + if (position < cache.size()) { + // Reading from cache (replay mode) + int b = cache.readAt(position); + if (b != -1) { + position++; + } + return b; + } + + if (sourceExhausted) { + return -1; + } + + // Reading new byte from source + int b = source.read(); + if (b == -1) { + sourceExhausted = true; + return -1; + } + + cache.append(b); + position++; + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len == 0) { + return 0; + } + + int totalRead = 0; + + // First, read any available bytes from cache + long cacheSize = cache.size(); + if (position < cacheSize) { + int availableInCache = (int) Math.min(len, cacheSize - position); + int n = cache.readAt(position, b, off, availableInCache); + if (n > 0) { + position += n; + totalRead += n; + off += n; + len -= n; + } + } + + // If we need more bytes and source isn't exhausted, read from source + if (len > 0 && !sourceExhausted) { + int n = source.read(b, off, len); + if (n == -1) { + sourceExhausted = true; + } else if (n > 0) { + cache.append(b, off, n); + position += n; + totalRead += n; + } + } + + return totalRead > 0 ? totalRead : -1; + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + + // We need to actually read the bytes to cache them + long skipped = 0; + byte[] buffer = new byte[4096]; + while (skipped < n) { + int toRead = (int) Math.min(buffer.length, n - skipped); + int read = read(buffer, 0, toRead); + if (read == -1) { + break; + } + skipped += read; + } + return skipped; + } + + /** + * Seek to a specific position in the stream. + * Can only seek to positions that have already been read (cached). + */ + void seekTo(long newPosition) throws IOException { + if (newPosition < 0) { + throw new IOException("Cannot seek to negative position: " + newPosition); + } + if (newPosition > cache.size()) { + throw new IOException("Cannot seek past cached content. Position: " + newPosition + ", cached: " + cache.size()); + } + this.position = newPosition; + } + + /** + * Get the current position in the stream. + */ + long getPosition() { + return position; + } + + /** + * Get the number of bytes currently cached. + */ + long getCachedSize() { + return cache.size(); + } + + /** + * Force all remaining content to be read and cached, then return the file path. + */ + Path spillToFile() throws IOException { + return cache.toFile(source); + } + + /** + * Check if the cache has spilled to a file. + */ + boolean isFileBacked() { + return cache.isFileBacked(); + } + + @Override + public void close() throws IOException { + source.close(); + cache.close(); + } + + @Override + public int available() throws IOException { + // Return cached bytes available from current position + long cachedAvailable = cache.size() - position; + if (cachedAvailable > 0) { + return (int) Math.min(cachedAvailable, Integer.MAX_VALUE); + } + return source.available(); + } + + @Override + public boolean markSupported() { + // Mark/reset is handled at the TikaInputStream level + return false; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/io/InputStreamFactory.java b/tika-core/src/main/java/org/apache/tika/io/InputStreamFactory.java deleted file mode 100644 index 17e416a579..0000000000 --- a/tika-core/src/main/java/org/apache/tika/io/InputStreamFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.io; - -import java.io.IOException; -import java.io.InputStream; - -/** - * <p>A factory which returns a fresh {@link InputStream} for the <em>same</em> - * resource each time.</p> - * <p>This is typically desired where it is easier / quicker / simpler to - * fetch a fresh {@link InputStream} to re-read a given resource, rather - * than do any kind of buffering.</p> - * <p>It is typically used with {@link TikaInputStream#get(InputStreamFactory)} - * when combined with a Parser which needs to read the resource's stream - * multiple times when processing.</p> - */ -public interface InputStreamFactory { - InputStream getInputStream() throws IOException; -} diff --git a/tika-core/src/main/java/org/apache/tika/io/ReadOnceTikaInputStream.java b/tika-core/src/main/java/org/apache/tika/io/ReadOnceTikaInputStream.java new file mode 100644 index 0000000000..7fd1586322 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/io/ReadOnceTikaInputStream.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.io; + +import java.io.BufferedInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.Path; + +import org.apache.tika.metadata.Metadata; + +/** + * A lightweight TikaInputStream for single-pass reading. + * <p> + * This class provides basic buffered stream functionality without the overhead + * of caching all bytes read. It is suitable for use cases where you only need + * to read through the stream once, such as detection-only scenarios. + * <p> + * Limited mark/reset support is provided via the underlying {@link BufferedInputStream}, + * but only within its buffer size (typically 8KB). Methods that require full + * stream caching ({@link #getPath()}, {@link #rewind()}) will throw + * {@link UnsupportedOperationException}. + * <p> + * For full rewind and file-spooling capabilities, use {@link TikaInputStream} instead. + */ +public class ReadOnceTikaInputStream extends InputStream { + + private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final int MAX_CONSECUTIVE_EOFS = 1000; + + private final BufferedInputStream in; + private final int bufferSize; + private long position = 0; + private long mark = -1; + private Object openContainer; + private int consecutiveEOFs = 0; + private int closeShieldDepth = 0; + private long length; + + private ReadOnceTikaInputStream(InputStream stream, int bufferSize, long length) { + this.bufferSize = bufferSize; + this.in = stream instanceof BufferedInputStream + ? (BufferedInputStream) stream + : new BufferedInputStream(stream, bufferSize); + this.length = length; + } + + // ========== Static Factory Methods ========== + + public static ReadOnceTikaInputStream get(InputStream stream) { + return get(stream, DEFAULT_BUFFER_SIZE); + } + + public static ReadOnceTikaInputStream get(InputStream stream, int bufferSize) { + if (stream == null) { + throw new NullPointerException("The Stream must not be null"); + } + if (stream instanceof ReadOnceTikaInputStream) { + return (ReadOnceTikaInputStream) stream; + } + return new ReadOnceTikaInputStream(stream, bufferSize, -1); + } + + public static ReadOnceTikaInputStream get(InputStream stream, Metadata metadata) { + return get(stream, DEFAULT_BUFFER_SIZE); + } + + public static ReadOnceTikaInputStream get(byte[] data) { + return new ReadOnceTikaInputStream( + new java.io.ByteArrayInputStream(data), + DEFAULT_BUFFER_SIZE, + data.length + ); + } + + // ========== InputStream Methods ========== + + @Override + public int read() throws IOException { + int b = in.read(); + if (b != -1) { + position++; + consecutiveEOFs = 0; + } else { + consecutiveEOFs++; + if (consecutiveEOFs > MAX_CONSECUTIVE_EOFS) { + throw new IOException("Read too many -1 (EOFs); there could be an infinite loop. " + + "If you think your file is not corrupt, please open an issue on Tika's JIRA"); + } + } + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n > 0) { + position += n; + consecutiveEOFs = 0; + } else if (n == -1) { + consecutiveEOFs++; + if (consecutiveEOFs > MAX_CONSECUTIVE_EOFS) { + throw new IOException("Read too many -1 (EOFs); there could be an infinite loop. " + + "If you think your file is not corrupt, please open an issue on Tika's JIRA"); + } + } + return n; + } + + @Override + public long skip(long n) throws IOException { + // Must read bytes (not skip) to ensure they're in BufferedInputStream's buffer + // for mark/reset to work correctly + if (skipBuffer == null) { + skipBuffer = new byte[4096]; + } + long skipped = IOUtils.skip(in, n, skipBuffer); + position += skipped; + return skipped; + } + + private byte[] skipBuffer; + + @Override + public int available() throws IOException { + return in.available(); + } + + /** + * Marks the current position in the stream. + * <p> + * Note: The mark is only valid within the buffer size limit (default 8KB). + * Reading beyond this limit will invalidate the mark. + * + * @param readlimit the maximum number of bytes that can be read before + * the mark becomes invalid. This is limited by the buffer size. + */ + @Override + public void mark(int readlimit) { + if (readlimit > bufferSize) { + throw new IllegalArgumentException( + "Mark readlimit " + readlimit + " exceeds buffer size " + bufferSize + + ". Use TikaInputStream for larger mark limits."); + } + in.mark(readlimit); + mark = position; + } + + @Override + public boolean markSupported() { + return true; + } + + /** + * Resets the stream to the previously marked position. + * <p> + * Note: This only works if the mark is still valid (i.e., fewer than + * readlimit bytes have been read since mark() was called). + * + * @throws IOException if the mark has been invalidated or was never set + */ + @Override + public void reset() throws IOException { + in.reset(); + position = mark; + mark = -1; + consecutiveEOFs = 0; + } + + @Override + public void close() throws IOException { + if (closeShieldDepth > 0) { + return; + } + in.close(); + } + + // ========== TikaInputStream-compatible Methods ========== + + /** + * Fills the given buffer with upcoming bytes without advancing position. + * <p> + * Note: This only works if the peek size is within the buffer limit. + * + * @param buffer byte buffer to fill + * @return number of bytes written to the buffer + * @throws IOException if the stream cannot be read + * @throws IllegalArgumentException if peek size exceeds buffer limit + */ + public int peek(byte[] buffer) throws IOException { + int n = 0; + mark(buffer.length); // throws IllegalArgumentException if > bufferSize + + int m = read(buffer); + while (m != -1) { + n += m; + if (n < buffer.length) { + m = read(buffer, n, buffer.length - n); + } else { + m = -1; + } + } + + reset(); + return n; + } + + public Object getOpenContainer() { + return openContainer; + } + + public void setOpenContainer(Object container) { + openContainer = container; + } + + public void addCloseableResource(Closeable closeable) { + throw new UnsupportedOperationException( + "ReadOnceTikaInputStream does not track resources. " + + "Use TikaInputStream.get() for resource tracking."); + } + + /** + * @return always false - ReadOnceTikaInputStream does not support file backing + */ + public boolean hasFile() { + return false; + } + + /** + * Returns {@code null} because ReadOnceTikaInputStream does not support + * spilling to a file. Detectors and parsers that require file access + * should check for null and handle gracefully. + * + * @return always {@code null} + */ + public Path getPath() throws IOException { + return null; + } + + /** + * Returns {@code null} because ReadOnceTikaInputStream does not support + * spilling to a file. Detectors and parsers that require file access + * should check for null and handle gracefully. + * + * @return always {@code null} + */ + public File getFile() throws IOException { + return null; + } + + /** + * Not supported. Use {@link TikaInputStream} for file access. + * + * @throws UnsupportedOperationException always + */ + public FileChannel getFileChannel() throws IOException { + throw new UnsupportedOperationException( + "ReadOnceTikaInputStream does not support getFileChannel(). " + + "Use TikaInputStream.get() for file access."); + } + + /** + * Not supported. Use {@link TikaInputStream} for rewind capability. + * + * @throws UnsupportedOperationException always + */ + public void rewind() throws IOException { + throw new UnsupportedOperationException( + "ReadOnceTikaInputStream does not support rewind(). " + + "Use TikaInputStream.get() for rewind capability."); + } + + public boolean hasLength() { + return length != -1; + } + + public long getLength() { + return length; + } + + public long getPosition() { + return position; + } + + public void setCloseShield() { + this.closeShieldDepth++; + } + + public void removeCloseShield() { + this.closeShieldDepth--; + } + + public boolean isCloseShield() { + return closeShieldDepth > 0; + } + + /** + * Returns the buffer size used for mark/reset operations. + */ + public int getBufferSize() { + return bufferSize; + } + + @Override + public String toString() { + return "ReadOnceTikaInputStream[position=" + position + ", bufferSize=" + bufferSize + "]"; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/io/StreamCache.java b/tika-core/src/main/java/org/apache/tika/io/StreamCache.java new file mode 100644 index 0000000000..457b6f0241 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/io/StreamCache.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.io; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * Package-private cache that stores bytes in memory up to a threshold, + * then spills to a temporary file. Supports reading from any offset. + */ +class StreamCache implements Closeable { + + private static final int DEFAULT_MEMORY_THRESHOLD = 1024 * 1024; // 1MB + + private final int memoryThreshold; + private final TemporaryResources tmp; + + // Memory storage (null after spill) + private byte[] memoryBuffer; + private int memorySize; + + // File storage (null until spill) + private Path spillFile; + private OutputStream spillOutputStream; + private long totalSize; + + private boolean closed; + + StreamCache(TemporaryResources tmp) { + this(tmp, DEFAULT_MEMORY_THRESHOLD); + } + + StreamCache(TemporaryResources tmp, int memoryThreshold) { + this.tmp = tmp; + this.memoryThreshold = memoryThreshold; + this.memoryBuffer = new byte[Math.min(memoryThreshold, 8192)]; + this.memorySize = 0; + this.totalSize = 0; + } + + /** + * Append a single byte to the cache. + */ + void append(int b) throws IOException { + if (closed) { + throw new IOException("StreamCache is closed"); + } + + if (memoryBuffer != null) { + // Still in memory mode + if (memorySize >= memoryThreshold) { + spillToFile(); + spillOutputStream.write(b); + } else { + ensureMemoryCapacity(memorySize + 1); + memoryBuffer[memorySize++] = (byte) b; + } + } else { + // Already spilled to file + spillOutputStream.write(b); + } + totalSize++; + } + + /** + * Append multiple bytes to the cache. + */ + void append(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("StreamCache is closed"); + } + + if (memoryBuffer != null) { + if (memorySize + len > memoryThreshold) { + spillToFile(); + spillOutputStream.write(b, off, len); + } else { + ensureMemoryCapacity(memorySize + len); + System.arraycopy(b, off, memoryBuffer, memorySize, len); + memorySize += len; + } + } else { + spillOutputStream.write(b, off, len); + } + totalSize += len; + } + + private void ensureMemoryCapacity(int needed) { + if (needed <= memoryBuffer.length) { + return; + } + int newSize = Math.min(memoryThreshold, Math.max(memoryBuffer.length * 2, needed)); + byte[] newBuffer = new byte[newSize]; + System.arraycopy(memoryBuffer, 0, newBuffer, 0, memorySize); + memoryBuffer = newBuffer; + } + + private void spillToFile() throws IOException { + if (spillFile != null) { + return; // Already spilled + } + + spillFile = tmp.createTempFile((String) null); + spillOutputStream = new BufferedOutputStream(Files.newOutputStream(spillFile)); + + // Write existing memory content to file + if (memorySize > 0) { + spillOutputStream.write(memoryBuffer, 0, memorySize); + } + + // Release memory buffer + memoryBuffer = null; + memorySize = 0; + } + + /** + * Read a single byte at the given position. + */ + int readAt(long position) throws IOException { + if (position < 0 || position >= totalSize) { + return -1; + } + + if (memoryBuffer != null) { + return memoryBuffer[(int) position] & 0xFF; + } else { + flushSpillStream(); + try (RandomAccessFile raf = new RandomAccessFile(spillFile.toFile(), "r")) { + raf.seek(position); + return raf.read(); + } + } + } + + /** + * Read multiple bytes starting at the given position. + */ + int readAt(long position, byte[] b, int off, int len) throws IOException { + if (position < 0 || position >= totalSize) { + return -1; + } + + int available = (int) Math.min(len, totalSize - position); + + if (memoryBuffer != null) { + System.arraycopy(memoryBuffer, (int) position, b, off, available); + return available; + } else { + flushSpillStream(); + try (RandomAccessFile raf = new RandomAccessFile(spillFile.toFile(), "r")) { + raf.seek(position); + return raf.read(b, off, available); + } + } + } + + /** + * Get an InputStream that reads from the given offset. + */ + InputStream getInputStreamFrom(long offset) throws IOException { + return new CacheInputStream(offset); + } + + private void flushSpillStream() throws IOException { + if (spillOutputStream != null) { + spillOutputStream.flush(); + } + } + + /** + * Force all content to a file and return the path. + * After this call, the cache is in file-backed mode. + */ + Path toFile() throws IOException { + if (spillFile == null) { + spillToFile(); + } + flushSpillStream(); + return spillFile; + } + + /** + * Finish writing (drain remaining source bytes) and return the file path. + */ + Path toFile(InputStream remainingSource) throws IOException { + // Copy remaining bytes from source + byte[] buffer = new byte[8192]; + int n; + while ((n = remainingSource.read(buffer)) != -1) { + append(buffer, 0, n); + } + return toFile(); + } + + /** + * Number of bytes currently cached. + */ + long size() { + return totalSize; + } + + /** + * Whether the cache has spilled to a file. + */ + boolean isFileBacked() { + return spillFile != null; + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + memoryBuffer = null; + + if (spillOutputStream != null) { + spillOutputStream.close(); + spillOutputStream = null; + } + // spillFile cleanup is handled by TemporaryResources + } + + /** + * Inner class for reading from the cache at a specific offset. + */ + private class CacheInputStream extends InputStream { + private long position; + + CacheInputStream(long startOffset) { + this.position = startOffset; + } + + @Override + public int read() throws IOException { + if (position >= totalSize) { + return -1; + } + int b = readAt(position); + if (b != -1) { + position++; + } + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (position >= totalSize) { + return -1; + } + int n = StreamCache.this.readAt(position, b, off, len); + if (n > 0) { + position += n; + } + return n; + } + } +} diff --git a/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java b/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java index 254bff7ba2..c4cdff1b5d 100644 --- a/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java +++ b/tika-core/src/main/java/org/apache/tika/io/TikaInputStream.java @@ -16,13 +16,9 @@ */ package org.apache.tika.io; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; - import java.io.BufferedInputStream; import java.io.Closeable; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -41,99 +37,44 @@ import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; -import org.apache.tika.parser.Parser; import org.apache.tika.utils.StringUtils; /** - * Input stream with extended capabilities. The purpose of this class is - * to allow files and other resources and information to be associated with - * the {@link InputStream} instance passed through the - * {@link org.apache.tika.parser.Parser} interface and other similar APIs. - * <p> - * TikaInputStream instances can be created using the various static - * <code>get()</code> factory methods. Most of these methods take an optional - * {@link Metadata} argument that is then filled with the available input - * metadata from the given resource. The created TikaInputStream instance - * keeps track of the original resource used to create it, while behaving - * otherwise just like a normal, buffered {@link InputStream}. - * A TikaInputStream instance is also guaranteed to support the - * {@link #mark(int)} feature. - * <p> - * Code that wants to access the underlying file or other resources - * associated with a TikaInputStream should first use the - * {@link #get(InputStream)} factory method to cast or wrap a given - * {@link InputStream} into a TikaInputStream instance. + * Input stream with extended capabilities for detection and parsing. * <p> - * TikaInputStream includes a few safety features to protect against parsers - * that may fail to check for an EOF or may incorrectly rely on the unreliable - * value returned from {@link FileInputStream#skip}. These parser failures - * can lead to infinite loops. We strongly encourage the use of - * TikaInputStream. + * This implementation uses a {@link CachingInputStream} for stream-backed + * sources, which automatically caches all bytes read and supports seeking + * to any previously-read position. File-backed sources read directly from + * the file without caching. * * @since Apache Tika 0.8 */ public class TikaInputStream extends TaggedInputStream { private static final int MAX_CONSECUTIVE_EOFS = 1000; - /** - * Blob size threshold that limits the largest BLOB size to be - * buffered fully in memory by the {@link #get(Blob, Metadata)} - * method. - */ private static final int BLOB_SIZE_THRESHOLD = 1024 * 1024; - /** - * Tracker of temporary resources. - */ + private final TemporaryResources tmp; - /** - * The Factory that can create fresh {@link InputStream}s for - * the resource this reads for, eg when needing to re-read. - */ - private InputStreamFactory streamFactory; - /** - * The path to the file that contains the contents of this stream. - * This is either the original file passed to the - * {@link #TikaInputStream(Path)} constructor or a temporary file created - * by a call to the {@link #getPath()} method. If neither has been called, - * then the value is <code>null</code>. - */ + + // Non-null only for stream-backed (until getPath() spills to file) + private CachingInputStream cachingStream; + + // Path to backing file (original or temp) private Path path; - /** - * Total length of the stream, or -1 if unknown. - */ + private long length; - /** - * Current read position within this stream. - */ private long position = 0; - /** - * Marked position, or -1 if there is no current mark. - */ private long mark = -1; - /** - * A opened container, such as a POIFS FileSystem - * for an OLE2 document, or a Zip file for a - * zip based (eg ooxml, odf) document. - */ private Object openContainer; private int consecutiveEOFs = 0; private byte[] skipBuffer; - - /** - * If the stream should be shielded from closing - */ private int closeShieldDepth = 0; - - //suffix of the file if known. This is used to create temp files - //with the right suffixes. This should include the initial . as in ".doc" private String suffix = null; + // ========== Private Constructors ========== + /** - * Creates a TikaInputStream instance. This private constructor is used - * by the static factory methods based on the available information. - * - * @param path the path to the file that contains the stream - * @throws IOException if an I/O error occurs + * File-backed constructor. */ private TikaInputStream(Path path) throws IOException { super(new BufferedInputStream(Files.newInputStream(path))); @@ -141,6 +82,7 @@ public class TikaInputStream extends TaggedInputStream { this.tmp = new TemporaryResources(); this.length = Files.size(path); this.suffix = FilenameUtils.getSuffixFromPath(path.getFileName().toString()); + this.cachingStream = null; } private TikaInputStream(Path path, TemporaryResources tmp, long length) throws IOException { @@ -149,250 +91,74 @@ public class TikaInputStream extends TaggedInputStream { this.tmp = tmp; this.length = length; this.suffix = FilenameUtils.getSuffixFromPath(path.getFileName().toString()); + this.cachingStream = null; } /** - * Creates a TikaInputStream instance. This private constructor is used - * by the static factory methods based on the available information. - * - * @param file the file that contains the stream - * @throws FileNotFoundException if the file does not exist - * @deprecated use {@link #TikaInputStream(Path)} - */ - @Deprecated - private TikaInputStream(File file) throws FileNotFoundException { - super(new BufferedInputStream(new FileInputStream(file))); - this.path = file.toPath(); - this.tmp = new TemporaryResources(); - this.length = file.length(); - this.suffix = FilenameUtils.getSuffixFromPath(path.getFileName().toString()); - - } - - /** - * Creates a TikaInputStream instance. This private constructor is used - * by the static factory methods based on the available information. - * <p> - * The given stream needs to be included in the given temporary resource - * collection if the caller wants it also to get closed when the - * {@link #close()} method is invoked. - * - * @param stream <em>buffered</em> stream (must support the mark feature) - * @param tmp tracker for temporary resources associated with this stream - * @param length total length of the stream, or -1 if unknown + * Stream-backed constructor. */ private TikaInputStream(InputStream stream, TemporaryResources tmp, long length, String suffix) { - super(stream); + super(createCachingStream(stream, tmp)); this.path = null; this.tmp = tmp; this.length = length; this.suffix = suffix; + this.cachingStream = (CachingInputStream) in; } - /** - * Checks whether the given stream is a TikaInputStream instance. - * The given stream can be <code>null</code>, in which case the return - * value is <code>false</code>. - * - * @param stream input stream, possibly <code>null</code> - * @return <code>true</code> if the stream is a TikaInputStream instance, - * <code>false</code> otherwise - */ - public static boolean isTikaInputStream(InputStream stream) { - return stream instanceof TikaInputStream; + private static CachingInputStream createCachingStream(InputStream stream, TemporaryResources tmp) { + StreamCache cache = new StreamCache(tmp); + return new CachingInputStream( + stream instanceof BufferedInputStream ? stream : new BufferedInputStream(stream), + cache + ); } - /** - * Casts or wraps the given stream to a TikaInputStream instance. - * This method can be used to access the functionality of this class - * even when given just a normal input stream instance. - * <p> - * The given temporary file provider is used for any temporary files, - * and should be disposed when the returned stream is no longer used. - * <p> - * Use this method instead of the {@link #get(InputStream)} alternative - * when you <em>don't</em> explicitly close the returned stream. The - * recommended access pattern is: - * <pre> - * try (TemporaryResources tmp = new TemporaryResources()) { - * TikaInputStream stream = TikaInputStream.get(..., tmp); - * // process stream but don't close it - * } - * </pre> - * <p> - * The given stream instance will <em>not</em> be closed when the - * {@link TemporaryResources#close()} method is called by the - * try-with-resources statement. The caller is expected to explicitly - * close the original stream when it's no longer used. - * - * @param stream normal input stream - * @return a TikaInputStream instance - * @since Apache Tika 0.10 - */ + // ========== Static Factory Methods ========== + public static TikaInputStream get(InputStream stream, TemporaryResources tmp, Metadata metadata) { if (stream == null) { throw new NullPointerException("The Stream must not be null"); } if (stream instanceof TikaInputStream) { return (TikaInputStream) stream; - } else { - // Make sure that the stream is buffered and that it - // (properly) supports the mark feature - if (!(stream.markSupported())) { - stream = new BufferedInputStream(stream); - } - return new TikaInputStream(stream, tmp, -1, getExtension(metadata)); } + return new TikaInputStream(stream, tmp, -1, getExtension(metadata)); } - /** - * Use this if there is no actual underlying InputStream. It is important - * to set a length so that the zip bomb detector won't be triggered - * in the SecurityHandler. - * <p> - * If your stream has underlying bytes and a length, see {@link #setOpenContainer(Object)} - * - * @param openContainer - * @param length - * @param metadata - * @return - */ - public static TikaInputStream getFromContainer(Object openContainer, long length, Metadata metadata) { - TikaInputStream tis = TikaInputStream.get(new byte[0], metadata); - tis.setOpenContainer(openContainer); - //this overwrites the length that was set in the constructor above - tis.setLength(length); - metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length)); - return tis; - } - - /** - * Casts or wraps the given stream to a TikaInputStream instance. - * This method can be used to access the functionality of this class - * even when given just a normal input stream instance. - * <p> - * Use this method instead of the - * {@link #get(InputStream, TemporaryResources, Metadata)} alternative when you - * <em>do</em> explicitly close the returned stream. The recommended - * access pattern is: - * <pre> - * try (TikaInputStream stream = TikaInputStream.get(...)) { - * // process stream - * } - * </pre> - * <p> - * The given stream instance will be closed along with any other resources - * associated with the returned TikaInputStream instance when the - * {@link #close()} method is called by the try-with-resources statement. - * - * @param stream normal input stream - * @return a TikaInputStream instance - */ public static TikaInputStream get(InputStream stream) { return get(stream, new TemporaryResources(), null); } - /** - * Casts or wraps the given stream to a TikaInputStream instance. - * This method can be used to access the functionality of this class - * even when given just a normal input stream instance. - * <p> - * Use this method instead of the - * {@link #get(InputStream, TemporaryResources, Metadata)} alternative when you - * <em>do</em> explicitly close the returned stream. The recommended - * access pattern is: - * <pre> - * try (TikaInputStream stream = TikaInputStream.get(...)) { - * // process stream - * } - * </pre> - * <p> - * The given stream instance will be closed along with any other resources - * associated with the returned TikaInputStream instance when the - * {@link #close()} method is called by the try-with-resources statement. - * - * @param stream normal input stream - * @return a TikaInputStream instance - */ public static TikaInputStream get(InputStream stream, Metadata metadata) { return get(stream, new TemporaryResources(), metadata); } - /** - * Returns the given stream casts to a TikaInputStream, or - * <code>null</code> if the stream is not a TikaInputStream. - * - * @param stream normal input stream - * @return a TikaInputStream instance - * @since Apache Tika 0.10 - */ public static TikaInputStream cast(InputStream stream) { if (stream instanceof TikaInputStream) { return (TikaInputStream) stream; - } else { - return null; } + return null; } - /** - * Creates a TikaInputStream from the given array of bytes. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the given data to a temporary file. - * - * @param data input data - * @return a TikaInputStream instance - */ - public static TikaInputStream get(byte[] data) { + public static TikaInputStream get(byte[] data) throws IOException { return get(data, new Metadata()); } - /** - * Creates a TikaInputStream from the given array of bytes. The length of - * the array is stored as input metadata in the given metadata instance. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the given data to a temporary file. - * - * @param data input data - * @param metadata metadata instance - * @return a TikaInputStream instance - */ - public static TikaInputStream get(byte[] data, Metadata metadata) { + public static TikaInputStream get(byte[] data, Metadata metadata) throws IOException { metadata.set(Metadata.CONTENT_LENGTH, Integer.toString(data.length)); - return new TikaInputStream(new UnsynchronizedByteArrayInputStream(data), new TemporaryResources(), - data.length, getExtension(metadata)); + return new TikaInputStream( + UnsynchronizedByteArrayInputStream.builder().setByteArray(data).get(), + new TemporaryResources(), + data.length, + getExtension(metadata) + ); } - /** - * Creates a TikaInputStream from the file at the given path. - * <p> - * Note that you must always explicitly close the returned stream to - * prevent leaking open file handles. - * - * @param path input file - * @return a TikaInputStream instance - * @throws IOException if an I/O error occurs - */ public static TikaInputStream get(Path path) throws IOException { return get(path, new Metadata()); } - /** - * Creates a TikaInputStream from the file at the given path. The file name - * and length are stored as input metadata in the given metadata instance. - * <p> - * If there's an {@link TikaCoreProperties#RESOURCE_NAME_KEY} in the - * metadata object, this will not overwrite that value with the path's name. - * <p> - * Note that you must always explicitly close the returned stream to - * prevent leaking open file handles. - * - * @param path input file - * @param metadata metadata instance - * @return a TikaInputStream instance - * @throws IOException if an I/O error occurs - */ public static TikaInputStream get(Path path, Metadata metadata) throws IOException { if (StringUtils.isBlank(metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY))) { metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, path.getFileName().toString()); @@ -411,107 +177,19 @@ public class TikaInputStream extends TaggedInputStream { return new TikaInputStream(path, tmp, length); } - /** - * Creates a TikaInputStream from the given file. - * <p> - * Note that you must always explicitly close the returned stream to - * prevent leaking open file handles. - * - * @param file input file - * @return a TikaInputStream instance - * @throws FileNotFoundException if the file does not exist - * @deprecated use {@link #get(Path)}. In Tika 2.0, this will be removed - * or modified to throw an IOException. - */ - @Deprecated - public static TikaInputStream get(File file) throws FileNotFoundException { - return get(file, new Metadata()); - } - - /** - * Creates a TikaInputStream from the given file. The file name and - * length are stored as input metadata in the given metadata instance. - * <p> - * Note that you must always explicitly close the returned stream to - * prevent leaking open file handles. - * - * @param file input file - * @param metadata metadata instance - * @return a TikaInputStream instance - * @throws FileNotFoundException if the file does not exist - * or cannot be opened for reading - * @deprecated use {@link #get(Path, Metadata)}. In Tika 2.0, - * this will be removed or modified to throw an IOException. - */ - @Deprecated - public static TikaInputStream get(File file, Metadata metadata) throws FileNotFoundException { - if (StringUtils.isBlank(metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY))) { - metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, file.getName()); - } - metadata.set(Metadata.CONTENT_LENGTH, Long.toString(file.length())); - return new TikaInputStream(file); - } - - /** - * Creates a TikaInputStream from a Factory which can create - * fresh {@link InputStream}s for the same resource multiple times. - * <p>This is typically desired when working with {@link Parser}s that - * need to re-read the stream multiple times, where other forms - * of buffering (eg File) are slower than just getting a fresh - * new stream each time. - */ - public static TikaInputStream get(InputStreamFactory factory) throws IOException { - return get(factory, new TemporaryResources()); + public static TikaInputStream get(File file) throws IOException { + return get(file.toPath(), new Metadata()); } - /** - * Creates a TikaInputStream from a Factory which can create - * fresh {@link InputStream}s for the same resource multiple times. - * <p>This is typically desired when working with {@link Parser}s that - * need to re-read the stream multiple times, where other forms - * of buffering (eg File) are slower than just getting a fresh - * new stream each time. - */ - public static TikaInputStream get(InputStreamFactory factory, TemporaryResources tmp) - throws IOException { - TikaInputStream stream = get(factory.getInputStream(), tmp, null); - stream.streamFactory = factory; - return stream; + public static TikaInputStream get(File file, Metadata metadata) throws IOException { + return get(file.toPath(), metadata); } - /** - * Creates a TikaInputStream from the given database BLOB. - * <p> - * Note that the result set containing the BLOB may need to be kept open - * until the returned TikaInputStream has been processed and closed. - * You must also always explicitly close the returned stream as in - * some cases it may end up writing the blob data to a temporary file. - * - * @param blob database BLOB - * @return a TikaInputStream instance - * @throws SQLException if BLOB data can not be accessed - */ - public static TikaInputStream get(Blob blob) throws SQLException { + public static TikaInputStream get(Blob blob) throws SQLException, IOException { return get(blob, new Metadata()); } - /** - * Creates a TikaInputStream from the given database BLOB. The BLOB - * length (if available) is stored as input metadata in the given - * metadata instance. - * <p> - * Note that the result set containing the BLOB may need to be kept open - * until the returned TikaInputStream has been processed and closed. - * You must also always explicitly close the returned stream as in - * some cases it may end up writing the blob data to a temporary file. - * - * @param blob database BLOB - * @param metadata metadata instance - * @return a TikaInputStream instance - * @throws SQLException if BLOB data can not be accessed - */ - public static TikaInputStream get(Blob blob, Metadata metadata) throws SQLException { - + public static TikaInputStream get(Blob blob, Metadata metadata) throws SQLException, IOException { long length = -1; try { length = blob.length(); @@ -519,92 +197,37 @@ public class TikaInputStream extends TaggedInputStream { } catch (SQLException ignore) { } - // Prefer an in-memory buffer for reasonably sized blobs to reduce - // the likelihood of problems caused by long-lived database accesses if (0 <= length && length <= BLOB_SIZE_THRESHOLD) { - // the offset in Blob.getBytes() starts at 1 return get(blob.getBytes(1, (int) length), metadata); } else { - return new TikaInputStream(new BufferedInputStream(blob.getBinaryStream()), - new TemporaryResources(), length, - getExtension(metadata)); - } - } - - private static String getExtension(Metadata metadata) { - if (metadata == null) { - return StringUtils.EMPTY; + return new TikaInputStream( + new BufferedInputStream(blob.getBinaryStream()), + new TemporaryResources(), + length, + getExtension(metadata) + ); } - String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); - return FilenameUtils.getSuffixFromPath(name); } - /** - * Creates a TikaInputStream from the resource at the given URI. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the resource to a temporary file. - * - * @param uri resource URI - * @return a TikaInputStream instance - * @throws IOException if the resource can not be accessed - */ public static TikaInputStream get(URI uri) throws IOException { return get(uri, new Metadata()); } - /** - * Creates a TikaInputStream from the resource at the given URI. The - * available input metadata is stored in the given metadata instance. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the resource to a temporary file. - * - * @param uri resource URI - * @param metadata metadata instance - * @return a TikaInputStream instance - * @throws IOException if the resource can not be accessed - */ public static TikaInputStream get(URI uri, Metadata metadata) throws IOException { - // Special handling for file:// URIs if ("file".equalsIgnoreCase(uri.getScheme())) { Path path = Paths.get(uri); if (Files.isRegularFile(path)) { return get(path, metadata); } } - return get(uri.toURL(), metadata); } - /** - * Creates a TikaInputStream from the resource at the given URL. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the resource to a temporary file. - * - * @param url resource URL - * @return a TikaInputStream instance - * @throws IOException if the resource can not be accessed - */ public static TikaInputStream get(URL url) throws IOException { return get(url, new Metadata()); } - /** - * Creates a TikaInputStream from the resource at the given URL. The - * available input metadata is stored in the given metadata instance. - * <p> - * Note that you must always explicitly close the returned stream as in - * some cases it may end up writing the resource to a temporary file. - * - * @param url resource URL - * @param metadata metadata instance - * @return a TikaInputStream instance - * @throws IOException if the resource can not be accessed - */ public static TikaInputStream get(URL url, Metadata metadata) throws IOException { - // Special handling for file:// URLs if ("file".equalsIgnoreCase(url.getProtocol())) { try { Path path = Paths.get(url.toURI()); @@ -618,10 +241,10 @@ public class TikaInputStream extends TaggedInputStream { URLConnection connection = url.openConnection(); - String path = url.getPath(); - int slash = path.lastIndexOf('/'); - if (slash + 1 < path.length()) { // works even with -1! - metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, path.substring(slash + 1)); + String urlPath = url.getPath(); + int slash = urlPath.lastIndexOf('/'); + if (slash + 1 < urlPath.length()) { + metadata.set(TikaCoreProperties.RESOURCE_NAME_KEY, urlPath.substring(slash + 1)); } String type = connection.getContentType(); @@ -639,23 +262,115 @@ public class TikaInputStream extends TaggedInputStream { metadata.set(Metadata.CONTENT_LENGTH, Integer.toString(length)); } - return new TikaInputStream(new BufferedInputStream(connection.getInputStream()), - new TemporaryResources(), length, getExtension(metadata)); + return new TikaInputStream( + new BufferedInputStream(connection.getInputStream()), + new TemporaryResources(), + length, + getExtension(metadata) + ); } - /** - * Fills the given buffer with upcoming bytes from this stream without - * advancing the current stream position. The buffer is filled up unless - * the end of stream is encountered before that. This method will block - * if not enough bytes are immediately available. - * - * @param buffer byte buffer - * @return number of bytes written to the buffer - * @throws IOException if the stream can not be read - */ + public static TikaInputStream getFromContainer(Object openContainer, long length, Metadata metadata) + throws IOException { + TikaInputStream tis = TikaInputStream.get(new byte[0], metadata); + tis.setOpenContainer(openContainer); + tis.setLength(length); + metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length)); + return tis; + } + + private static String getExtension(Metadata metadata) { + if (metadata == null) { + return StringUtils.EMPTY; + } + String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); + return FilenameUtils.getSuffixFromPath(name); + } + + // ========== InputStream Methods ========== + + @Override + public long skip(long ln) throws IOException { + if (skipBuffer == null) { + skipBuffer = new byte[4096]; + } + long n = IOUtils.skip(in, ln, skipBuffer); + position += n; + return n; + } + + @Override + public void mark(int readlimit) { + super.mark(readlimit); + mark = position; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() throws IOException { + if (mark < 0) { + throw new IOException("Resetting to invalid mark"); + } + + if (path != null) { + // File-backed: close and reopen, skip to mark position + in.close(); + InputStream newStream = Files.newInputStream(path); + tmp.addResource(newStream); + in = new BufferedInputStream(newStream); + + if (mark > 0) { + if (skipBuffer == null) { + skipBuffer = new byte[4096]; + } + IOUtils.skip(in, mark, skipBuffer); + } + } else if (cachingStream != null) { + // Stream-backed: seek within the cache + cachingStream.seekTo(mark); + } else { + throw new IOException("Cannot reset: no cache and no file backing"); + } + + position = mark; + mark = -1; + consecutiveEOFs = 0; + } + + @Override + public void close() throws IOException { + if (closeShieldDepth > 0) { + return; + } + path = null; + mark = -1; + + tmp.addResource(in); + tmp.close(); + } + + @Override + protected void afterRead(int n) throws IOException { + if (n != -1) { + position += n; + consecutiveEOFs = 0; + } else { + consecutiveEOFs++; + if (consecutiveEOFs > MAX_CONSECUTIVE_EOFS) { + throw new IOException("Read too many -1 (EOFs); there could be an infinite loop. " + + "If you think your file is not corrupt, please open an issue on Tika's JIRA"); + } + } + } + + // ========== TikaInputStream-specific Methods ========== + public int peek(byte[] buffer) throws IOException { int n = 0; - mark(buffer.length); int m = read(buffer); @@ -669,31 +384,13 @@ public class TikaInputStream extends TaggedInputStream { } reset(); - return n; } - /** - * Returns the open container object if any, such as a - * POIFS FileSystem in the event of an OLE2 document - * being detected and processed by the OLE2 detector. - * - * @return Open Container for this stream, or <code>null</code> if none - */ public Object getOpenContainer() { return openContainer; } - /** - * Stores the open container object against - * the stream, eg after a Zip contents - * detector has loaded the file to decide - * what it contains. - * <p> - * If there's no undelrying stream, consider {@link #getFromContainer(Object, long, Metadata)} - * because that will avoid potential improper zip bomb exceptions from the SecurityHandler if - * it thinks the length of the stream == 0. - */ public void setOpenContainer(Object container) { openContainer = container; if (container instanceof Closeable) { @@ -701,114 +398,56 @@ public class TikaInputStream extends TaggedInputStream { } } - /** - * - * @param closeable - */ public void addCloseableResource(Closeable closeable) { tmp.addResource(closeable); } - public boolean hasInputStreamFactory() { - return streamFactory != null; - } - - /** - * If the Stream was created from an {@link InputStreamFactory}, - * return that, otherwise <code>null</code>. - */ - public InputStreamFactory getInputStreamFactory() { - return streamFactory; - } - public boolean hasFile() { return path != null; } - - /** - * If the user created this TikaInputStream with a file, - * the original file will be returned. If not, the entire stream - * will be spooled to a temporary file which will be deleted - * upon the close of this TikaInputStream - * - * @return - * @throws IOException - */ public Path getPath() throws IOException { - return getPath(-1); - } - - /** - * @param maxBytes if this is less than 0 and if an underlying file doesn't already exist, - * the full file will be spooled to disk - * @return the original path used in the initialization of this TikaInputStream, - * a temporary file if the stream was shorter than <code>maxBytes</code>, or <code>null</code> - * if the underlying stream was longer than maxBytes. - * @throws IOException - */ - public Path getPath(int maxBytes) throws IOException { if (path != null) { return path; } - if (position > 0) { - throw new IOException("Stream is already being read"); - } else { - Path tmpFile = tmp.createTempFile(suffix); - if (maxBytes > -1) { - try (BoundedInputStream boundedInputStream = new BoundedInputStream(maxBytes, this)) { - boundedInputStream.mark(maxBytes); - try { - Files.copy(boundedInputStream, tmpFile, REPLACE_EXISTING); - if (boundedInputStream.hasHitBound()) { - //tmpFile will be cleaned up when this TikaInputStream is closed - return null; - } - } finally { - boundedInputStream.reset(); - } - } - } else { - // Spool the entire stream into a temporary file - Files.copy(this, tmpFile, REPLACE_EXISTING); - } - //successful so far, set tis' path to tmpFile - path = tmpFile; - // Create a new input stream and make sure it'll get closed - InputStream newStream = Files.newInputStream(path); - tmp.addResource(newStream); + if (cachingStream == null) { + throw new IOException("No caching stream available"); + } - // Replace the spooled stream with the new stream in a way - // that still ends up closing the old stream if or when the - // close() method is called. The closing of the new stream - // is already being handled as noted above. - final InputStream oldStream = in; - in = new BufferedInputStream(newStream) { - @Override - public void close() throws IOException { - oldStream.close(); - } - }; + // Spill to file and switch to file-backed mode + path = cachingStream.spillToFile(); - // Update length to file size. Update position, mark - long sz = Files.size(path); - if (getOpenContainer() != null && sz == 0 && length > -1) { - //don't update size if there's an open container and the sz == 0 - //hope that the length was sent in earlier via getFromContainer - } else { - length = sz; + // Reopen from file at current position + long savedPosition = position; + in.close(); + + InputStream newStream = Files.newInputStream(path); + tmp.addResource(newStream); + in = new BufferedInputStream(newStream); + + // Skip to saved position + if (savedPosition > 0) { + if (skipBuffer == null) { + skipBuffer = new byte[4096]; } - position = 0; - mark = -1; + IOUtils.skip(in, savedPosition, skipBuffer); } + // Update length + long sz = Files.size(path); + if (openContainer != null && sz == 0 && length > -1) { + // Don't update if open container with 0 size + } else { + length = sz; + } + + // No longer using caching stream + cachingStream = null; + return path; } - /** - * @see #getPath() - */ public File getFile() throws IOException { return getPath().toFile(); } @@ -823,114 +462,21 @@ public class TikaInputStream extends TaggedInputStream { return length != -1; } - /** - * Returns the length (in bytes) of this stream. Note that if the length - * was not available when this stream was instantiated, then this method - * will use the {@link #getPath()} method to buffer the entire stream to - * a temporary file in order to calculate the stream length. This case - * will only work if the stream has not yet been consumed. - * - * @return stream length - * @throws IOException if the length can not be determined - */ public long getLength() throws IOException { if (length == -1) { - getPath(); // updates length internally + getPath(); } return length; } - /** - * Returns the current position within the stream. - * - * @return stream position - */ public long getPosition() { return position; } - /** - * This should only be called by the constructor for an open container with a 0 length - * byte inputStream - * - * @param length - */ private void setLength(long length) { this.length = length; } - /** - * This relies on {@link IOUtils#skip(InputStream, long, byte[])} to ensure - * that the alleged bytes skipped were actually skipped. - * - * @param ln the number of bytes to skip - * @return the number of bytes skipped - * @throws IOException if the number of bytes requested to be skipped does not match the - * number of bytes skipped or if there's an IOException during the read. - */ - @Override - public long skip(long ln) throws IOException { - //On TIKA-3092, we found that using the static byte array buffer - //caused problems with multithreading with the FlateInputStream - //from a POIFS document stream - if (skipBuffer == null) { - skipBuffer = new byte[4096]; - } - long n = IOUtils.skip(super.in, ln, skipBuffer); - position += n; - return n; - } - - @Override - public void mark(int readlimit) { - super.mark(readlimit); - mark = position; - } - - @Override - public boolean markSupported() { - return true; - } - - @Override - public void reset() throws IOException { - super.reset(); - position = mark; - mark = -1; - consecutiveEOFs = 0; - } - - @Override - public void close() throws IOException { - if (closeShieldDepth > 0) { - return; - } - path = null; - mark = -1; - - // The close method was explicitly called, so we indeed - // are expected to close the input stream. Handle that - // by adding that stream as a resource to be tracked before - // closing all of them. This way also possible exceptions from - // the close() calls get managed properly. - tmp.addResource(in); - tmp.close(); - } - - @Override - protected void afterRead(int n) throws IOException { - if (n != -1) { - position += n; - } else { - consecutiveEOFs++; - if (consecutiveEOFs > MAX_CONSECUTIVE_EOFS) { - throw new IOException("Read too many -1 (EOFs); there could be an infinite loop." + - "If you think your file is not corrupt, please open an issue on Tika's " + - "JIRA"); - } - } - } - public void setCloseShield() { this.closeShieldDepth++; } @@ -942,6 +488,16 @@ public class TikaInputStream extends TaggedInputStream { public boolean isCloseShield() { return closeShieldDepth > 0; } + + /** + * Rewind the stream to the beginning. + */ + public void rewind() throws IOException { + mark = 0; + reset(); + mark = -1; + } + @Override public String toString() { String str = "TikaInputStream of "; diff --git a/tika-core/src/main/java/org/apache/tika/parser/multiple/AbstractMultipleParser.java b/tika-core/src/main/java/org/apache/tika/parser/multiple/AbstractMultipleParser.java index cc78a55be3..2cf757db2b 100644 --- a/tika-core/src/main/java/org/apache/tika/parser/multiple/AbstractMultipleParser.java +++ b/tika-core/src/main/java/org/apache/tika/parser/multiple/AbstractMultipleParser.java @@ -251,10 +251,6 @@ public abstract class AbstractMultipleParser implements Parser { // Start tracking resources, so we can clean up when done TemporaryResources tmp = new TemporaryResources(); try { - // Ensure we'll be able to re-read safely, buffering to disk if so, - // to permit Parsers 2+ to be able to read the same data - TikaInputStream taggedStream = ParserUtils.ensureStreamReReadable(tis, tmp, originalMetadata); - for (Parser p : parsers) { // Get a new handler for this parser, if we can // If not, the user will get text from every parser @@ -275,7 +271,7 @@ public abstract class AbstractMultipleParser implements Parser { // Process if possible Exception failure = null; try { - p.parse(taggedStream, handler, metadata, context); + p.parse(tis, handler, metadata, context); } catch (Exception e) { // Record the failure such that it can't get lost / overwritten recordParserFailure(p, e, originalMetadata); @@ -309,7 +305,7 @@ public abstract class AbstractMultipleParser implements Parser { // Prepare for the next parser, if present lastMetadata = cloneMetadata(metadata); - taggedStream = ParserUtils.streamResetForReRead(taggedStream, tmp); + tis.rewind(); } } finally { tmp.dispose(); diff --git a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java index 802902fb73..552e418859 100644 --- a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java +++ b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java @@ -18,11 +18,8 @@ package org.apache.tika.utils; import static org.apache.tika.metadata.TikaCoreProperties.EMBEDDED_EXCEPTION; -import java.io.IOException; import java.util.Arrays; -import org.apache.tika.io.TemporaryResources; -import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.Property; import org.apache.tika.metadata.TikaCoreProperties; @@ -105,44 +102,4 @@ public class ParserUtils { metadata.add(EMBEDDED_EXCEPTION, trace); metadata.add(EMBEDDED_PARSER, getParserClassname(parser)); } - - /** - * Ensures that the Stream will be able to be re-read, by buffering to - * a temporary file if required. - * Streams that are automatically OK include {@link TikaInputStream}s - * created from Files or InputStreamFactories. - */ - public static TikaInputStream ensureStreamReReadable(TikaInputStream stream, TemporaryResources tmp, - Metadata metadata) - throws IOException { - // If it's factory based, it's ok - if (stream.getInputStreamFactory() != null) { - return stream; - } - - // Ensure it's file based - stream.getFile(); - // Prepare for future re-reads - stream.mark(-1); - return stream; - } - - /** - * Resets the given {@link TikaInputStream} (checked by - * {@link #ensureStreamReReadable(TikaInputStream, TemporaryResources, Metadata)}) - * so that it can be re-read again. - */ - public static TikaInputStream streamResetForReRead(TikaInputStream stream, TemporaryResources tmp) - throws IOException { - // Factory based? - if (stream.getInputStreamFactory() != null) { - // Just get a fresh one each time from the factory - return TikaInputStream.get(stream.getInputStreamFactory(), tmp); - } - - // File based, reset stream to beginning of File - stream.reset(); - stream.mark(-1); - return stream; - } } diff --git a/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java b/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java deleted file mode 100644 index e2fdba1ab9..0000000000 --- a/tika-core/src/main/java/org/apache/tika/utils/RereadableInputStream.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.utils; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; - -import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; - -/** - * Wraps an input stream, reading it only once, but making it available - * for rereading an arbitrary number of times. The stream's bytes are - * stored in memory up to a user specified maximum, and then stored in a - * temporary file which is deleted when this class's close() method is called. - */ -public class RereadableInputStream extends InputStream { - - /** - * Default value for buffer size = 500M - */ - private static final int DEFAULT_MAX_BYTES_IN_MEMORY = 512 * 1024 * 1024; - - - /** - * Input stream originally passed to the constructor. - */ - private final InputStream originalInputStream; - - /** - * The inputStream currently being used by this object to read contents; - * may be the original stream passed in, or a stream that reads - * the saved copy from a memory buffer or file. - */ - private InputStream inputStream; - - /** - * Maximum number of bytes that can be stored in memory before - * storage will be moved to a temporary file. - */ - private final int maxBytesInMemory; - - /** - * Whether or not we are currently reading from the byte buffer in memory - * Bytes are read until we've exhausted the buffered bytes and then we proceed to read from - * the original input stream. If the numbers of bytes read from the original stream - * eventually exceed maxBytesInMemory, then we'll switch to reading from a file. - */ - private boolean readingFromBuffer; - - - /** - * The buffer used to store the stream's content; this storage is moved - * to a file when the stored data's size exceeds maxBytesInMemory. - * Set to null once we start writing to a file. - */ - private byte[] byteBuffer; - - /** - * The current pointer when reading from memory - */ - private int bufferPointer; - - /** - * Maximum size of the buffer that was written in previous pass(s) - */ - private int bufferHighWaterMark; - - /** - * File used to store the stream's contents; is null until the stored - * content's size exceeds maxBytesInMemory. - */ - private File storeFile; - - /** - * Specifies whether the stream has been closed - */ - private boolean closed; - - /** - * OutputStream used to save the content of the input stream in a - * temporary file. - */ - private OutputStream storeOutputStream; - - - /** - * Specifies whether or not to close the original input stream - * when close() is called. Defaults to true. - */ - private final boolean closeOriginalStreamOnClose; - - - /** - * Creates a rereadable input stream with defaults of 512*1024*1024 bytes (500M) for - * maxBytesInMemory and both readToEndOfStreamOnFirstRewind and closeOriginalStreamOnClose - * set to true - * - * @param inputStream stream containing the source of data - */ - public RereadableInputStream(InputStream inputStream) { - this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, true); - } - - /** - * Creates a rereadable input stream defaulting to 512*1024*1024 bytes (500M) for - * maxBytesInMemory - * - * @param inputStream stream containing the source of data - */ - public RereadableInputStream(InputStream inputStream, boolean closeOriginalStreamOnClose) { - this(inputStream, DEFAULT_MAX_BYTES_IN_MEMORY, closeOriginalStreamOnClose); - } - - /** - * Creates a rereadable input stream with closeOriginalStreamOnClose set to true - * - * @param inputStream stream containing the source of data - * @param maxBytesInMemory maximum number of bytes to use to store - * the stream's contents in memory before switching to disk; note that - * the instance will preallocate a byte array whose size is - * maxBytesInMemory. This byte array will be made available for - * garbage collection (i.e. its reference set to null) when the - * content size exceeds the array's size, when close() is called, or - * when there are no more references to the instance. - */ - public RereadableInputStream(InputStream inputStream, int maxBytesInMemory) { - this(inputStream, maxBytesInMemory, true); - } - - /** - * Creates a rereadable input stream. - * - * @param inputStream stream containing the source of data - * @param maxBytesInMemory maximum number of bytes to use to store - * the stream's contents in memory before switching to disk; note that - * the instance will preallocate a byte array whose size is - * maxBytesInMemory. This byte array will be made available for - * garbage collection (i.e. its reference set to null) when the - * content size exceeds the array's size, when close() is called, or - * when there are no more references to the instance. - */ - public RereadableInputStream(InputStream inputStream, int maxBytesInMemory, - boolean closeOriginalStreamOnClose) { - this.inputStream = inputStream; - this.originalInputStream = inputStream; - this.maxBytesInMemory = maxBytesInMemory; - byteBuffer = new byte[maxBytesInMemory]; - this.closeOriginalStreamOnClose = closeOriginalStreamOnClose; - } - - /** - * Reads a byte from the stream, saving it in the store if it is being - * read from the original stream. Implements the abstract - * InputStream.read(). - * - * @return the read byte, or -1 on end of stream. - * @throws IOException - */ - public int read() throws IOException { - if (closed) { - throw new IOException("Stream is already closed"); - } - - int inputByte = inputStream.read(); - if (inputByte == -1 && inputStream != originalInputStream) { - // If we got EOF reading from buffer or file, switch to the original stream and get - // the next byte from there instead - if (readingFromBuffer) { - readingFromBuffer = false; - inputStream.close(); // Close the input byte stream - } else { - inputStream.close(); // Close the input file stream - // start appending to the file - storeOutputStream = new BufferedOutputStream(new FileOutputStream(storeFile, true)); - } - // The original stream is now the current stream - inputStream = originalInputStream; - inputByte = inputStream.read(); - } - - if (inputByte != -1 && inputStream == originalInputStream) { - // If not EOF and reading from original stream, save the bytes we read - saveByte(inputByte); - } - - return inputByte; - } - - /** - * Saves the bytes read from the original stream to buffer or file - */ - private void saveByte(int inputByte) throws IOException { - if (byteBuffer != null) { - if (bufferPointer == maxBytesInMemory) { - // Need to switch to file - storeFile = Files.createTempFile("TIKA_streamstore_", ".tmp").toFile(); - storeOutputStream = new BufferedOutputStream(new FileOutputStream(storeFile)); - // Save what we have so far in buffer - storeOutputStream.write(byteBuffer, 0, bufferPointer); - // Write the new byte - storeOutputStream.write(inputByte); - byteBuffer = null; // release for garbage collection - } else { - // Continue writing to buffer - byteBuffer[bufferPointer++] = (byte) inputByte; - } - } else { - storeOutputStream.write(inputByte); - } - } - - /** - * "Rewinds" the stream to the beginning for rereading. - * - * @throws IOException - */ - public void rewind() throws IOException { - if (closed) { - throw new IOException("Stream is already closed"); - } - - if (storeOutputStream != null) { - storeOutputStream.close(); - storeOutputStream = null; - } - - // Close the byte input stream or file input stream - if (inputStream != originalInputStream) { - inputStream.close(); - } - - bufferHighWaterMark = Math.max(bufferPointer, bufferHighWaterMark); - bufferPointer = bufferHighWaterMark; - - if (bufferHighWaterMark > 0) { - // If we have a buffer, then we'll read from it - if (byteBuffer != null) { - readingFromBuffer = true; - inputStream = UnsynchronizedByteArrayInputStream.builder(). - setByteArray(byteBuffer).setOffset(0).setLength(bufferHighWaterMark).get(); - } else { - // No buffer, which means we've switched to a file - inputStream = new BufferedInputStream(new FileInputStream(storeFile)); - } - } else { - inputStream = originalInputStream; - } - } - - /** - * Closes the input stream currently used for reading (may either be - * the original stream or a memory or file stream after the first pass). - * - * @throws IOException - */ - private void closeStream() throws IOException { - if (originalInputStream != inputStream) { - // Close the byte input stream or file input stream, if either is the current one - inputStream.close(); - } - - if (closeOriginalStreamOnClose) { - originalInputStream.close(); - } - } - - /** - * Closes the input stream and removes the temporary file if one was - * created. - * - * @throws IOException - */ - public void close() throws IOException { - closeStream(); - - if (storeOutputStream != null) { - storeOutputStream.close(); - storeOutputStream = null; - } - - super.close(); - if (storeFile != null) { - storeFile.delete(); - } - closed = true; - } -} diff --git a/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java b/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java deleted file mode 100644 index 05fdb53f4d..0000000000 --- a/tika-core/src/test/java/org/apache/tika/TestRereadableInputStream.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import org.apache.tika.utils.RereadableInputStream; - -public class TestRereadableInputStream { - - private final int DEFAULT_TEST_SIZE = 3; - - private final int MEMORY_THRESHOLD = 10; - - private final int NUM_PASSES = 4; - - // This size of data keeps us in memory - private final int TEST_SIZE_MEMORY = 7; - - // This size of data exceeds memory threshold and gets us in a file - private final int TEST_SIZE_FILE = 15; - - // This size of data exactly equals memory threshold - private final int TEST_SIZE_MAX = MEMORY_THRESHOLD; - - @TempDir - private Path tempDir; - - @Test - public void testInMemory() throws IOException { - readEntireStream((TEST_SIZE_MEMORY)); - } - -// @Test -// public void testInFile() throws IOException { -// readData(TEST_SIZE_FILE); -// } -// -// @Test -// public void testMemoryThreshold() throws IOException { -// readData(TEST_SIZE_MAX); -// } -// -// @Test -// public void testInMemory2() throws IOException { -// readData2((TEST_SIZE_MEMORY)); -// } -// -// @Test -// public void testInFile2() throws IOException { -// readData2(TEST_SIZE_FILE); -// } - - @Test - public void testMemoryThreshold2() throws IOException { - readPartialStream(TEST_SIZE_MAX); - } - - /** - * Read entire stream of various sizes - */ - private void readEntireStream(int testSize) throws IOException { - InputStream is = createTestInputStream(testSize); - try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) { - for (int pass = 0; pass < NUM_PASSES; pass++) { - for (int byteNum = 0; byteNum < testSize; byteNum++) { - int byteRead = ris.read(); - assertEquals(byteNum, byteRead, - "Pass = " + pass + ", byte num should be " + byteNum + " but is " + - byteRead + "."); - } - int eof = ris.read(); - assertEquals(-1, eof, - "Pass = " + pass + ", byte num should be " + -1 + " but is " + eof + "."); - ris.rewind(); - } - } - } - - /** - * Read increasingly more of the stream, but not all, with each pass before rewinding to - * make sure we pick up at the correct point - */ - private void readPartialStream(int testSize) throws IOException { - InputStream is = createTestInputStream(20); - try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) { - - int iterations = testSize; - for (int pass = 0; pass < NUM_PASSES; pass++) { - for (int byteNum = 0; byteNum < iterations; byteNum++) { - int byteRead = ris.read(); - assertEquals(byteNum, byteRead, - "Pass = " + pass + ", byte num should be " + byteNum + " but is " + byteRead + "."); - } - ris.rewind(); - iterations++; - } - } - } - - - @Test - public void testRewind() throws IOException { - InputStream is = createTestInputStream(DEFAULT_TEST_SIZE); - try (RereadableInputStream ris = new RereadableInputStream(is, MEMORY_THRESHOLD, true)) { - ris.rewind(); // rewind before we've done anything - for (int byteNum = 0; byteNum < 1; byteNum++) { - int byteRead = ris.read(); - assertEquals(byteNum, byteRead, "Byte num should be " + byteNum + " but is " + byteRead + "."); - } - } - } - - - private TestInputStream createTestInputStream(int testSize) throws IOException { - return new TestInputStream( - new BufferedInputStream(Files.newInputStream(createTestFile(testSize)))); - } - - private Path createTestFile(int testSize) throws IOException { - Path testfile = Files.createTempFile(tempDir, "TIKA_ris_test", ".tmp"); - try (OutputStream fos = Files.newOutputStream(testfile)) { - for (int i = 0; i < testSize; i++) { - fos.write(i); - } - } - return testfile; - } - - @Test - public void testCloseBehavior() throws IOException { - doACloseBehaviorTest(true); - doACloseBehaviorTest(false); - } - - private void doACloseBehaviorTest(boolean wantToClose) throws IOException { - - TestInputStream tis = createTestInputStream(DEFAULT_TEST_SIZE); - RereadableInputStream ris = new RereadableInputStream(tis, MEMORY_THRESHOLD, wantToClose); - ris.close(); - assertEquals(wantToClose, tis.isClosed()); - - if (!tis.isClosed()) { - tis.close(); - } - } - - @Test - public void doReadAfterCloseTest() throws IOException { - TestInputStream tis = createTestInputStream(DEFAULT_TEST_SIZE); - RereadableInputStream ris = new RereadableInputStream(tis, DEFAULT_TEST_SIZE); - ris.close(); - assertThrows(IOException.class, () -> { - ris.read(); - }); - } - - - /** - * Adds isClosed() to a BufferedInputStream. - */ - static class TestInputStream extends BufferedInputStream { - - private boolean closed; - - public TestInputStream(InputStream inputStream) { - super(inputStream); - } - - public void close() throws IOException { - super.close(); - closed = true; - } - - public boolean isClosed() { - return closed; - } - } -} diff --git a/tika-core/src/test/java/org/apache/tika/io/ReadOnceTikaInputStreamTest.java b/tika-core/src/test/java/org/apache/tika/io/ReadOnceTikaInputStreamTest.java new file mode 100644 index 0000000000..1e0ab72c9f --- /dev/null +++ b/tika-core/src/test/java/org/apache/tika/io/ReadOnceTikaInputStreamTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.io; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.junit.jupiter.api.Test; + +public class ReadOnceTikaInputStreamTest { + + private static byte[] bytes(String s) { + return s.getBytes(UTF_8); + } + + private static String str(byte[] b) { + return new String(b, UTF_8); + } + + @Test + public void testBasicRead() throws IOException { + byte[] data = bytes("Hello, World!"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + byte[] buffer = new byte[data.length]; + int read = tis.read(buffer); + assertEquals(data.length, read); + assertArrayEquals(data, buffer); + } + } + + @Test + public void testPosition() throws IOException { + byte[] data = bytes("ABCDEFGHIJ"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertEquals(0, tis.getPosition()); + + tis.read(); + assertEquals(1, tis.getPosition()); + + tis.read(new byte[3]); + assertEquals(4, tis.getPosition()); + + tis.skip(2); + assertEquals(6, tis.getPosition()); + } + } + + @Test + public void testMarkResetWithinBuffer() throws IOException { + byte[] data = bytes("Hello, World!"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + // Mark at position 0 + tis.mark(100); + + // Read 5 bytes + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals("Hello", str(buf)); + assertEquals(5, tis.getPosition()); + + // Reset to 0 + tis.reset(); + assertEquals(0, tis.getPosition()); + + // Read again + buf = new byte[5]; + tis.read(buf); + assertEquals("Hello", str(buf)); + } + } + + @Test + public void testMarkResetAtNonZeroPosition() throws IOException { + byte[] data = bytes("ABCDEFGHIJ"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + // Read 3 bytes + byte[] buf = new byte[3]; + tis.read(buf); + assertEquals("ABC", str(buf)); + + // Mark at position 3 + tis.mark(100); + + // Read 4 more bytes + buf = new byte[4]; + tis.read(buf); + assertEquals("DEFG", str(buf)); + + // Reset to position 3 + tis.reset(); + assertEquals(3, tis.getPosition()); + + // Read again from position 3 + buf = new byte[4]; + tis.read(buf); + assertEquals("DEFG", str(buf)); + } + } + + @Test + public void testPeek() throws IOException { + byte[] data = bytes("Hello, World!"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + byte[] peekBuffer = new byte[5]; + int peeked = tis.peek(peekBuffer); + assertEquals(5, peeked); + assertEquals("Hello", str(peekBuffer)); + + // Position should still be 0 + assertEquals(0, tis.getPosition()); + + // Now read normally + byte[] readBuffer = new byte[5]; + tis.read(readBuffer); + assertEquals("Hello", str(readBuffer)); + assertEquals(5, tis.getPosition()); + } + } + + @Test + public void testPeekExceedsBuffer() throws IOException { + byte[] data = new byte[100]; + // Use small buffer size + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get( + new ByteArrayInputStream(data), 50)) { + byte[] peekBuffer = new byte[100]; + // peek() calls mark() which throws IllegalArgumentException if readlimit > bufferSize + assertThrows(IllegalArgumentException.class, () -> tis.peek(peekBuffer)); + } + } + + @Test + public void testMarkExceedsBufferThrows() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get( + new ByteArrayInputStream(data), 50)) { + // mark with readlimit > bufferSize should throw + assertThrows(IllegalArgumentException.class, () -> tis.mark(100)); + } + } + + @Test + public void testAddCloseableResourceThrows() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertThrows(UnsupportedOperationException.class, + () -> tis.addCloseableResource(new ByteArrayInputStream(new byte[0]))); + } + } + + @Test + public void testHasFileReturnsFalse() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertFalse(tis.hasFile()); + } + } + + @Test + public void testGetPathReturnsNull() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertNull(tis.getPath()); + } + } + + @Test + public void testGetFileReturnsNull() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertNull(tis.getFile()); + } + } + + @Test + public void testRewindThrows() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertThrows(UnsupportedOperationException.class, tis::rewind); + } + } + + @Test + public void testCloseShield() throws IOException { + byte[] data = bytes("Hello"); + ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data); + + assertFalse(tis.isCloseShield()); + tis.setCloseShield(); + assertTrue(tis.isCloseShield()); + + // Close should be ignored + tis.close(); + + // Stream should still be readable + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals("Hello", str(buf)); + + tis.removeCloseShield(); + assertFalse(tis.isCloseShield()); + tis.close(); + } + + @Test + public void testLength() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertTrue(tis.hasLength()); + assertEquals(5, tis.getLength()); + } + } + + @Test + public void testLengthUnknown() throws IOException { + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get( + new ByteArrayInputStream(bytes("Hello")))) { + assertFalse(tis.hasLength()); + assertEquals(-1, tis.getLength()); + } + } + + @Test + public void testOpenContainer() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertEquals(null, tis.getOpenContainer()); + + Object container = new Object(); + tis.setOpenContainer(container); + assertEquals(container, tis.getOpenContainer()); + } + } + + @Test + public void testBufferSize() throws IOException { + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get( + new ByteArrayInputStream(bytes("Hello")), 1024)) { + assertEquals(1024, tis.getBufferSize()); + } + } + + @Test + public void testMarkSupported() throws IOException { + byte[] data = bytes("Hello"); + try (ReadOnceTikaInputStream tis = ReadOnceTikaInputStream.get(data)) { + assertTrue(tis.markSupported()); + } + } +} diff --git a/tika-core/src/test/java/org/apache/tika/io/TikaInputStreamTest.java b/tika-core/src/test/java/org/apache/tika/io/TikaInputStreamTest.java index 4e53e0fddc..f86c3261d7 100644 --- a/tika-core/src/test/java/org/apache/tika/io/TikaInputStreamTest.java +++ b/tika-core/src/test/java/org/apache/tika/io/TikaInputStreamTest.java @@ -17,20 +17,29 @@ package org.apache.tika.io; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Locale; +import java.util.Random; import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -48,7 +57,6 @@ public class TikaInputStreamTest { TikaInputStream tis = TikaInputStream.get(path); assertTrue(tis.hasFile()); assertNull(tis.getOpenContainer()); - assertNull(tis.getInputStreamFactory()); assertEquals(path, TikaInputStream.get(tis).getPath(), "The file returned by the getFile() method should" + @@ -71,13 +79,11 @@ public class TikaInputStreamTest { TikaInputStream tis = TikaInputStream.get(input); assertFalse(tis.hasFile()); assertNull(tis.getOpenContainer()); - assertNull(tis.getInputStreamFactory()); Path file = TikaInputStream.get(tis).getPath(); assertTrue(file != null && Files.isRegularFile(file)); assertTrue(tis.hasFile()); assertNull(tis.getOpenContainer()); - assertNull(tis.getInputStreamFactory()); assertEquals("Hello, World!", readFile(file), "The contents of the file returned by the getFile method" + @@ -92,19 +98,6 @@ public class TikaInputStreamTest { "The close() method must remove the temporary file created by a TikaInputStream"); } - @Test - public void testInputStreamFactoryBased() throws IOException { - TikaInputStream tis = TikaInputStream.get(() -> IOUtils.toInputStream("Hello, World!", UTF_8)); - assertFalse(tis.hasFile()); - assertNull(tis.getOpenContainer()); - assertNotNull(tis.getInputStreamFactory()); - - assertEquals("Hello, World!", readStream(tis), - "The contents of the TikaInputStream should not get modified" + - " by reading the file first"); - tis.close(); - } - private Path createTempFile(String data) throws IOException { Path file = Files.createTempFile(tempDir, "tika-", ".tmp"); Files.write(file, data.getBytes(UTF_8)); @@ -129,4 +122,541 @@ public class TikaInputStreamTest { metadata.get(Metadata.CONTENT_LENGTH)); } + // ========== New Caching Tests ========== + + @Test + public void testMarkReset() throws IOException { + byte[] data = bytes("Hello, World!"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + byte[] first = new byte[5]; + tis.read(first); + assertEquals("Hello", str(first)); + + tis.mark(100); + + byte[] next = new byte[2]; + tis.read(next); + assertEquals(", ", str(next)); + + tis.reset(); + + byte[] again = new byte[2]; + tis.read(again); + assertEquals(", ", str(again)); + } + } + + @Test + public void testMarkResetAtZero() throws IOException { + byte[] data = bytes("Hello, World!"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.mark(100); + + byte[] buffer = new byte[data.length]; + tis.read(buffer); + + tis.reset(); + assertEquals(0, tis.getPosition()); + + byte[] again = new byte[data.length]; + tis.read(again); + assertArrayEquals(data, again); + } + } + + @Test + public void testMultipleMarkReset() throws IOException { + byte[] data = bytes("ABCDEFGHIJ"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.mark(100); + byte[] buf = new byte[3]; + tis.read(buf); + assertEquals("ABC", str(buf)); + + tis.reset(); + tis.mark(100); + buf = new byte[5]; + tis.read(buf); + assertEquals("ABCDE", str(buf)); + + tis.mark(100); + buf = new byte[3]; + tis.read(buf); + assertEquals("FGH", str(buf)); + + tis.reset(); + buf = new byte[3]; + tis.read(buf); + assertEquals("FGH", str(buf)); + } + } + + @Test + public void testRewind() throws IOException { + byte[] data = bytes("Hello, World!"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + byte[] buffer = new byte[data.length]; + tis.read(buffer); + assertEquals(data.length, tis.getPosition()); + + tis.rewind(); + assertEquals(0, tis.getPosition()); + + byte[] again = new byte[data.length]; + tis.read(again); + assertArrayEquals(data, again); + } + } + + @Test + public void testGetPathPreservesPosition() throws IOException { + byte[] data = bytes("Hello, World!"); + try (TikaInputStream tis = TikaInputStream.get(new ByteArrayInputStream(data))) { + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals(5, tis.getPosition()); + + Path path = tis.getPath(); + assertNotNull(path); + assertEquals(5, tis.getPosition()); + + buf = new byte[2]; + tis.read(buf); + assertEquals(", ", str(buf)); + } + } + + @Test + public void testFileBackedMarkReset() throws IOException { + Path tempFile = createTempFile("ABCDEFGHIJ"); + + try (TikaInputStream tis = TikaInputStream.get(tempFile)) { + byte[] buf = new byte[3]; + tis.read(buf); + assertEquals("ABC", str(buf)); + + tis.mark(100); + + buf = new byte[4]; + tis.read(buf); + assertEquals("DEFG", str(buf)); + + tis.reset(); + assertEquals(3, tis.getPosition()); + + buf = new byte[4]; + tis.read(buf); + assertEquals("DEFG", str(buf)); + } + } + + @Test + public void testSkip() throws IOException { + byte[] data = bytes("ABCDEFGHIJ"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.skip(3); + assertEquals(3, tis.getPosition()); + + byte[] buf = new byte[4]; + tis.read(buf); + assertEquals("DEFG", str(buf)); + } + } + + @Test + public void testLargeStreamSpillsToFile() throws IOException { + byte[] data = new byte[2 * 1024 * 1024]; // 2MB + new Random(42).nextBytes(data); + + try (TikaInputStream tis = TikaInputStream.get(new ByteArrayInputStream(data))) { + byte[] buffer = new byte[data.length]; + int totalRead = 0; + int n; + while ((n = tis.read(buffer, totalRead, buffer.length - totalRead)) != -1) { + totalRead += n; + if (totalRead >= buffer.length) { + break; + } + } + assertEquals(data.length, totalRead); + + tis.rewind(); + assertEquals(0, tis.getPosition()); + + byte[] again = new byte[data.length]; + totalRead = 0; + while ((n = tis.read(again, totalRead, again.length - totalRead)) != -1) { + totalRead += n; + if (totalRead >= again.length) { + break; + } + } + assertArrayEquals(data, again); + } + } + + @Test + public void testResetWithoutMark() throws IOException { + byte[] data = bytes("Hello"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.read(); + assertThrows(IOException.class, tis::reset); + } + } + + @Test + public void testPeek() throws IOException { + byte[] data = bytes("Hello, World!"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + byte[] peekBuffer = new byte[5]; + int peeked = tis.peek(peekBuffer); + assertEquals(5, peeked); + assertEquals("Hello", str(peekBuffer)); + + assertEquals(0, tis.getPosition()); + + byte[] readBuffer = new byte[5]; + tis.read(readBuffer); + assertEquals("Hello", str(readBuffer)); + assertEquals(5, tis.getPosition()); + } + } + + @Test + public void testPosition() throws IOException { + byte[] data = bytes("ABCDEFGHIJ"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + assertEquals(0, tis.getPosition()); + + tis.read(); + assertEquals(1, tis.getPosition()); + + tis.read(new byte[3]); + assertEquals(4, tis.getPosition()); + + tis.skip(2); + assertEquals(6, tis.getPosition()); + } + } + + @Test + public void testLength() throws IOException { + byte[] data = bytes("Hello"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + assertTrue(tis.hasLength()); + assertEquals(5, tis.getLength()); + } + } + + @Test + public void testCloseShield() throws IOException { + byte[] data = bytes("Hello"); + TikaInputStream tis = TikaInputStream.get(data); + + assertFalse(tis.isCloseShield()); + tis.setCloseShield(); + assertTrue(tis.isCloseShield()); + + tis.close(); + + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals("Hello", str(buf)); + + tis.removeCloseShield(); + assertFalse(tis.isCloseShield()); + tis.close(); + } + + // ========== Randomized Tests ========== + + @Test + public void testRandomizedStreamBackedZeroLength() throws Exception { + runRandomizedTest(0, false); + } + + @Test + public void testRandomizedStreamBackedSmall() throws Exception { + runRandomizedTest(100, false); + } + + @Test + public void testRandomizedStreamBackedMedium() throws Exception { + runRandomizedTest(8192, false); + } + + @Test + public void testRandomizedStreamBackedLarge() throws Exception { + runRandomizedTest(100_000, false); + } + + @Test + public void testRandomizedStreamBackedVeryLarge() throws Exception { + runRandomizedTest(2 * 1024 * 1024, false); + } + + @Test + public void testRandomizedFileBackedZeroLength() throws Exception { + runRandomizedTest(0, true); + } + + @Test + public void testRandomizedFileBackedSmall() throws Exception { + runRandomizedTest(100, true); + } + + @Test + public void testRandomizedFileBackedMedium() throws Exception { + runRandomizedTest(8192, true); + } + + @Test + public void testRandomizedFileBackedLarge() throws Exception { + runRandomizedTest(100_000, true); + } + + @RepeatedTest(10) + public void testRandomizedOperationsStreamBacked() throws Exception { + Random sizeRandom = new Random(); + int[] sizes = {0, 50, 1000, 8192, 50_000, 1024 * 1024 + 100}; + int size = sizes[sizeRandom.nextInt(sizes.length)]; + runRandomizedOperationsTest(size, false); + } + + @RepeatedTest(10) + public void testRandomizedOperationsFileBacked() throws Exception { + Random sizeRandom = new Random(); + int[] sizes = {0, 50, 1000, 8192, 50_000}; + int size = sizes[sizeRandom.nextInt(sizes.length)]; + runRandomizedOperationsTest(size, true); + } + + private void runRandomizedTest(int size, boolean fileBacked) throws Exception { + long seed = System.currentTimeMillis(); + Random random = new Random(seed); + + byte[] data = new byte[size]; + if (size > 0) { + random.nextBytes(data); + } + String expectedDigest = computeDigest(data); + + try (TikaInputStream tis = createTikaInputStream(data, fileBacked)) { + byte[] readData = readAllBytes(tis); + String actualDigest = computeDigest(readData); + assertEquals(expectedDigest, actualDigest, + "Digest mismatch for size=" + size + ", fileBacked=" + fileBacked + ", seed=" + seed); + assertEquals(size, readData.length); + } + + try (TikaInputStream tis = createTikaInputStream(data, fileBacked)) { + byte[] readData = readAllBytes(tis); + tis.rewind(); + byte[] rereadData = readAllBytes(tis); + assertArrayEquals(readData, rereadData, + "Data mismatch after rewind for size=" + size + ", fileBacked=" + fileBacked); + } + } + + private void runRandomizedOperationsTest(int size, boolean fileBacked) throws Exception { + long seed = System.currentTimeMillis(); + Random random = new Random(seed); + + byte[] data = new byte[size]; + if (size > 0) { + random.nextBytes(data); + } + + try (TikaInputStream tis = createTikaInputStream(data, fileBacked)) { + int position = 0; + int markPosition = -1; + int numOps = random.nextInt(50) + 10; + + for (int op = 0; op < numOps; op++) { + int operation = random.nextInt(5); + + switch (operation) { + case 0: + if (position < size) { + int expectedByte = data[position] & 0xFF; + int actualByte = tis.read(); + assertEquals(expectedByte, actualByte, + "Single byte read mismatch at position " + position + + ", size=" + size + ", seed=" + seed); + position++; + } else { + assertEquals(-1, tis.read()); + } + break; + + case 1: + int readLen = random.nextInt(Math.min(1000, size + 100) + 1); + byte[] buffer = new byte[readLen]; + int bytesRead = tis.read(buffer); + if (position >= size) { + assertTrue(bytesRead <= 0, + "Expected EOF, got " + bytesRead + " bytes"); + } else { + assertTrue(bytesRead > 0, + "Expected data, got " + bytesRead); + for (int i = 0; i < bytesRead; i++) { + assertEquals(data[position + i], buffer[i], + "Bulk read mismatch at offset " + i); + } + position += bytesRead; + } + break; + + case 2: + long skipAmount = random.nextInt(size + 100); + long skipped = tis.skip(skipAmount); + assertTrue(skipped >= 0 && skipped <= skipAmount); + position += (int) skipped; + if (position > size) { + position = size; + } + break; + + case 3: + int readLimit = random.nextInt(size + 100) + 1; + tis.mark(readLimit); + markPosition = position; + break; + + case 4: + if (markPosition >= 0) { + tis.reset(); + position = markPosition; + markPosition = -1; + } + break; + + default: + break; + } + assertEquals(position, tis.getPosition(), + "Position mismatch after operation " + operation + ", seed=" + seed); + } + + tis.rewind(); + assertEquals(0, tis.getPosition()); + byte[] finalRead = readAllBytes(tis); + String expectedDigest = computeDigest(data); + String actualDigest = computeDigest(finalRead); + assertEquals(expectedDigest, actualDigest, + "Final digest mismatch after operations, size=" + size + ", seed=" + seed); + } + } + + @Test + public void testMarkBeyondStreamLength() throws Exception { + byte[] data = bytes("Short"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.mark(1000); + byte[] buf = readAllBytes(tis); + assertEquals("Short", str(buf)); + tis.reset(); + assertEquals(0, tis.getPosition()); + buf = readAllBytes(tis); + assertEquals("Short", str(buf)); + } + } + + @Test + public void testSkipBeyondStreamLength() throws Exception { + byte[] data = bytes("Short"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + long skipped = tis.skip(1000); + assertEquals(5, skipped); + assertEquals(-1, tis.read()); + } + } + + @Test + public void testMarkResetSkipCombination() throws Exception { + byte[] data = bytes("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + try (TikaInputStream tis = TikaInputStream.get(data)) { + tis.mark(100); + tis.skip(10); + assertEquals(10, tis.getPosition()); + + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals("KLMNO", str(buf)); + + tis.reset(); + assertEquals(0, tis.getPosition()); + + buf = new byte[5]; + tis.read(buf); + assertEquals("ABCDE", str(buf)); + } + } + + @Test + public void testFileBackedMarkResetSkip() throws Exception { + byte[] data = bytes("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + Path tempFile = createTempFile("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + + try (TikaInputStream tis = TikaInputStream.get(tempFile)) { + tis.skip(5); + assertEquals(5, tis.getPosition()); + + tis.mark(100); + + tis.skip(10); + assertEquals(15, tis.getPosition()); + + byte[] buf = new byte[5]; + tis.read(buf); + assertEquals("PQRST", str(buf)); + + tis.reset(); + assertEquals(5, tis.getPosition()); + + buf = new byte[5]; + tis.read(buf); + assertEquals("FGHIJ", str(buf)); + } + } + + // ========== Helper Methods ========== + + private TikaInputStream createTikaInputStream(byte[] data, boolean fileBacked) throws IOException { + if (fileBacked) { + Path file = Files.createTempFile(tempDir, "test_", ".bin"); + Files.write(file, data); + return TikaInputStream.get(file); + } else { + return TikaInputStream.get(new ByteArrayInputStream(data)); + } + } + + private byte[] readAllBytes(TikaInputStream tis) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[4096]; + int n; + while ((n = tis.read(buffer)) != -1) { + baos.write(buffer, 0, n); + } + return baos.toByteArray(); + } + + private String computeDigest(byte[] data) throws NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + byte[] digest = md.digest(data); + StringBuilder sb = new StringBuilder(); + for (byte b : digest) { + sb.append(String.format(Locale.ROOT, "%02x", b)); + } + return sb.toString(); + } + + private static byte[] bytes(String s) { + return s.getBytes(UTF_8); + } + + private static String str(byte[] b) { + return new String(b, UTF_8); + } } diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java index 3be79458fc..0edcb0a6b3 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-crypto-module/src/main/java/org/apache/tika/parser/crypto/TSDParser.java @@ -61,7 +61,6 @@ import org.apache.tika.mime.MediaType; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; import org.apache.tika.sax.XHTMLContentHandler; -import org.apache.tika.utils.RereadableInputStream; /** * Tika parser for Time Stamped Data Envelope (application/timestamped-data) @@ -95,21 +94,19 @@ public class TSDParser implements Parser { ParseContext context) throws IOException, SAXException, TikaException { //Try to parse TSD file - try (RereadableInputStream ris = new RereadableInputStream(tis, 2048, true)) { - Metadata TSDAndEmbeddedMetadata = new Metadata(); + Metadata TSDAndEmbeddedMetadata = new Metadata(); - List<TSDMetas> tsdMetasList = this.extractMetas(ris); - this.buildMetas(tsdMetasList, - metadata != null && metadata.size() > 0 ? TSDAndEmbeddedMetadata : metadata); + List<TSDMetas> tsdMetasList = this.extractMetas(tis); + this.buildMetas(tsdMetasList, + metadata != null && metadata.size() > 0 ? TSDAndEmbeddedMetadata : metadata); - XHTMLContentHandler xhtml = new XHTMLContentHandler(handler, metadata); - xhtml.startDocument(); - ris.rewind(); + XHTMLContentHandler xhtml = new XHTMLContentHandler(handler, metadata); + xhtml.startDocument(); + tis.rewind(); - //Try to parse embedded file in TSD file - this.parseTSDContent(ris, xhtml, TSDAndEmbeddedMetadata, context); - xhtml.endDocument(); - } + //Try to parse embedded file in TSD file + this.parseTSDContent(tis, xhtml, TSDAndEmbeddedMetadata, context); + xhtml.endDocument(); } private List<TSDMetas> extractMetas(InputStream tis) throws SAXException { diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-html-module/src/test/java/org/apache/tika/parser/html/StandardHtmlEncodingDetectorTest.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-html-module/src/test/java/org/apache/tika/parser/html/StandardHtmlEncodingDetectorTest.java index 636ffcef01..834cbbe3ef 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-html-module/src/test/java/org/apache/tika/parser/html/StandardHtmlEncodingDetectorTest.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-html-module/src/test/java/org/apache/tika/parser/html/StandardHtmlEncodingDetectorTest.java @@ -363,7 +363,7 @@ public class StandardHtmlEncodingDetectorTest { private InputStream throwAfter(String html) { byte[] contents = html.getBytes(StandardCharsets.UTF_8); - InputStream contentsInStream = TikaInputStream.get(contents); + InputStream contentsInStream = new java.io.ByteArrayInputStream(contents); InputStream errorThrowing = new InputStream() { @Override public int read() throws IOException { diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/detect/microsoft/POIFSContainerDetector.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/detect/microsoft/POIFSContainerDetector.java index d8876dfa0f..0eeb145b3c 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/detect/microsoft/POIFSContainerDetector.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/detect/microsoft/POIFSContainerDetector.java @@ -599,14 +599,11 @@ public class POIFSContainerDetector implements Detector { private Set<String> getTopLevelNames(TikaInputStream stream) throws IOException { // Force the document stream to a (possibly temporary) file // so we don't modify the current position of the stream. - //If the markLimit is < 0, this will spool the entire file - //to disk if there is not an underlying file. - Path file = stream.getPath(markLimit); + Path file = stream.getPath(); - //if the stream was longer than markLimit, don't detect if (file == null) { - LOG.warn("File length exceeds marklimit. Skipping detection on this file. " + - "If you need precise detection, consider increasing the marklimit or setting it to -1"); + // ReadOnceTikaInputStream doesn't support getPath() + LOG.warn("Stream does not support file access; skipping POIFS detection"); return Collections.emptySet(); } diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/main/java/org/apache/tika/detect/ole/MiscOLEDetector.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/main/java/org/apache/tika/detect/ole/MiscOLEDetector.java index dc297bccc7..1f86e06cd4 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/main/java/org/apache/tika/detect/ole/MiscOLEDetector.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/main/java/org/apache/tika/detect/ole/MiscOLEDetector.java @@ -137,12 +137,10 @@ public class MiscOLEDetector implements Detector { private Set<String> getTopLevelNames(TikaInputStream stream) throws IOException { // Force the document stream to a (possibly temporary) file // so we don't modify the current position of the stream. - //If the markLimit is < 0, this will spool the entire file - //to disk if there is not an underlying file. - Path file = stream.getPath(markLimit); + Path file = stream.getPath(); - //if the stream was longer than markLimit, don't detect if (file == null) { + // ReadOnceTikaInputStream doesn't support getPath() return Collections.emptySet(); } diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/test/java/org/apache/tika/parser/wordperfect/WPInputStreamTest.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/test/java/org/apache/tika/parser/wordperfect/WPInputStreamTest.java index ef55a9d1f8..f1c17750bb 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/test/java/org/apache/tika/parser/wordperfect/WPInputStreamTest.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-miscoffice-module/src/test/java/org/apache/tika/parser/wordperfect/WPInputStreamTest.java @@ -19,6 +19,7 @@ package org.apache.tika.parser.wordperfect; import static org.junit.jupiter.api.Assertions.fail; import java.io.EOFException; +import java.io.IOException; import org.junit.jupiter.api.Test; @@ -121,7 +122,7 @@ public class WPInputStreamTest { } } - private WPInputStream emptyWPStream() { + private WPInputStream emptyWPStream() throws IOException { return new WPInputStream(TikaInputStream.get(new byte[0])); } } diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-zip-commons/src/main/java/org/apache/tika/zip/utils/ZipSalvager.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-zip-commons/src/main/java/org/apache/tika/zip/utils/ZipSalvager.java index a65a851504..75c11f0387 100644 --- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-zip-commons/src/main/java/org/apache/tika/zip/utils/ZipSalvager.java +++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-zip-commons/src/main/java/org/apache/tika/zip/utils/ZipSalvager.java @@ -32,7 +32,7 @@ import org.apache.commons.io.input.CloseShieldInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.tika.utils.RereadableInputStream; +import org.apache.tika.io.TikaInputStream; public class ZipSalvager { @@ -55,14 +55,11 @@ public class ZipSalvager { public static void salvageCopy(InputStream brokenZip, File salvagedZip, boolean allowStoredEntries) throws IOException { + TikaInputStream tis = TikaInputStream.get(brokenZip); try { - if (!(brokenZip instanceof RereadableInputStream)) { - brokenZip = new RereadableInputStream(brokenZip, 50000, true); - } - try (ZipArchiveOutputStream outputStream = new ZipArchiveOutputStream(salvagedZip); ZipArchiveInputStream zipArchiveInputStream = new ZipArchiveInputStream( - CloseShieldInputStream.wrap(brokenZip), "UTF8", false, + CloseShieldInputStream.wrap(tis), "UTF8", false, allowStoredEntries)) { ZipArchiveEntry zae = zipArchiveInputStream.getNextEntry(); try { @@ -83,8 +80,8 @@ public class ZipSalvager { //now retry if (allowStoredEntries == false && e.getFeature() == UnsupportedZipFeatureException.Feature.DATA_DESCRIPTOR) { - ((RereadableInputStream) brokenZip).rewind(); - salvageCopy(brokenZip, salvagedZip, true); + tis.rewind(); + salvageCopy(tis, salvagedZip, true); } else { throw e; } @@ -92,7 +89,7 @@ public class ZipSalvager { LOG.warn("problem fixing zip", e); } } finally { - brokenZip.close(); + tis.close(); } }
