This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch TIKA-4252-fetch-tuple-missing-metadata-2 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 5f4ce3d72e3a188ac4f56ee6f78425e5ca822cd4 Author: Nicholas DiPiazza <[email protected]> AuthorDate: Fri May 10 00:34:50 2024 -0500 TIKA-4252: add request metadata --- .../java/org/apache/tika/pipes/PipesServer.java | 24 +++++++++++----------- .../apache/tika/pipes/fetcher/EmptyFetcher.java | 2 +- .../org/apache/tika/pipes/fetcher/Fetcher.java | 8 +++++++- .../apache/tika/pipes/fetcher/RangeFetcher.java | 8 ++++++-- .../tika/pipes/fetcher/fs/FileSystemFetcher.java | 8 ++++---- .../apache/tika/pipes/fetcher/url/UrlFetcher.java | 4 ++-- .../org/apache/tika/pipes/async/MockFetcher.java | 2 +- .../org/apache/tika/pipes/fetcher/MockFetcher.java | 2 +- .../tika/pipes/fetcher/azblob/AZBlobFetcher.java | 6 +++--- .../apache/tika/pipes/fetcher/gcs/GCSFetcher.java | 6 +++--- .../tika/pipes/fetcher/http/HttpFetcher.java | 16 +++++++-------- .../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 8 ++++---- 12 files changed, 52 insertions(+), 42 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java index 20a5def59..1562a8d83 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java @@ -455,33 +455,33 @@ public class PipesServer implements Runnable { } } - protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) { - FetchKey fetchKey = t.getFetchKey(); + protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple fetchEmitTuple, Fetcher fetcher) { + FetchKey fetchKey = fetchEmitTuple.getFetchKey(); + Metadata fetchResponseMetadata = new Metadata(); + Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata(); if (fetchKey.hasRange()) { if (!(fetcher instanceof RangeFetcher)) { throw new IllegalArgumentException( "fetch key has a range, but the fetcher is not a range fetcher"); } - Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata(); try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(), - fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) { - return parseWithStream(t, stream, metadata); + fetchKey.getRangeStart(), fetchKey.getRangeEnd(), fetchRequestMetadata, fetchResponseMetadata)) { + return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata); } catch (SecurityException e) { - LOG.error("security exception " + t.getId(), e); + LOG.error("security exception " + fetchEmitTuple.getId(), e); throw e; } catch (TikaException | IOException e) { - LOG.warn("fetch exception " + t.getId(), e); + LOG.warn("fetch exception " + fetchEmitTuple.getId(), e); write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e)); } } else { - Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata(); - try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) { - return parseWithStream(t, stream, metadata); + try (InputStream stream = fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, fetchResponseMetadata)) { + return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata); } catch (SecurityException e) { - LOG.error("security exception " + t.getId(), e); + LOG.error("security exception " + fetchEmitTuple.getId(), e); throw e; } catch (TikaException | IOException e) { - LOG.warn("fetch exception " + t.getId(), e); + LOG.warn("fetch exception " + fetchEmitTuple.getId(), e); write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e)); } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java index 022d00a8c..925839694 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java @@ -30,7 +30,7 @@ public class EmptyFetcher implements Fetcher { } @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { return null; } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java index 1b3fa2a24..4bdb69cd8 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java @@ -33,5 +33,11 @@ public interface Fetcher { String getName(); - InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException; + default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata) + throws TikaException, IOException { + return fetch(fetchKey, new Metadata(), fetchResponseMetadata); + } + + InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) + throws TikaException, IOException; } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java index 0a3ceae7f..ed9227eeb 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java @@ -28,7 +28,11 @@ import org.apache.tika.metadata.Metadata; public interface RangeFetcher extends Fetcher { //At some point, Tika 3.x?, we may want to add optional ranges to the fetchKey? - InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata metadata) - throws TikaException, IOException; + default InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchResponseMetadata) + throws TikaException, IOException { + return fetch(fetchKey, startOffset, endOffset, new Metadata(), fetchResponseMetadata); + } + InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) + throws TikaException, IOException; } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java index d926e3ca6..31a8b2c81 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java @@ -58,7 +58,7 @@ public class FileSystemFetcher extends AbstractFetcher implements Initializable } @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException { if (fetchKey.contains("\u0000")) { throw new IllegalArgumentException("Path must not contain \u0000. " + @@ -76,8 +76,8 @@ public class FileSystemFetcher extends AbstractFetcher implements Initializable p = Paths.get(fetchKey); } - metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey); - updateFileSystemMetadata(p, metadata); + fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey); + updateFileSystemMetadata(p, fetchRequestMetadata); if (!Files.isRegularFile(p)) { if (basePath != null && !Files.isDirectory(basePath)) { @@ -87,7 +87,7 @@ public class FileSystemFetcher extends AbstractFetcher implements Initializable } } - return TikaInputStream.get(p, metadata); + return TikaInputStream.get(p, fetchRequestMetadata); } private void updateFileSystemMetadata(Path p, Metadata metadata) throws IOException { diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java index f415a3560..8c571b258 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java @@ -35,7 +35,7 @@ import org.apache.tika.pipes.fetcher.AbstractFetcher; public class UrlFetcher extends AbstractFetcher { @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException { if (fetchKey.contains("\u0000")) { throw new IllegalArgumentException("URL must not contain \u0000. " + "Please review the life decisions that led you to requesting " + @@ -46,7 +46,7 @@ public class UrlFetcher extends AbstractFetcher { "The UrlFetcher does not fetch from file shares; " + "please use the FileSystemFetcher"); } - return TikaInputStream.get(new URL(fetchKey), metadata); + return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata); } } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java index 10af275e3..1a731f74a 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java @@ -37,7 +37,7 @@ public class MockFetcher implements Fetcher { } @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { return new ByteArrayInputStream(BYTES); } } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java index 060432724..df23708eb 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java @@ -64,7 +64,7 @@ public class MockFetcher extends AbstractFetcher implements Initializable { @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { return byteString == null ? new ByteArrayInputStream(new byte[0]) : new ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8)); } diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java index dee903040..c25b85552 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java @@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable { private boolean spoolToTemp = true; @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint); @@ -81,7 +81,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable { BlobProperties properties = blobClient.getProperties(); if (properties.getMetadata() != null) { for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) { - metadata.add(PREFIX + ":" + e.getKey(), e.getValue()); + fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue()); } } } @@ -94,7 +94,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable { try (OutputStream os = Files.newOutputStream(tmp)) { blobClient.download(os); } - TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources); + TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources); long elapsed = System.currentTimeMillis() - start; LOGGER.debug("took {} ms to copy to local tmp file", elapsed); return tis; diff --git a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java index 6881c5a66..868c1b8fa 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java @@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable { private boolean spoolToTemp = true; @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket); @@ -65,7 +65,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable { if (extractUserMetadata) { if (blob.getMetadata() != null) { for (Map.Entry<String, String> e : blob.getMetadata().entrySet()) { - metadata.add(PREFIX + ":" + e.getKey(), e.getValue()); + fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue()); } } } @@ -76,7 +76,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable { TemporaryResources tmpResources = new TemporaryResources(); Path tmp = tmpResources.createTempFile(); blob.downloadTo(tmp); - TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources); + TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources); long elapsed = System.currentTimeMillis() - start; LOGGER.debug("took {} ms to copy to local tmp file", elapsed); return tis; diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java index 26b45f8bf..6d4ddab28 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java @@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException { + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException { HttpGet get = new HttpGet(fetchKey); RequestConfig requestConfig = RequestConfig.custom() @@ -146,21 +146,21 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range if (! StringUtils.isBlank(userAgent)) { get.setHeader(USER_AGENT, userAgent); } - return execute(get, metadata, httpClient, true); + return execute(get, fetchResponseMetadata, httpClient, true); } @Override - public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata) + public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException { HttpGet get = new HttpGet(fetchKey); if (! StringUtils.isBlank(userAgent)) { get.setHeader(USER_AGENT, userAgent); } get.setHeader("Range", "bytes=" + startRange + "-" + endRange); - return execute(get, metadata, httpClient, true); + return execute(get, fetchResponseMetadata, httpClient, true); } - private InputStream execute(HttpGet get, Metadata metadata, HttpClient client, + private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, HttpClient client, boolean retryOnBadLength) throws IOException { HttpClientContext context = HttpClientContext.create(); HttpResponse response = null; @@ -183,7 +183,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range } response = client.execute(get, context); - updateMetadata(get.getURI().toString(), response, context, metadata); + updateMetadata(get.getURI().toString(), response, context, fetchRequestMetadata); int code = response.getStatusLine().getStatusCode(); if (code < 200 || code > 299) { @@ -191,7 +191,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range responseToString(response)); } try (InputStream is = response.getEntity().getContent()) { - return spool(is, metadata); + return spool(is, fetchRequestMetadata); } } catch (ConnectionClosedException e) { @@ -202,7 +202,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range //and then compresses the stream. See HTTPCLIENT-2176 LOG.warn("premature end of content-length delimited message; retrying with " + "content compression disabled for {}", get.getURI()); - return execute(get, metadata, noCompressHttpClient, false); + return execute(get, fetchRequestMetadata, noCompressHttpClient, false); } throw e; } catch (IOException e) { diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java index b57c361b9..38161a1b3 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java @@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe private boolean pathStyleAccessEnabled = false; @Override - public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException { - return fetch(fetchKey, -1, -1, metadata); + public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { + return fetch(fetchKey, -1, -1, fetchRequestMetadata); } @Override - public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata) + public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException { String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey; @@ -129,7 +129,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe do { try { long start = System.currentTimeMillis(); - InputStream is = _fetch(theFetchKey, metadata, startRange, endRange); + InputStream is = _fetch(theFetchKey, fetchResponseMetadata, startRange, endRange); long elapsed = System.currentTimeMillis() - start; LOGGER.debug("total to fetch {}", elapsed); return is;
