This is an automated email from the ASF dual-hosted git repository.
tilman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 7f10ce33c TIKA-4525: migrate to aws v2
7f10ce33c is described below
commit 7f10ce33cfc67754627ff15d7454f2765326c8fc
Author: Tilman Hausherr <[email protected]>
AuthorDate: Mon Oct 20 13:10:04 2025 +0200
TIKA-4525: migrate to aws v2
---
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 130 +++++++++++----------
1 file changed, 71 insertions(+), 59 deletions(-)
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 9176a7409..8bb0b67a9 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -20,29 +20,37 @@ import static
org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
@@ -119,11 +127,11 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
private String prefix;
private String credentialsProvider;
private boolean extractUserMetadata = true;
- private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
- private AmazonS3 s3Client;
+ private int maxConnections =
SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
+ private S3Client s3Client;
private boolean spoolToTemp = true;
- private int retries = 0;
- private long sleepBeforeRetryMillis = 30000;
+ private int retries = 0; //TODO why isn't this used? Add getter/setter?
+ private long sleepBeforeRetryMillis = 30000; //TODO delete
setSleepBeforeRetryMillis() after copying to 3.0?
private long[] throttleSeconds = null;
@@ -158,14 +166,17 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;
- } catch (AmazonS3Exception e) {
- if (e.getErrorCode() != null &&
NO_RETRY_ERROR_CODES.contains(e.getErrorCode())) {
- LOGGER.warn("Hit a no retry error code. Not retrying." +
tries, e);
- throw new IOException(e);
+ } catch (AwsServiceException e) {
+ if (e.awsErrorDetails() != null) {
+ String errorCode = e.awsErrorDetails().errorCode();
+ if (errorCode != null &&
NO_RETRY_ERROR_CODES.contains(e.awsErrorDetails().errorCode())) {
+ LOGGER.warn("Hit a no retry error code. Not retrying."
+ tries, e);
+ throw new IOException(e);
+ }
}
LOGGER.warn("client exception fetching on retry=" + tries, e);
ex = new IOException(e);
- } catch (AmazonClientException e) {
+ } catch (SdkClientException e) {
LOGGER.warn("client exception fetching on retry=" + tries, e);
ex = new IOException(e);
} catch (IOException e) {
@@ -190,16 +201,18 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
TemporaryResources tmp = null;
try {
long start = System.currentTimeMillis();
- GetObjectRequest objectRequest = new GetObjectRequest(bucket,
fetchKey);
+ GetObjectRequest.Builder builder =
GetObjectRequest.builder().bucket(bucket).key(fetchKey);
if (startRange != null && endRange != null
&& startRange > -1 && endRange > -1) {
- objectRequest.withRange(startRange, endRange);
+ String range = String.format(Locale.US, "bytes=%d-%d",
startRange, endRange);
+ builder.range(range);
}
- S3Object s3Object = null;
+ GetObjectRequest objectRequest = builder.build();
+ ResponseInputStream<GetObjectResponse> s3Object = null;
synchronized (clientLock) {
s3Object = s3Client.getObject(objectRequest);
}
- long length = s3Object.getObjectMetadata().getContentLength();
+ long length = s3Object.response().contentLength();
metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
if (maxLength > -1) {
if (length > maxLength) {
@@ -209,19 +222,17 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
LOGGER.debug("took {} ms to fetch file's metadata",
System.currentTimeMillis() - start);
if (extractUserMetadata) {
- for (Map.Entry<String, String> e :
s3Object.getObjectMetadata().getUserMetadata()
- .entrySet()) {
+ for (Map.Entry<String, String> e :
s3Object.response().metadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
if (!spoolToTemp) {
- return TikaInputStream.get(s3Object.getObjectContent());
+ return TikaInputStream.get(s3Object);
} else {
start = System.currentTimeMillis();
tmp = new TemporaryResources();
Path tmpPath =
tmp.createTempFile(FilenameUtils.getSuffixFromPath(fetchKey));
- Files.copy(s3Object.getObjectContent(), tmpPath,
- StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(s3Object, tmpPath,
StandardCopyOption.REPLACE_EXISTING);
TikaInputStream tis = TikaInputStream.get(tmpPath, metadata,
tmp);
LOGGER.debug("took {} ms to fetch metadata and copy to local
tmp file",
System.currentTimeMillis() - start);
@@ -357,37 +368,38 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
@Override
public void initialize(Map<String, Param> params) throws
TikaConfigException {
//params have already been set...ignore them
- AWSCredentialsProvider provider;
- if (credentialsProvider.equals("instance")) {
- provider = InstanceProfileCredentialsProvider.getInstance();
- } else if (credentialsProvider.equals("profile")) {
- provider = new ProfileCredentialsProvider(profile);
- } else if (credentialsProvider.equals("key_secret")) {
- provider =
- new AWSStaticCredentialsProvider(new
BasicAWSCredentials(accessKey, secretKey));
- } else {
- throw new TikaConfigException("credentialsProvider must be set and
" +
- "must be either 'instance', 'profile' or 'key_secret'");
+ AwsCredentialsProvider provider;
+ switch (credentialsProvider) {
+ case "instance":
+ provider =
InstanceProfileCredentialsProvider.builder().build();
+ break;
+ case "profile":
+ provider =
ProfileCredentialsProvider.builder().profileName(profile).build();
+ break;
+ case "key_secret":
+ AwsBasicCredentials awsCreds =
AwsBasicCredentials.create(accessKey, secretKey);
+ provider = StaticCredentialsProvider.create(awsCreds);
+ break;
+ default:
+ throw new TikaConfigException("credentialsProvider must be set
and " + "must be either 'instance', 'profile' or 'key_secret'");
}
- ClientConfiguration clientConfiguration = new ClientConfiguration()
- .withMaxConnections(maxConnections);
- try {
- synchronized (clientLock) {
- AmazonS3ClientBuilder amazonS3ClientBuilder =
AmazonS3ClientBuilder.standard()
- .withClientConfiguration(clientConfiguration)
- .withPathStyleAccessEnabled(pathStyleAccessEnabled)
- .withCredentials(provider);
- if (!StringUtils.isBlank(endpointConfigurationService)) {
- amazonS3ClientBuilder.setEndpointConfiguration(
- new AwsClientBuilder
-
.EndpointConfiguration(endpointConfigurationService, region));
- } else {
- amazonS3ClientBuilder.withRegion(region);
+ SdkHttpClient httpClient =
ApacheHttpClient.builder().maxConnections(maxConnections).build();
+ S3Configuration clientConfig =
S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
+ synchronized (clientLock) {
+ S3ClientBuilder s3ClientBuilder =
S3Client.builder().httpClient(httpClient).
+
requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED). //
https://stackoverflow.com/a/79488850/535646
+
serviceConfiguration(clientConfig).credentialsProvider(provider);
+ if (!StringUtils.isBlank(endpointConfigurationService)) {
+ try {
+ s3ClientBuilder.endpointOverride(new
URI(endpointConfigurationService)).region(Region.of(region));
}
- s3Client = amazonS3ClientBuilder.build();
+ catch (URISyntaxException ex) {
+ throw new TikaConfigException("bad
endpointConfigurationService: " + endpointConfigurationService, ex);
+ }
+ } else {
+ s3ClientBuilder.region(Region.of(region));
}
- } catch (AmazonClientException e) {
- throw new TikaConfigException("can't initialize s3 fetcher", e);
+ s3Client = s3ClientBuilder.build();
}
if (throttleSeconds == null) {
throttleSeconds = new long[retries];