This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4252-fetch-tuple-missing-metadata-2
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 5f4ce3d72e3a188ac4f56ee6f78425e5ca822cd4
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Fri May 10 00:34:50 2024 -0500

    TIKA-4252: add request metadata
---
 .../java/org/apache/tika/pipes/PipesServer.java    | 24 +++++++++++-----------
 .../apache/tika/pipes/fetcher/EmptyFetcher.java    |  2 +-
 .../org/apache/tika/pipes/fetcher/Fetcher.java     |  8 +++++++-
 .../apache/tika/pipes/fetcher/RangeFetcher.java    |  8 ++++++--
 .../tika/pipes/fetcher/fs/FileSystemFetcher.java   |  8 ++++----
 .../apache/tika/pipes/fetcher/url/UrlFetcher.java  |  4 ++--
 .../org/apache/tika/pipes/async/MockFetcher.java   |  2 +-
 .../org/apache/tika/pipes/fetcher/MockFetcher.java |  2 +-
 .../tika/pipes/fetcher/azblob/AZBlobFetcher.java   |  6 +++---
 .../apache/tika/pipes/fetcher/gcs/GCSFetcher.java  |  6 +++---
 .../tika/pipes/fetcher/http/HttpFetcher.java       | 16 +++++++--------
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  8 ++++----
 12 files changed, 52 insertions(+), 42 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 20a5def59..1562a8d83 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -455,33 +455,33 @@ public class PipesServer implements Runnable {
         }
     }
 
-    protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, 
Fetcher fetcher) {
-        FetchKey fetchKey = t.getFetchKey();
+    protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple 
fetchEmitTuple, Fetcher fetcher) {
+        FetchKey fetchKey = fetchEmitTuple.getFetchKey();
+        Metadata fetchResponseMetadata = new Metadata();
+        Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
         if (fetchKey.hasRange()) {
             if (!(fetcher instanceof RangeFetcher)) {
                 throw new IllegalArgumentException(
                         "fetch key has a range, but the fetcher is not a range 
fetcher");
             }
-            Metadata metadata = t.getMetadata() == null ? new Metadata() : 
t.getMetadata();
             try (InputStream stream = ((RangeFetcher) 
fetcher).fetch(fetchKey.getFetchKey(),
-                    fetchKey.getRangeStart(), fetchKey.getRangeEnd(), 
metadata)) {
-                return parseWithStream(t, stream, metadata);
+                    fetchKey.getRangeStart(), fetchKey.getRangeEnd(), 
fetchRequestMetadata, fetchResponseMetadata)) {
+                return parseWithStream(fetchEmitTuple, stream, 
fetchResponseMetadata);
             } catch (SecurityException e) {
-                LOG.error("security exception " + t.getId(), e);
+                LOG.error("security exception " + fetchEmitTuple.getId(), e);
                 throw e;
             } catch (TikaException | IOException e) {
-                LOG.warn("fetch exception " + t.getId(), e);
+                LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
                 write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
             }
         } else {
-            Metadata metadata = t.getMetadata() == null ? new Metadata() : 
t.getMetadata();
-            try (InputStream stream = 
fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
-                return parseWithStream(t, stream, metadata);
+            try (InputStream stream = 
fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, 
fetchResponseMetadata)) {
+                return parseWithStream(fetchEmitTuple, stream, 
fetchResponseMetadata);
             } catch (SecurityException e) {
-                LOG.error("security exception " + t.getId(), e);
+                LOG.error("security exception " + fetchEmitTuple.getId(), e);
                 throw e;
             } catch (TikaException | IOException e) {
-                LOG.warn("fetch exception " + t.getId(), e);
+                LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
                 write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
             }
         }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
index 022d00a8c..925839694 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
@@ -30,7 +30,7 @@ public class EmptyFetcher implements Fetcher {
     }
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
         return null;
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
index 1b3fa2a24..4bdb69cd8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
@@ -33,5 +33,11 @@ public interface Fetcher {
 
     String getName();
 
-    InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException;
+    default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
+            throws TikaException, IOException {
+        return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
+    }
+
+    InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata 
fetchResponseMetadata)
+            throws TikaException, IOException;
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
index 0a3ceae7f..ed9227eeb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
@@ -28,7 +28,11 @@ import org.apache.tika.metadata.Metadata;
 public interface RangeFetcher extends Fetcher {
     //At some point, Tika 3.x?, we may want to add optional ranges to the 
fetchKey?
 
-    InputStream fetch(String fetchKey, long startOffset, long endOffset, 
Metadata metadata)
-            throws TikaException, IOException;
+    default InputStream fetch(String fetchKey, long startOffset, long 
endOffset, Metadata fetchResponseMetadata)
+            throws TikaException, IOException {
+        return fetch(fetchKey, startOffset, endOffset, new Metadata(), 
fetchResponseMetadata);
+    }
 
+    InputStream fetch(String fetchKey, long startOffset, long endOffset, 
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
+            throws TikaException, IOException;
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index d926e3ca6..31a8b2c81 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -58,7 +58,7 @@ public class FileSystemFetcher extends AbstractFetcher 
implements Initializable
     }
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
IOException, TikaException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws IOException, TikaException {
 
         if (fetchKey.contains("\u0000")) {
             throw new IllegalArgumentException("Path must not contain \u0000. 
" +
@@ -76,8 +76,8 @@ public class FileSystemFetcher extends AbstractFetcher 
implements Initializable
             p = Paths.get(fetchKey);
         }
 
-        metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
-        updateFileSystemMetadata(p, metadata);
+        fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
+        updateFileSystemMetadata(p, fetchRequestMetadata);
 
         if (!Files.isRegularFile(p)) {
             if (basePath != null && !Files.isDirectory(basePath)) {
@@ -87,7 +87,7 @@ public class FileSystemFetcher extends AbstractFetcher 
implements Initializable
             }
         }
 
-        return TikaInputStream.get(p, metadata);
+        return TikaInputStream.get(p, fetchRequestMetadata);
     }
 
     private void updateFileSystemMetadata(Path p, Metadata metadata) throws 
IOException {
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
index f415a3560..8c571b258 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
@@ -35,7 +35,7 @@ import org.apache.tika.pipes.fetcher.AbstractFetcher;
 public class UrlFetcher extends AbstractFetcher {
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
IOException, TikaException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws IOException, TikaException {
         if (fetchKey.contains("\u0000")) {
             throw new IllegalArgumentException("URL must not contain \u0000. " 
+
                     "Please review the life decisions that led you to 
requesting " +
@@ -46,7 +46,7 @@ public class UrlFetcher extends AbstractFetcher {
                     "The UrlFetcher does not fetch from file shares; " +
                     "please use the FileSystemFetcher");
         }
-        return TikaInputStream.get(new URL(fetchKey), metadata);
+        return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
     }
 
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java 
b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index 10af275e3..1a731f74a 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -37,7 +37,7 @@ public class MockFetcher implements Fetcher {
     }
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
         return new ByteArrayInputStream(BYTES);
     }
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java 
b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
index 060432724..df23708eb 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
@@ -64,7 +64,7 @@ public class MockFetcher extends AbstractFetcher implements 
Initializable {
 
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
         return byteString == null ? new ByteArrayInputStream(new byte[0]) :
                 new 
ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
     }
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
index dee903040..c25b85552 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
@@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements 
Initializable {
     private boolean spoolToTemp = true;
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
 
         LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", 
fetchKey, endpoint);
 
@@ -81,7 +81,7 @@ public class AZBlobFetcher extends AbstractFetcher implements 
Initializable {
                 BlobProperties properties = blobClient.getProperties();
                 if (properties.getMetadata() != null) {
                     for (Map.Entry<String, String> e : 
properties.getMetadata().entrySet()) {
-                        metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+                        fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), 
e.getValue());
                     }
                 }
             }
@@ -94,7 +94,7 @@ public class AZBlobFetcher extends AbstractFetcher implements 
Initializable {
                 try (OutputStream os = Files.newOutputStream(tmp)) {
                     blobClient.download(os);
                 }
-                TikaInputStream tis = TikaInputStream.get(tmp, metadata, 
tmpResources);
+                TikaInputStream tis = TikaInputStream.get(tmp, 
fetchRequestMetadata, tmpResources);
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
                 return tis;
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
index 6881c5a66..868c1b8fa 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
@@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements 
Initializable {
     private boolean spoolToTemp = true;
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
 
         LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, 
bucket);
 
@@ -65,7 +65,7 @@ public class GCSFetcher extends AbstractFetcher implements 
Initializable {
             if (extractUserMetadata) {
                 if (blob.getMetadata() != null) {
                     for (Map.Entry<String, String> e : 
blob.getMetadata().entrySet()) {
-                        metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+                        fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), 
e.getValue());
                     }
                 }
             }
@@ -76,7 +76,7 @@ public class GCSFetcher extends AbstractFetcher implements 
Initializable {
                 TemporaryResources tmpResources = new TemporaryResources();
                 Path tmp = tmpResources.createTempFile();
                 blob.downloadTo(tmp);
-                TikaInputStream tis = TikaInputStream.get(tmp, metadata, 
tmpResources);
+                TikaInputStream tis = TikaInputStream.get(tmp, 
fetchRequestMetadata, tmpResources);
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
                 return tis;
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index 26b45f8bf..6d4ddab28 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements 
Initializable, Range
 
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
IOException, TikaException {
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws IOException, TikaException {
         HttpGet get = new HttpGet(fetchKey);
         RequestConfig requestConfig =
                 RequestConfig.custom()
@@ -146,21 +146,21 @@ public class HttpFetcher extends AbstractFetcher 
implements Initializable, Range
         if (! StringUtils.isBlank(userAgent)) {
             get.setHeader(USER_AGENT, userAgent);
         }
-        return execute(get, metadata, httpClient, true);
+        return execute(get, fetchResponseMetadata, httpClient, true);
     }
 
     @Override
-    public InputStream fetch(String fetchKey, long startRange, long endRange, 
Metadata metadata)
+    public InputStream fetch(String fetchKey, long startRange, long endRange, 
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
             throws IOException {
         HttpGet get = new HttpGet(fetchKey);
         if (! StringUtils.isBlank(userAgent)) {
             get.setHeader(USER_AGENT, userAgent);
         }
         get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
-        return execute(get, metadata, httpClient, true);
+        return execute(get, fetchResponseMetadata, httpClient, true);
     }
 
-    private InputStream execute(HttpGet get, Metadata metadata, HttpClient 
client,
+    private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, 
HttpClient client,
                                 boolean retryOnBadLength) throws IOException {
         HttpClientContext context = HttpClientContext.create();
         HttpResponse response = null;
@@ -183,7 +183,7 @@ public class HttpFetcher extends AbstractFetcher implements 
Initializable, Range
             }
             response = client.execute(get, context);
 
-            updateMetadata(get.getURI().toString(), response, context, 
metadata);
+            updateMetadata(get.getURI().toString(), response, context, 
fetchRequestMetadata);
 
             int code = response.getStatusLine().getStatusCode();
             if (code < 200 || code > 299) {
@@ -191,7 +191,7 @@ public class HttpFetcher extends AbstractFetcher implements 
Initializable, Range
                         responseToString(response));
             }
             try (InputStream is = response.getEntity().getContent()) {
-                return spool(is, metadata);
+                return spool(is, fetchRequestMetadata);
             }
         } catch (ConnectionClosedException e) {
 
@@ -202,7 +202,7 @@ public class HttpFetcher extends AbstractFetcher implements 
Initializable, Range
                 //and then compresses the stream. See HTTPCLIENT-2176
                 LOG.warn("premature end of content-length delimited message; 
retrying with " +
                         "content compression disabled for {}", get.getURI());
-                return execute(get, metadata, noCompressHttpClient, false);
+                return execute(get, fetchRequestMetadata, 
noCompressHttpClient, false);
             }
             throw e;
         } catch  (IOException e) {
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index b57c361b9..38161a1b3 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable, RangeFe
     private boolean pathStyleAccessEnabled = false;
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
-        return fetch(fetchKey, -1, -1, metadata);
+    public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, 
Metadata fetchResponseMetadata) throws TikaException, IOException {
+        return fetch(fetchKey, -1, -1, fetchRequestMetadata);
     }
 
     @Override
-    public InputStream fetch(String fetchKey, long startRange, long endRange, 
Metadata metadata)
+    public InputStream fetch(String fetchKey, long startRange, long endRange, 
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
             throws TikaException, IOException {
         String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + 
fetchKey;
 
@@ -129,7 +129,7 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable, RangeFe
         do {
             try {
                 long start = System.currentTimeMillis();
-                InputStream is = _fetch(theFetchKey, metadata, startRange, 
endRange);
+                InputStream is = _fetch(theFetchKey, fetchResponseMetadata, 
startRange, endRange);
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("total to fetch {}", elapsed);
                 return is;

Reply via email to