This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new b068e4290 TIKA-4252: add request metadata (#1753)
b068e4290 is described below
commit b068e4290ad311b1e5f1ddaa6afa40be9e7bd797
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Fri May 10 00:51:42 2024 -0500
TIKA-4252: add request metadata (#1753)
Co-authored-by: Nicholas DiPiazza <[email protected]>
---
.../java/org/apache/tika/pipes/PipesServer.java | 24 +++++++++++-----------
.../apache/tika/pipes/fetcher/EmptyFetcher.java | 2 +-
.../org/apache/tika/pipes/fetcher/Fetcher.java | 8 +++++++-
.../apache/tika/pipes/fetcher/RangeFetcher.java | 8 ++++++--
.../tika/pipes/fetcher/fs/FileSystemFetcher.java | 8 ++++----
.../apache/tika/pipes/fetcher/url/UrlFetcher.java | 4 ++--
.../org/apache/tika/pipes/async/MockFetcher.java | 2 +-
.../org/apache/tika/pipes/fetcher/MockFetcher.java | 2 +-
.../tika/pipes/fetcher/azblob/AZBlobFetcher.java | 6 +++---
.../apache/tika/pipes/fetcher/gcs/GCSFetcher.java | 6 +++---
.../tika/pipes/fetcher/http/HttpFetcher.java | 16 +++++++--------
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 8 ++++----
12 files changed, 52 insertions(+), 42 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 20a5def59..1562a8d83 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -455,33 +455,33 @@ public class PipesServer implements Runnable {
}
}
- protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t,
Fetcher fetcher) {
- FetchKey fetchKey = t.getFetchKey();
+ protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple
fetchEmitTuple, Fetcher fetcher) {
+ FetchKey fetchKey = fetchEmitTuple.getFetchKey();
+ Metadata fetchResponseMetadata = new Metadata();
+ Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
if (fetchKey.hasRange()) {
if (!(fetcher instanceof RangeFetcher)) {
throw new IllegalArgumentException(
"fetch key has a range, but the fetcher is not a range
fetcher");
}
- Metadata metadata = t.getMetadata() == null ? new Metadata() :
t.getMetadata();
try (InputStream stream = ((RangeFetcher)
fetcher).fetch(fetchKey.getFetchKey(),
- fetchKey.getRangeStart(), fetchKey.getRangeEnd(),
metadata)) {
- return parseWithStream(t, stream, metadata);
+ fetchKey.getRangeStart(), fetchKey.getRangeEnd(),
fetchRequestMetadata, fetchResponseMetadata)) {
+ return parseWithStream(fetchEmitTuple, stream,
fetchResponseMetadata);
} catch (SecurityException e) {
- LOG.error("security exception " + t.getId(), e);
+ LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
- LOG.warn("fetch exception " + t.getId(), e);
+ LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
} else {
- Metadata metadata = t.getMetadata() == null ? new Metadata() :
t.getMetadata();
- try (InputStream stream =
fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
- return parseWithStream(t, stream, metadata);
+ try (InputStream stream =
fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata,
fetchResponseMetadata)) {
+ return parseWithStream(fetchEmitTuple, stream,
fetchResponseMetadata);
} catch (SecurityException e) {
- LOG.error("security exception " + t.getId(), e);
+ LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
- LOG.warn("fetch exception " + t.getId(), e);
+ LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
index 022d00a8c..925839694 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
@@ -30,7 +30,7 @@ public class EmptyFetcher implements Fetcher {
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
return null;
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
index 1b3fa2a24..4bdb69cd8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
@@ -33,5 +33,11 @@ public interface Fetcher {
String getName();
- InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException;
+ default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
+ throws TikaException, IOException {
+ return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
+ }
+
+ InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata
fetchResponseMetadata)
+ throws TikaException, IOException;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
index 0a3ceae7f..ed9227eeb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
@@ -28,7 +28,11 @@ import org.apache.tika.metadata.Metadata;
public interface RangeFetcher extends Fetcher {
//At some point, Tika 3.x?, we may want to add optional ranges to the
fetchKey?
- InputStream fetch(String fetchKey, long startOffset, long endOffset,
Metadata metadata)
- throws TikaException, IOException;
+ default InputStream fetch(String fetchKey, long startOffset, long
endOffset, Metadata fetchResponseMetadata)
+ throws TikaException, IOException {
+ return fetch(fetchKey, startOffset, endOffset, new Metadata(),
fetchResponseMetadata);
+ }
+ InputStream fetch(String fetchKey, long startOffset, long endOffset,
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
+ throws TikaException, IOException;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index d926e3ca6..31a8b2c81 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -58,7 +58,7 @@ public class FileSystemFetcher extends AbstractFetcher
implements Initializable
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
IOException, TikaException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws IOException, TikaException {
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("Path must not contain \u0000.
" +
@@ -76,8 +76,8 @@ public class FileSystemFetcher extends AbstractFetcher
implements Initializable
p = Paths.get(fetchKey);
}
- metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
- updateFileSystemMetadata(p, metadata);
+ fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
+ updateFileSystemMetadata(p, fetchRequestMetadata);
if (!Files.isRegularFile(p)) {
if (basePath != null && !Files.isDirectory(basePath)) {
@@ -87,7 +87,7 @@ public class FileSystemFetcher extends AbstractFetcher
implements Initializable
}
}
- return TikaInputStream.get(p, metadata);
+ return TikaInputStream.get(p, fetchRequestMetadata);
}
private void updateFileSystemMetadata(Path p, Metadata metadata) throws
IOException {
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
index f415a3560..8c571b258 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
@@ -35,7 +35,7 @@ import org.apache.tika.pipes.fetcher.AbstractFetcher;
public class UrlFetcher extends AbstractFetcher {
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
IOException, TikaException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws IOException, TikaException {
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("URL must not contain \u0000. "
+
"Please review the life decisions that led you to
requesting " +
@@ -46,7 +46,7 @@ public class UrlFetcher extends AbstractFetcher {
"The UrlFetcher does not fetch from file shares; " +
"please use the FileSystemFetcher");
}
- return TikaInputStream.get(new URL(fetchKey), metadata);
+ return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index 10af275e3..1a731f74a 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -37,7 +37,7 @@ public class MockFetcher implements Fetcher {
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
return new ByteArrayInputStream(BYTES);
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
index 060432724..df23708eb 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
@@ -64,7 +64,7 @@ public class MockFetcher extends AbstractFetcher implements
Initializable {
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
new
ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
}
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
index dee903040..c25b85552 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
@@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements
Initializable {
private boolean spoolToTemp = true;
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
LOGGER.debug("about to fetch fetchkey={} from endpoint ({})",
fetchKey, endpoint);
@@ -81,7 +81,7 @@ public class AZBlobFetcher extends AbstractFetcher implements
Initializable {
BlobProperties properties = blobClient.getProperties();
if (properties.getMetadata() != null) {
for (Map.Entry<String, String> e :
properties.getMetadata().entrySet()) {
- metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ fetchRequestMetadata.add(PREFIX + ":" + e.getKey(),
e.getValue());
}
}
}
@@ -94,7 +94,7 @@ public class AZBlobFetcher extends AbstractFetcher implements
Initializable {
try (OutputStream os = Files.newOutputStream(tmp)) {
blobClient.download(os);
}
- TikaInputStream tis = TikaInputStream.get(tmp, metadata,
tmpResources);
+ TikaInputStream tis = TikaInputStream.get(tmp,
fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
index 6881c5a66..868c1b8fa 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
@@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements
Initializable {
private boolean spoolToTemp = true;
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey,
bucket);
@@ -65,7 +65,7 @@ public class GCSFetcher extends AbstractFetcher implements
Initializable {
if (extractUserMetadata) {
if (blob.getMetadata() != null) {
for (Map.Entry<String, String> e :
blob.getMetadata().entrySet()) {
- metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ fetchRequestMetadata.add(PREFIX + ":" + e.getKey(),
e.getValue());
}
}
}
@@ -76,7 +76,7 @@ public class GCSFetcher extends AbstractFetcher implements
Initializable {
TemporaryResources tmpResources = new TemporaryResources();
Path tmp = tmpResources.createTempFile();
blob.downloadTo(tmp);
- TikaInputStream tis = TikaInputStream.get(tmp, metadata,
tmpResources);
+ TikaInputStream tis = TikaInputStream.get(tmp,
fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index 26b45f8bf..6d4ddab28 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements
Initializable, Range
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
IOException, TikaException {
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
RequestConfig requestConfig =
RequestConfig.custom()
@@ -146,21 +146,21 @@ public class HttpFetcher extends AbstractFetcher
implements Initializable, Range
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
- return execute(get, metadata, httpClient, true);
+ return execute(get, fetchResponseMetadata, httpClient, true);
}
@Override
- public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata metadata)
+ public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws IOException {
HttpGet get = new HttpGet(fetchKey);
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
- return execute(get, metadata, httpClient, true);
+ return execute(get, fetchResponseMetadata, httpClient, true);
}
- private InputStream execute(HttpGet get, Metadata metadata, HttpClient
client,
+ private InputStream execute(HttpGet get, Metadata fetchRequestMetadata,
HttpClient client,
boolean retryOnBadLength) throws IOException {
HttpClientContext context = HttpClientContext.create();
HttpResponse response = null;
@@ -183,7 +183,7 @@ public class HttpFetcher extends AbstractFetcher implements
Initializable, Range
}
response = client.execute(get, context);
- updateMetadata(get.getURI().toString(), response, context,
metadata);
+ updateMetadata(get.getURI().toString(), response, context,
fetchRequestMetadata);
int code = response.getStatusLine().getStatusCode();
if (code < 200 || code > 299) {
@@ -191,7 +191,7 @@ public class HttpFetcher extends AbstractFetcher implements
Initializable, Range
responseToString(response));
}
try (InputStream is = response.getEntity().getContent()) {
- return spool(is, metadata);
+ return spool(is, fetchRequestMetadata);
}
} catch (ConnectionClosedException e) {
@@ -202,7 +202,7 @@ public class HttpFetcher extends AbstractFetcher implements
Initializable, Range
//and then compresses the stream. See HTTPCLIENT-2176
LOG.warn("premature end of content-length delimited message;
retrying with " +
"content compression disabled for {}", get.getURI());
- return execute(get, metadata, noCompressHttpClient, false);
+ return execute(get, fetchRequestMetadata,
noCompressHttpClient, false);
}
throw e;
} catch (IOException e) {
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index b57c361b9..38161a1b3 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
private boolean pathStyleAccessEnabled = false;
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
- return fetch(fetchKey, -1, -1, metadata);
+ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata,
Metadata fetchResponseMetadata) throws TikaException, IOException {
+ return fetch(fetchKey, -1, -1, fetchRequestMetadata);
}
@Override
- public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata metadata)
+ public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException {
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix +
fetchKey;
@@ -129,7 +129,7 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
do {
try {
long start = System.currentTimeMillis();
- InputStream is = _fetch(theFetchKey, metadata, startRange,
endRange);
+ InputStream is = _fetch(theFetchKey, fetchResponseMetadata,
startRange, endRange);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;