This is an automated email from the ASF dual-hosted git repository.

tilman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f10ce33c TIKA-4525: migrate to aws v2
7f10ce33c is described below

commit 7f10ce33cfc67754627ff15d7454f2765326c8fc
Author: Tilman Hausherr <[email protected]>
AuthorDate: Mon Oct 20 13:10:04 2025 +0200

    TIKA-4525: migrate to aws v2
---
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    | 130 +++++++++++----------
 1 file changed, 71 insertions(+), 59 deletions(-)

diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 9176a7409..8bb0b67a9 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -20,29 +20,37 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -119,11 +127,11 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     private String prefix;
     private String credentialsProvider;
     private boolean extractUserMetadata = true;
-    private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
-    private AmazonS3 s3Client;
+    private int maxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
+    private S3Client s3Client;
     private boolean spoolToTemp = true;
-    private int retries = 0;
-    private long sleepBeforeRetryMillis = 30000;
+    private int retries = 0; //TODO why isn't this used? Add getter/setter?
+    private long sleepBeforeRetryMillis = 30000; //TODO delete setSleepBeforeRetryMillis() after copying to 3.0?
 
     private long[] throttleSeconds = null;
 
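Aside: v2 drops the ClientConfiguration.DEFAULT_MAX_CONNECTIONS constant the old field initializer used; the equivalent default now lives in the SDK's global HTTP defaults attribute map, which is what the new initializer reads. A minimal sketch of that lookup:

    // Reads the SDK-wide default (50 in current SDK releases) instead of
    // hard-coding a number; this is what the new field initializer does.
    int defaultMaxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS
            .get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
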
@@ -158,14 +166,17 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("total to fetch {}", elapsed);
                 return is;
-            } catch (AmazonS3Exception e) {
-                if (e.getErrorCode() != null && NO_RETRY_ERROR_CODES.contains(e.getErrorCode())) {
-                    LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
-                    throw new IOException(e);
+            } catch (AwsServiceException e) {
+                if (e.awsErrorDetails() != null) {
+                    String errorCode = e.awsErrorDetails().errorCode();
+                    if (errorCode != null && NO_RETRY_ERROR_CODES.contains(e.awsErrorDetails().errorCode())) {
+                        LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
+                        throw new IOException(e);
+                    }
                 }
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = new IOException(e);
-            } catch (AmazonClientException e) {
+            } catch (SdkClientException e) {
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = new IOException(e);
             } catch (IOException e) {
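
For reference, v1's AmazonS3Exception/AmazonClientException pair maps onto v2's AwsServiceException/SdkClientException, and the error code moved one level deeper, behind awsErrorDetails(). A minimal sketch of the no-retry check, with an illustrative code set standing in for the fetcher's NO_RETRY_ERROR_CODES:

    // Sketch: classify a v2 service exception the way the loop above does.
    // The error codes here are illustrative, not the fetcher's exact set.
    static void failFastIfNotRetryable(AwsServiceException e) throws IOException {
        Set<String> noRetry = Set.of("NoSuchKey", "NoSuchBucket", "AccessDenied");
        if (e.awsErrorDetails() != null) {
            String errorCode = e.awsErrorDetails().errorCode(); // was e.getErrorCode() in v1
            if (errorCode != null && noRetry.contains(errorCode)) {
                throw new IOException(e); // no-retry code: fail immediately
            }
        }
        // any other AwsServiceException (or an SdkClientException) is retried
    }
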
@@ -190,16 +201,18 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
         TemporaryResources tmp = null;
         try {
             long start = System.currentTimeMillis();
-            GetObjectRequest objectRequest = new GetObjectRequest(bucket, fetchKey);
+            GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(bucket).key(fetchKey);
             if (startRange != null && endRange != null
                     && startRange > -1 && endRange > -1) {
-                objectRequest.withRange(startRange, endRange);
+                String range = String.format(Locale.US, "bytes=%d-%d", startRange, endRange);
+                builder.range(range);
             }
-            S3Object s3Object = null;
+            GetObjectRequest objectRequest = builder.build();
+            ResponseInputStream<GetObjectResponse> s3Object = null;
             synchronized (clientLock) {
                 s3Object = s3Client.getObject(objectRequest);
             }
-            long length = s3Object.getObjectMetadata().getContentLength();
+            long length = s3Object.response().contentLength();
             metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
             if (maxLength > -1) {
                 if (length > maxLength) {
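
v2 has no withRange(long, long): GetObjectRequest is an immutable builder-built object, and the range is passed as a literal HTTP Range header value, hence the String.format above. A small sketch with placeholder bucket/key values:

    // Placeholder bucket/key; the fetcher uses its configured bucket and fetchKey.
    GetObjectRequest req = GetObjectRequest.builder()
            .bucket("example-bucket")
            .key("path/to/object.pdf")
            .range(String.format(Locale.US, "bytes=%d-%d", 0, 1023)) // first 1024 bytes
            .build();
    ResponseInputStream<GetObjectResponse> s3Object = s3Client.getObject(req);
    long length = s3Object.response().contentLength(); // was getObjectMetadata().getContentLength()
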
@@ -209,19 +222,17 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
             LOGGER.debug("took {} ms to fetch file's metadata", System.currentTimeMillis() - start);
 
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
-                        .entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.response().metadata().entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }
             if (!spoolToTemp) {
-                return TikaInputStream.get(s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object);
             } else {
                 start = System.currentTimeMillis();
                 tmp = new TemporaryResources();
                 Path tmpPath = tmp.createTempFile(FilenameUtils.getSuffixFromPath(fetchKey));
-                Files.copy(s3Object.getObjectContent(), tmpPath,
-                        StandardCopyOption.REPLACE_EXISTING);
+                Files.copy(s3Object, tmpPath, StandardCopyOption.REPLACE_EXISTING);
                 TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
                 LOGGER.debug("took {} ms to fetch metadata and copy to local tmp file",
                         System.currentTimeMillis() - start);
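
ResponseInputStream<GetObjectResponse> is itself a java.io.InputStream, which is why it can be handed straight to Files.copy and TikaInputStream.get above, where v1 needed s3Object.getObjectContent(). A sketch of the spool-to-temp branch with simplified temp-file handling (the real code uses TemporaryResources and FilenameUtils):

    GetObjectRequest req = GetObjectRequest.builder()
            .bucket("example-bucket").key("path/to/object.pdf") // placeholders
            .build();
    try (ResponseInputStream<GetObjectResponse> body = s3Client.getObject(req)) {
        Map<String, String> userMetadata = body.response().metadata(); // was getUserMetadata()
        Path tmpPath = Files.createTempFile("s3-fetch-", ".tmp");
        Files.copy(body, tmpPath, StandardCopyOption.REPLACE_EXISTING);
    }
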
@@ -357,37 +368,38 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
-        AWSCredentialsProvider provider;
-        if (credentialsProvider.equals("instance")) {
-            provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if (credentialsProvider.equals("profile")) {
-            provider = new ProfileCredentialsProvider(profile);
-        } else if (credentialsProvider.equals("key_secret")) {
-            provider =
-                    new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
-        } else {
-            throw new TikaConfigException("credentialsProvider must be set and " +
-                    "must be either 'instance', 'profile' or 'key_secret'");
+        AwsCredentialsProvider provider;
+        switch (credentialsProvider) {
+            case "instance":
+                provider = InstanceProfileCredentialsProvider.builder().build();
+                break;
+            case "profile":
+                provider = ProfileCredentialsProvider.builder().profileName(profile).build();
+                break;
+            case "key_secret":
+                AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
+                provider = StaticCredentialsProvider.create(awsCreds);
+                break;
+            default:
+                throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
         }
-        ClientConfiguration clientConfiguration = new ClientConfiguration()
-                .withMaxConnections(maxConnections);
-        try {
-            synchronized (clientLock) {
-                AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard()
-                        .withClientConfiguration(clientConfiguration)
-                        .withPathStyleAccessEnabled(pathStyleAccessEnabled)
-                        .withCredentials(provider);
-                if (!StringUtils.isBlank(endpointConfigurationService)) {
-                    amazonS3ClientBuilder.setEndpointConfiguration(
-                            new AwsClientBuilder
-                                    .EndpointConfiguration(endpointConfigurationService, region));
-                } else {
-                    amazonS3ClientBuilder.withRegion(region);
+        SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(maxConnections).build();
+        S3Configuration clientConfig = S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
+        synchronized (clientLock) {
+            S3ClientBuilder s3ClientBuilder = S3Client.builder().httpClient(httpClient).
+                    requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED). // https://stackoverflow.com/a/79488850/535646
+                    serviceConfiguration(clientConfig).credentialsProvider(provider);
+            if (!StringUtils.isBlank(endpointConfigurationService)) {
+                try {
+                    s3ClientBuilder.endpointOverride(new URI(endpointConfigurationService)).region(Region.of(region));
                 }
-                s3Client = amazonS3ClientBuilder.build();
+                catch (URISyntaxException ex) {
+                    throw new TikaConfigException("bad 
endpointConfigurationService: " + endpointConfigurationService, ex);
+                }
+            } else {
+                s3ClientBuilder.region(Region.of(region));
             }
-        } catch (AmazonClientException e) {
-            throw new TikaConfigException("can't initialize s3 fetcher", e);
+            s3Client = s3ClientBuilder.build();
         }
         if (throttleSeconds == null) {
             throttleSeconds = new long[retries];
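
Putting the initialize() changes together: the v1 ClientConfiguration + AmazonS3ClientBuilder pair becomes an explicit SdkHttpClient plus S3ClientBuilder, and the endpoint override is a plain URI rather than an AwsClientBuilder.EndpointConfiguration. A rough self-contained sketch, with placeholder credentials, endpoint, and region (the fetcher reads all of these from its config):

    SdkHttpClient http = ApacheHttpClient.builder().maxConnections(50).build();
    S3Configuration s3Config = S3Configuration.builder()
            .pathStyleAccessEnabled(true) // needed for many non-AWS S3 endpoints
            .build();
    S3Client s3 = S3Client.builder()
            .httpClient(http)
            .serviceConfiguration(s3Config)
            .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
            .credentialsProvider(StaticCredentialsProvider.create(
                    AwsBasicCredentials.create("ACCESS_KEY", "SECRET_KEY"))) // placeholders
            .endpointOverride(URI.create("http://localhost:9000")) // e.g. a MinIO endpoint
            .region(Region.of("us-east-1"))
            .build();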
