This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 1b7a190 TIKA-3285 -- allow s3 and SimpleUrlFetcher to pull byte
ranges.
1b7a190 is described below
commit 1b7a1903cf8cf9d9f70043a9b37f1ece5f113519
Author: tballison <[email protected]>
AuthorDate: Wed Jan 27 14:14:28 2021 -0500
TIKA-3285 -- allow s3 and SimpleUrlFetcher to pull byte ranges.
---
.../tika/pipes/fetcher/SimpleUrlFetcher.java | 13 ++++++
.../apache/tika/pipes/fetcher/UrlFetcherTest.java | 51 ++++++++++++++++++++++
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 49 +++++++++++++++------
3 files changed, 99 insertions(+), 14 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
index 40faadd..276d2f4 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
@@ -18,11 +18,13 @@ package org.apache.tika.pipes.fetcher;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.HttpHeaders;
import org.apache.tika.metadata.Metadata;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.net.URLConnection;
import java.util.Collections;
import java.util.Set;
@@ -56,4 +58,15 @@ public class SimpleUrlFetcher extends AbstractFetcher {
}
return TikaInputStream.get(url, metadata);
}
+
+    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+            throws IOException, TikaException {
+ URL url = new URL(fetchKey);
+ URLConnection connection = url.openConnection();
+        connection.setRequestProperty("Range", "bytes="+startRange+"-"+endRange);
+        metadata.set(HttpHeaders.CONTENT_LENGTH, Long.toString(endRange-startRange+1));
+ TikaInputStream tis = TikaInputStream.get(connection.getInputStream());
+ tis.getPath();
+ return tis;
+ }
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java
new file mode 100644
index 0000000..7fb96bf
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher;
+
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.Metadata;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.zip.GZIPInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore("requires network connectivity")
+public class UrlFetcherTest {
+
+ @Test
+ public void testRange() throws Exception {
+        String url =
+                "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
+ long start = 969596307;
+ long end = start + 1408 - 1;
+ Metadata metadata = new Metadata();
+
+ try (TemporaryResources tmp = new TemporaryResources()) {
+ Path tmpPath = tmp.createTempFile();
+            try (InputStream is = new SimpleUrlFetcher().fetch(url, start, end, metadata)) {
+                Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+ assertEquals(2461, Files.size(tmpPath));
+ }
+ }
+}
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 25026a1..217881e 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -21,18 +21,13 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
-import org.apache.commons.io.IOUtils;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TemporaryResources;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherStringException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.slf4j.Logger;
@@ -40,12 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.CopyOption;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collections;
import java.util.Map;
-import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
@@ -57,6 +48,7 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(S3Fetcher.class);
private static final String PREFIX = "s3";
+    private static final Pattern RANGE_PATTERN = Pattern.compile("\\A(.*?):(\\d+):(\\d+)\\Z");
private String region;
private String bucket;
private String profile;
@@ -71,20 +63,49 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
fetchKey, bucket);
-        S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
+        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
if (extractUserMetadata) {
for (Map.Entry<String, String> e :
-                    fullObject.getObjectMetadata().getUserMetadata().entrySet()) {
+                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
if (! spoolToTemp) {
return TikaInputStream.get(
- fullObject.getObjectContent());
+ s3Object.getObjectContent());
} else {
long start = System.currentTimeMillis();
TikaInputStream tis = TikaInputStream.get(
- fullObject.getObjectContent());
+ s3Object.getObjectContent());
+ tis.getPath();
+ long elapsed = System.currentTimeMillis()-start;
+ LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+ return tis;
+ }
+ }
+
+    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+            throws TikaException, IOException {
+ //TODO -- figure out how to integrate this
+        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
+                fetchKey, startRange, endRange, bucket);
+
+        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
+                .withRange(startRange, endRange));
+
+ if (extractUserMetadata) {
+            for (Map.Entry<String, String> e :
+                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+ metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+ }
+ }
+ if (! spoolToTemp) {
+ return TikaInputStream.get(
+ s3Object.getObjectContent());
+ } else {
+ long start = System.currentTimeMillis();
+ TikaInputStream tis = TikaInputStream.get(
+ s3Object.getObjectContent());
tis.getPath();
long elapsed = System.currentTimeMillis()-start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);