This is an automated email from the ASF dual-hosted git repository.
tilman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new d21a66fb7 TIKA-4525: migrate to aws v2
d21a66fb7 is described below
commit d21a66fb789b1fe00e522bcd37bcd6373c4e09ed
Author: Tilman Hausherr <[email protected]>
AuthorDate: Sun Oct 19 15:38:51 2025 +0200
TIKA-4525: migrate to aws v2
---
tika-pipes/tika-emitters/tika-emitter-s3/pom.xml | 8 +-
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 110 ++++++++++++---------
2 files changed, 69 insertions(+), 49 deletions(-)
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index b48a8c302..1cae9880d 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -37,8 +37,12 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
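[Note on the dependency change: unlike the v1 SDK, AWS SDK v2 ships the HTTP transport as a separate artifact, which is why apache-client is now declared alongside s3. A minimal sketch of the wiring this enables; the pool size and region below are hypothetical, not taken from the commit:

import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class HttpClientWiring {
    public static void main(String[] args) {
        // The Apache-based transport comes from the separate apache-client artifact.
        SdkHttpClient http = ApacheHttpClient.builder()
                .maxConnections(100) // hypothetical pool size
                .build();
        // v2 does not bundle a transport; it must be handed to the client builder.
        try (S3Client s3 = S3Client.builder()
                .httpClient(http)
                .region(Region.US_EAST_1) // hypothetical region
                .build()) {
            // client is ready for use
        }
    }
}
]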
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index c454c6c6a..93f59f076 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -17,41 +17,44 @@
package org.apache.tika.pipes.emitter.s3;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.MAX_CONNECTIONS;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
@@ -109,20 +112,22 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
private String fileExtension = "json";
private boolean spoolToTemp = true;
private String prefix = null;
- private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
+ private int maxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(MAX_CONNECTIONS);
private boolean pathStyleAccessEnabled = false;
- private AmazonS3 s3Client;
+ private S3Client s3Client;
/**
* Requires the src-bucket/path/to/my/file.txt in the {@link TikaCoreProperties#SOURCE_PATH}.
*
+ * @param emitKey
* @param metadataList
+ * @param parseContext
* @throws IOException
- * @throws TikaException
+ * @throws TikaEmitterException
*/
@Override
public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException {
- if (metadataList == null || metadataList.size() == 0) {
+ if (metadataList == null || metadataList.isEmpty()) {
throw new TikaEmitterException("metadata list must not be null or of size 0");
}
@@ -158,7 +163,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
* @param path -- object path, not including the bucket
* @param is inputStream to copy
* @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
- * @throws TikaEmitterException or IOexception if there is a Runtime s3 client exception
+ * @param parseContext
+ * @throws IOException if there is a Runtime s3 client exception
+ * @throws TikaEmitterException if there is a Runtime s3 client exception
*/
@Override
public void emit(String path, InputStream is, Metadata userMetadata, ParseContext parseContext) throws IOException, TikaEmitterException {
@@ -173,13 +180,13 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
LOGGER.debug("about to emit to target bucket: ({}) path:({})", bucket, path);
- ObjectMetadata objectMetadata = new ObjectMetadata();
+ Map<String,String> metadataMap = new HashMap<>();
for (String n : userMetadata.names()) {
String[] vals = userMetadata.getValues(n);
if (vals.length > 1) {
LOGGER.warn("Can only write the first value for key {}. I see {} values.", n, vals.length);
}
- objectMetadata.addUserMetadata(n, vals[0]);
+ metadataMap.put(n, vals[0]);
}
//In practice, sending a file is more robust
//We ran into stream reset issues during digesting, and aws doesn't
@@ -187,8 +194,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
if (is instanceof TikaInputStream) {
if (((TikaInputStream) is).hasFile()) {
try {
- PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, path, ((TikaInputStream) is).getFile()).withMetadata(objectMetadata);
- s3Client.putObject(putObjectRequest);
+ PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).metadata(metadataMap).build();
+ RequestBody requestBody = RequestBody.fromFile(((TikaInputStream) is).getFile());
+ s3Client.putObject(request, requestBody);
} catch (IOException e) {
throw new TikaEmitterException("exception sending underlying file", e);
}
@@ -196,8 +204,10 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
}
}
try {
- s3Client.putObject(bucket, path, is, objectMetadata);
- } catch (AmazonClientException e) {
+ PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).metadata(metadataMap).build();
+ RequestBody requestBody = RequestBody.fromBytes(is.readAllBytes());
+ s3Client.putObject(request, requestBody);
+ } catch (S3Exception e) {
throw new IOException("problem writing s3object", e);
}
}
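[Note on the new API shape used in this hunk: v2 replaces the v1 PutObjectRequest(bucket, key, file) constructor with an immutable request builder plus a separate RequestBody for the payload. A minimal sketch under assumed names; the bucket, key, metadata values, and file path are hypothetical:

import java.nio.file.Paths;
import java.util.Map;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class PutObjectSketch {
    public static void main(String[] args) {
        // Uses the default region/credentials chain; sufficient for a sketch.
        try (S3Client s3 = S3Client.create()) {
            PutObjectRequest request = PutObjectRequest.builder()
                    .bucket("my-bucket")                // hypothetical bucket
                    .key("path/to/object.json")         // hypothetical key
                    .metadata(Map.of("source", "tika")) // user metadata rides on the request
                    .build();
            // File-backed payloads are retryable without the stream-reset
            // issues mentioned in the emitter comments above.
            s3.putObject(request, RequestBody.fromFile(Paths.get("/tmp/payload.json")));
        }
    }
}
]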
@@ -294,32 +304,38 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
//params have already been set...ignore them
- AWSCredentialsProvider provider;
- if ("instance".equals(credentialsProvider)) {
- provider = InstanceProfileCredentialsProvider.getInstance();
- } else if ("profile".equals(credentialsProvider)) {
- provider = new ProfileCredentialsProvider(profile);
- } else if (credentialsProvider.equals("key_secret")) {
- provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
- } else {
- throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
+ software.amazon.awssdk.auth.credentials.AwsCredentialsProvider provider;
+ switch (credentialsProvider)
+ {
+ case "instance":
+ provider = software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.builder().build();
+ break;
+ case "profile":
+ provider = software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider.builder().profileName(profile).build();
+ break;
+ case "key_secret":
+ AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
+ provider = software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(awsCreds);
+ break;
+ default:
+ throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
}
- ClientConfiguration clientConfig = new ClientConfiguration().withMaxConnections(maxConnections);
- try {
- AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder
- .standard()
- .withClientConfiguration(clientConfig)
- .withCredentials(provider)
- .withPathStyleAccessEnabled(pathStyleAccessEnabled);
- if (!StringUtils.isBlank(endpointConfigurationService)) {
- amazonS3ClientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointConfigurationService, region));
- } else {
- amazonS3ClientBuilder.withRegion(region);
+ SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(maxConnections).build();
+ S3Configuration clientConfig1 = S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
+ S3ClientBuilder s3ClientBuilder = S3Client.builder().httpClient(httpClient).
+ requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED). // https://stackoverflow.com/a/79488850/535646
+ serviceConfiguration(clientConfig1).credentialsProvider(provider);
+ if (!StringUtils.isBlank(endpointConfigurationService)) {
+ try {
+ s3ClientBuilder.endpointOverride(new URI(endpointConfigurationService)).region(Region.of(region));
}
- s3Client = amazonS3ClientBuilder.build();
- } catch (AmazonClientException e) {
- throw new TikaConfigException("can't initialize s3 emitter", e);
+ catch (URISyntaxException ex) {
+ throw new TikaConfigException("bad endpointConfigurationService: " + endpointConfigurationService, ex);
+ }
+ } else {
+ s3ClientBuilder.region(Region.of(region));
}
+ s3Client = s3ClientBuilder.build();
}
@Override
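[Note: a minimal end-to-end sketch of the migrated initialization path, mirroring the 'key_secret' branch plus the endpoint override above; the credentials, endpoint, and region values are hypothetical (e.g. a local MinIO), not taken from the commit:

import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;

public class InitSketch {
    public static void main(String[] args) {
        // Hypothetical static credentials, as in the 'key_secret' branch.
        StaticCredentialsProvider provider = StaticCredentialsProvider.create(
                AwsBasicCredentials.create("accessKey", "secretKey"));
        try (S3Client s3 = S3Client.builder()
                .credentialsProvider(provider)
                .serviceConfiguration(S3Configuration.builder()
                        .pathStyleAccessEnabled(true) // many S3-compatible stores need this
                        .build())
                .endpointOverride(URI.create("http://localhost:9000")) // hypothetical endpoint
                .region(Region.US_EAST_1) // a region is still required with an override
                .build()) {
            // client is ready for use
        }
    }
}
]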