This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b7a190  TIKA-3285 -- allow s3 and SimpleUrlFetcher to pull byte ranges.
1b7a190 is described below

commit 1b7a1903cf8cf9d9f70043a9b37f1ece5f113519
Author: tballison <[email protected]>
AuthorDate: Wed Jan 27 14:14:28 2021 -0500

    TIKA-3285 -- allow s3 and SimpleUrlFetcher to pull byte ranges.
---
 .../tika/pipes/fetcher/SimpleUrlFetcher.java       | 13 ++++++
 .../apache/tika/pipes/fetcher/UrlFetcherTest.java  | 51 ++++++++++++++++++++++
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    | 49 +++++++++++++++------
 3 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
index 40faadd..276d2f4 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/SimpleUrlFetcher.java
@@ -18,11 +18,13 @@ package org.apache.tika.pipes.fetcher;
 
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.HttpHeaders;
 import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.net.URLConnection;
 import java.util.Collections;
 import java.util.Set;
 
@@ -56,4 +58,15 @@ public class SimpleUrlFetcher extends AbstractFetcher {
         }
         return TikaInputStream.get(url, metadata);
     }
+
+    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+            throws IOException, TikaException {
+        URL url = new URL(fetchKey);
+        URLConnection connection = url.openConnection();
+        connection.setRequestProperty("Range", "bytes="+startRange+"-"+endRange);
+        metadata.set(HttpHeaders.CONTENT_LENGTH, Long.toString(endRange-startRange+1));
+        TikaInputStream tis = TikaInputStream.get(connection.getInputStream());
+        tis.getPath();
+        return tis;
+    }
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java 
b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java
new file mode 100644
index 0000000..7fb96bf
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/UrlFetcherTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher;
+
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.Metadata;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.zip.GZIPInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore("requires network connectivity")
+public class UrlFetcherTest {
+
+    @Test
+    public void testRange() throws Exception {
+        String url =
+                "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
+        long start = 969596307;
+        long end = start + 1408 - 1;
+        Metadata metadata = new Metadata();
+
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            Path tmpPath = tmp.createTempFile();
+            try (InputStream is = new SimpleUrlFetcher().fetch(url, start, end, metadata)) {
+                Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
+            }
+            assertEquals(2461, Files.size(tmpPath));
+        }
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 25026a1..217881e 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -21,18 +21,13 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
-import org.apache.commons.io.IOUtils;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherStringException;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.slf4j.Logger;
@@ -40,12 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.CopyOption;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
@@ -57,6 +48,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
     private static final String PREFIX = "s3";
+    private static final Pattern RANGE_PATTERN = Pattern.compile("\\A(.*?):(\\d+):(\\d+)\\Z");
     private String region;
     private String bucket;
     private String profile;
@@ -71,20 +63,49 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
                 fetchKey, bucket);
 
-        S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
+        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
         if (extractUserMetadata) {
             for (Map.Entry<String, String> e :
-                    fullObject.getObjectMetadata().getUserMetadata().entrySet()) {
+                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
                 metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
             }
         }
         if (! spoolToTemp) {
             return TikaInputStream.get(
-                    fullObject.getObjectContent());
+                    s3Object.getObjectContent());
         } else {
             long start = System.currentTimeMillis();
             TikaInputStream tis = TikaInputStream.get(
-                    fullObject.getObjectContent());
+                    s3Object.getObjectContent());
+            tis.getPath();
+            long elapsed = System.currentTimeMillis()-start;
+            LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
+            return tis;
+        }
+    }
+
+    public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
+            throws TikaException, IOException {
+        //TODO -- figure out how to integrate this
+        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
+                fetchKey, startRange, endRange, bucket);
+
+        S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
+                .withRange(startRange, endRange));
+
+        if (extractUserMetadata) {
+            for (Map.Entry<String, String> e :
+                    s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
+            }
+        }
+        if (! spoolToTemp) {
+            return TikaInputStream.get(
+                    s3Object.getObjectContent());
+        } else {
+            long start = System.currentTimeMillis();
+            TikaInputStream tis = TikaInputStream.get(
+                    s3Object.getObjectContent());
             tis.getPath();
             long elapsed = System.currentTimeMillis()-start;
             LOGGER.debug("took {} ms to copy to local tmp file", elapsed);

Reply via email to