This is an automated email from the ASF dual-hosted git repository.

tilman pushed a commit to branch branch_3x
in repository https://gitbox.apache.org/repos/asf/tika.git
commit e076a77ff0a4beee16ada50b01eaa1ca7cb4339d
Author: Tilman Hausherr <[email protected]>
AuthorDate: Sat Oct 25 13:04:28 2025 +0200

    TIKA-4525: migrate to aws v2
---
 tika-parent/pom.xml                              |   8 --
 tika-pipes/tika-emitters/tika-emitter-s3/pom.xml |  12 +-
 .../apache/tika/pipes/emitter/s3/S3Emitter.java  | 111 +++++++++++-------
 tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml |  12 +-
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java  | 127 ++++++++++++---
 .../tika-pipes-iterator-s3/pom.xml               |  12 +-
 .../pipes/pipesiterator/s3/S3PipesIterator.java  | 100 +++++++-------
 7 files changed, 224 insertions(+), 158 deletions(-)

diff --git a/tika-parent/pom.xml b/tika-parent/pom.xml
index 02a5cdb91..fb84e03ed 100644
--- a/tika-parent/pom.xml
+++ b/tika-parent/pom.xml
@@ -310,7 +310,6 @@
         <!-- dependency versions -->
         <!-- change threetenbp exclusion version -->
         <google.cloud.version>2.59.0</google.cloud.version>
-        <aws.version>1.12.792</aws.version>
         <aws2.version>2.36.2</aws2.version>
         <!-- WARNING: when you upgrade asm make sure that you update
             the OpCode in the initializer in org.apache.tika.parser.asm.XHTMLClassVisitor
@@ -576,13 +575,6 @@
                 <!-- can't update to 7.0.0: "class file has wrong version 61.0, should be 55.0" -->
                 <version>${biz.aqute.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.amazonaws</groupId>
-                <artifactId>aws-java-sdk-bom</artifactId>
-                <version>${aws.version}</version>
-                <type>pom</type>
-                <scope>import</scope>
-            </dependency>
             <dependency>
                 <groupId>software.amazon.awssdk</groupId>
                 <artifactId>bom</artifactId>

diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index f64638fd6..d19da77a3 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -43,15 +43,19 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j2-impl</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>apache-client</artifactId>
+    </dependency>
   </dependencies>

   <build>
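The pom changes above do two things: the v1 BOM (com.amazonaws:aws-java-sdk-bom) is dropped from dependency management in favor of the v2 BOM that was already declared, and each S3 module now needs two artifacts, s3 for the service client and apache-client for the HTTP layer, which is pluggable in v2 rather than built in. A minimal sketch of how the two artifacts fit together; the region and connection count are illustrative, not taken from the commit:

    // Sketch only: "s3" supplies S3Client, "apache-client" supplies the
    // ApacheHttpClient that replaces v1's built-in ClientConfiguration HTTP layer.
    import software.amazon.awssdk.http.SdkHttpClient;
    import software.amazon.awssdk.http.apache.ApacheHttpClient;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;

    public class S3ClientBootstrapSketch {
        public static void main(String[] args) {
            // maxConnections now lives on the HTTP client, not on a ClientConfiguration
            SdkHttpClient httpClient = ApacheHttpClient.builder()
                    .maxConnections(50) // illustrative value
                    .build();
            try (S3Client s3 = S3Client.builder()
                    .httpClient(httpClient)
                    .region(Region.US_EAST_1) // illustrative region
                    .build()) {
                System.out.println(s3.serviceName());
            }
        }
    }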
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 7f5537356..de2a428dd 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -23,35 +23,42 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;

 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -109,20 +116,22 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     private String fileExtension = "json";
     private boolean spoolToTemp = true;
     private String prefix = null;
-    private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
+    private int maxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
     private boolean pathStyleAccessEnabled = false;
-    private AmazonS3 s3Client;
+    private S3Client s3Client;

     /**
      * Requires the src-bucket/path/to/my/file.txt in the {@link TikaCoreProperties#SOURCE_PATH}.
      *
+     * @param emitKey
      * @param metadataList
+     * @param parseContext
      * @throws IOException
-     * @throws TikaException
+     * @throws TikaEmitterException
      */
     @Override
     public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException {
-        if (metadataList == null || metadataList.size() == 0) {
+        if (metadataList == null || metadataList.isEmpty()) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
         }

@@ -158,7 +167,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * @param path -- object path, not including the bucket
      * @param is inputStream to copy
      * @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
-     * @throws TikaEmitterException or IOexception if there is a Runtime s3 client exception
+     * @param parseContext
+     * @throws IOException if there is a Runtime s3 client exception
+     * @throws TikaEmitterException if there is a Runtime s3 client exception
      */
     @Override
     public void emit(String path, InputStream is, Metadata userMetadata, ParseContext parseContext) throws IOException, TikaEmitterException {
@@ -173,13 +184,13 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE

         LOGGER.debug("about to emit to target bucket: ({}) path:({})", bucket, path);

-        ObjectMetadata objectMetadata = new ObjectMetadata();
+        Map<String,String> metadataMap = new HashMap<>();
         for (String n : userMetadata.names()) {
             String[] vals = userMetadata.getValues(n);
             if (vals.length > 1) {
                 LOGGER.warn("Can only write the first value for key {}. I see {} values.", n, vals.length);
             }
-            objectMetadata.addUserMetadata(n, vals[0]);
+            metadataMap.put(n, vals[0]);
         }
         //In practice, sending a file is more robust
         //We ran into stream reset issues during digesting, and aws doesn't
@@ -187,8 +198,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
         if (is instanceof TikaInputStream) {
             if (((TikaInputStream) is).hasFile()) {
                 try {
-                    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, path, ((TikaInputStream) is).getFile()).withMetadata(objectMetadata);
-                    s3Client.putObject(putObjectRequest);
+                    PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).metadata(metadataMap).build();
+                    RequestBody requestBody = RequestBody.fromFile(((TikaInputStream) is).getFile());
+                    s3Client.putObject(request, requestBody);
                 } catch (IOException e) {
                     throw new TikaEmitterException("exception sending underlying file", e);
                 }
@@ -196,8 +208,10 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
             }
         }
         try {
-            s3Client.putObject(bucket, path, is, objectMetadata);
-        } catch (AmazonClientException e) {
+            PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).metadata(metadataMap).build();
+            RequestBody requestBody = RequestBody.fromBytes(is.readAllBytes());
+            s3Client.putObject(request, requestBody);
+        } catch (S3Exception e) {
             throw new IOException("problem writing s3object", e);
         }
     }
@@ -286,7 +300,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE

     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
-     * e.g. AmazonClientException in a TikaConfigException.
+     * e.g. SdkClientException in a TikaConfigException.
      *
      * @param params params to use for initialization
      * @throws TikaConfigException
@@ -294,30 +308,39 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
-        AWSCredentialsProvider provider;
-        if ("instance".equals(credentialsProvider)) {
-            provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)) {
-            provider = new ProfileCredentialsProvider(profile);
-        } else if (credentialsProvider.equals("key_secret")) {
-            provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
-        } else {
-            throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
+        AwsCredentialsProvider provider;
+        switch (credentialsProvider) {
+            case "instance":
+                provider = InstanceProfileCredentialsProvider.builder().build();
+                break;
+            case "profile":
+                provider = ProfileCredentialsProvider.builder().profileName(profile).build();
+                break;
+            case "key_secret":
+                AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
+                provider = StaticCredentialsProvider.create(awsCreds);
+                break;
+            default:
+                throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
         }
-        ClientConfiguration clientConfig = new ClientConfiguration().withMaxConnections(maxConnections);
+        SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(maxConnections).build();
+        S3Configuration clientConfig = S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
         try {
-            AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder
-                    .standard()
-                    .withClientConfiguration(clientConfig)
-                    .withCredentials(provider)
-                    .withPathStyleAccessEnabled(pathStyleAccessEnabled);
+            S3ClientBuilder s3ClientBuilder = S3Client.builder().httpClient(httpClient).
+                    requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED). // https://stackoverflow.com/a/79488850/535646
+                    serviceConfiguration(clientConfig).credentialsProvider(provider);
             if (!StringUtils.isBlank(endpointConfigurationService)) {
-                amazonS3ClientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointConfigurationService, region));
+                try {
+                    s3ClientBuilder.endpointOverride(new URI(endpointConfigurationService)).region(Region.of(region));
+                }
+                catch (URISyntaxException ex) {
+                    throw new TikaConfigException("bad endpointConfigurationService: " + endpointConfigurationService, ex);
+                }
             } else {
-                amazonS3ClientBuilder.withRegion(region);
+                s3ClientBuilder.region(Region.of(region));
            }
-            s3Client = amazonS3ClientBuilder.build();
-        } catch (AmazonClientException e) {
+            s3Client = s3ClientBuilder.build();
+        } catch (SdkClientException e) {
            throw new TikaConfigException("can't initialize s3 emitter", e);
         }
     }
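The central emitter change is putObject(): v1 carried the payload and user metadata on one mutable PutObjectRequest, while v2 separates an immutable request from a RequestBody. A condensed before/after sketch; bucket, key, file name and metadata values are hypothetical:

    import java.nio.file.Paths;
    import java.util.Map;

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;

    public class PutObjectSketch {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.builder().region(Region.US_EAST_1).build()) {
                // v1: new PutObjectRequest(bucket, key, file).withMetadata(objectMetadata)
                PutObjectRequest request = PutObjectRequest.builder()
                        .bucket("my-bucket")                 // hypothetical bucket
                        .key("path/to/my/file.json")         // hypothetical key
                        .metadata(Map.of("creator", "tika")) // replaces ObjectMetadata.addUserMetadata()
                        .build();
                s3.putObject(request, RequestBody.fromFile(Paths.get("file.json")));
            }
        }
    }

For the non-file path the commit buffers the stream with RequestBody.fromBytes(is.readAllBytes()), presumably because the v2 sync client wants a known content length up front; RequestBody.fromInputStream(is, length) is the alternative when the length is known without buffering.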
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
index 85a2eee4c..3c7534bb6 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
@@ -29,10 +29,6 @@
   <name>Apache Tika S3 fetcher</name>

   <dependencies>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j2-impl</artifactId>
@@ -44,6 +40,14 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>apache-client</artifactId>
+    </dependency>
   </dependencies>

   <build>

diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index ab4a139a0..4f8f602f1 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -20,29 +20,36 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;

 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;

-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;

 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -119,11 +126,11 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     private String prefix;
     private String credentialsProvider;
     private boolean extractUserMetadata = true;
-    private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
-    private AmazonS3 s3Client;
+    private int maxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
+    private S3Client s3Client;
     private boolean spoolToTemp = true;
-    private int retries = 0;
-    private long sleepBeforeRetryMillis = 30000;
+    private int retries = 0; //TODO why isn't this used? Add getter/setter?
+    private long sleepBeforeRetryMillis = 30000; //TODO delete setSleepBeforeRetryMillis() after copying to 3.0?

     private long[] throttleSeconds = null;
@@ -158,14 +165,17 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("total to fetch {}", elapsed);
                 return is;
-            } catch (AmazonS3Exception e) {
-                if (e.getErrorCode() != null && NO_RETRY_ERROR_CODES.contains(e.getErrorCode())) {
-                    LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
-                    throw new IOException(e);
+            } catch (AwsServiceException e) {
+                if (e.awsErrorDetails() != null) {
+                    String errorCode = e.awsErrorDetails().errorCode();
+                    if (errorCode != null && NO_RETRY_ERROR_CODES.contains(e.awsErrorDetails().errorCode())) {
+                        LOGGER.warn("Hit a no retry error code. Not retrying." + tries, e);
+                        throw new IOException(e);
+                    }
                 }
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = new IOException(e);
-            } catch (AmazonClientException e) {
+            } catch (SdkClientException e) {
                 LOGGER.warn("client exception fetching on retry=" + tries, e);
                 ex = new IOException(e);
             } catch (IOException e) {
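In the retry loop, v1's AmazonS3Exception.getErrorCode() maps to AwsServiceException.awsErrorDetails().errorCode() in v2, and awsErrorDetails() itself can be null, hence the extra guard added above. A sketch of that decision logic in isolation; the error codes listed here are illustrative stand-ins for the fetcher's NO_RETRY_ERROR_CODES set:

    import java.util.Set;

    import software.amazon.awssdk.awscore.exception.AwsServiceException;

    public class ErrorCodeSketch {
        // stand-in for the fetcher's NO_RETRY_ERROR_CODES; codes are illustrative
        private static final Set<String> NO_RETRY_ERROR_CODES = Set.of("NoSuchKey", "AccessDenied");

        // mirrors the commit's logic: retry unless the service returned a known fatal code
        static boolean shouldRetry(AwsServiceException e) {
            if (e.awsErrorDetails() == null) {
                // v2 can surface a service failure without error details;
                // the commit falls through to the retry path in that case
                return true;
            }
            String errorCode = e.awsErrorDetails().errorCode();
            return errorCode == null || !NO_RETRY_ERROR_CODES.contains(errorCode);
        }
    }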
@@ -188,18 +198,20 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     private InputStream _fetch(String fetchKey, Metadata metadata, Long startRange, Long endRange) throws IOException {
         TemporaryResources tmp = null;
+        ResponseInputStream<GetObjectResponse> s3Object = null;
         try {
             long start = System.currentTimeMillis();
-            GetObjectRequest objectRequest = new GetObjectRequest(bucket, fetchKey);
+            GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(bucket).key(fetchKey);
             if (startRange != null && endRange != null && startRange > -1 && endRange > -1) {
-                objectRequest.withRange(startRange, endRange);
+                String range = String.format(Locale.US, "bytes=%d-%d", startRange, endRange);
+                builder.range(range);
             }
-            S3Object s3Object = null;
+            GetObjectRequest objectRequest = builder.build();
             synchronized (clientLock) {
                 s3Object = s3Client.getObject(objectRequest);
             }
-            long length = s3Object.getObjectMetadata().getContentLength();
+            long length = s3Object.response().contentLength();
             metadata.set(Metadata.CONTENT_LENGTH, Long.toString(length));
             if (maxLength > -1) {
                 if (length > maxLength) {
@@ -209,19 +221,17 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
             LOGGER.debug("took {} ms to fetch file's metadata", System.currentTimeMillis() - start);
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
-                        .entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.response().metadata().entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }

             if (!spoolToTemp) {
-                return TikaInputStream.get(s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object);
             } else {
                 start = System.currentTimeMillis();
                 tmp = new TemporaryResources();
                 Path tmpPath = tmp.createTempFile(FilenameUtils.getSuffixFromPath(fetchKey));
-                Files.copy(s3Object.getObjectContent(), tmpPath,
-                        StandardCopyOption.REPLACE_EXISTING);
+                Files.copy(s3Object, tmpPath, StandardCopyOption.REPLACE_EXISTING);
                 TikaInputStream tis = TikaInputStream.get(tmpPath, metadata, tmp);
                 LOGGER.debug("took {} ms to fetch metadata and copy to local tmp file",
                         System.currentTimeMillis() - start);
@@ -231,6 +241,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
             if (tmp != null) {
                 tmp.close();
             }
+            if (s3Object != null) {
+                s3Object.close();
+            }
             throw e;
         }
     }
@@ -349,7 +362,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe

     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
-     * e.g. AmazonClientException in a TikaConfigException.
+     * e.g. SdkClientException in a TikaConfigException.
      *
      * @param params params to use for initialization
      * @throws TikaConfigException
@@ -357,36 +370,40 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
-        AWSCredentialsProvider provider;
-        if (credentialsProvider.equals("instance")) {
-            provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if (credentialsProvider.equals("profile")) {
-            provider = new ProfileCredentialsProvider(profile);
-        } else if (credentialsProvider.equals("key_secret")) {
-            provider =
-                    new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
-        } else {
-            throw new TikaConfigException("credentialsProvider must be set and " +
-                    "must be either 'instance', 'profile' or 'key_secret'");
+        AwsCredentialsProvider provider;
+        switch (credentialsProvider) {
+            case "instance":
+                provider = InstanceProfileCredentialsProvider.builder().build();
+                break;
+            case "profile":
+                provider = ProfileCredentialsProvider.builder().profileName(profile).build();
+                break;
+            case "key_secret":
+                AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
+                provider = StaticCredentialsProvider.create(awsCreds);
+                break;
+            default:
+                throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
         }
-        ClientConfiguration clientConfiguration = new ClientConfiguration()
-                .withMaxConnections(maxConnections);
+        SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(maxConnections).build();
+        S3Configuration clientConfig = S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
         try {
             synchronized (clientLock) {
-                AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard()
-                        .withClientConfiguration(clientConfiguration)
-                        .withPathStyleAccessEnabled(pathStyleAccessEnabled)
-                        .withCredentials(provider);
+                S3ClientBuilder s3ClientBuilder = S3Client.builder().httpClient(httpClient).
+                        serviceConfiguration(clientConfig).credentialsProvider(provider);
                 if (!StringUtils.isBlank(endpointConfigurationService)) {
-                    amazonS3ClientBuilder.setEndpointConfiguration(
-                            new AwsClientBuilder
-                                    .EndpointConfiguration(endpointConfigurationService, region));
+                    try {
+                        s3ClientBuilder.endpointOverride(new URI(endpointConfigurationService)).region(Region.of(region));
+                    }
+                    catch (URISyntaxException ex) {
+                        throw new TikaConfigException("bad endpointConfigurationService: " + endpointConfigurationService, ex);
+                    }
                 } else {
-                    amazonS3ClientBuilder.withRegion(region);
+                    s3ClientBuilder.region(Region.of(region));
                 }
-                s3Client = amazonS3ClientBuilder.build();
+                s3Client = s3ClientBuilder.build();
             }
-        } catch (AmazonClientException e) {
+        } catch (SdkClientException e) {
             throw new TikaConfigException("can't initialize s3 fetcher", e);
         }
         if (throttleSeconds == null) {
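The ranged-fetch change in _fetch(): v1's objectRequest.withRange(start, end) becomes a raw HTTP Range header string in v2, and the old S3Object splits into a ResponseInputStream<GetObjectResponse> that carries both the content and the response metadata (content length, user metadata) that v1 exposed via getObjectMetadata(). A self-contained sketch; bucket, key and the byte range are hypothetical:

    import java.util.Locale;

    import software.amazon.awssdk.core.ResponseInputStream;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    public class RangedGetSketch {
        public static void main(String[] args) throws Exception {
            try (S3Client s3 = S3Client.builder().region(Region.US_EAST_1).build()) {
                GetObjectRequest request = GetObjectRequest.builder()
                        .bucket("my-bucket")         // hypothetical bucket
                        .key("path/to/my/file.txt")  // hypothetical key
                        // v1: objectRequest.withRange(0, 1023)
                        .range(String.format(Locale.US, "bytes=%d-%d", 0, 1023))
                        .build();
                try (ResponseInputStream<GetObjectResponse> in = s3.getObject(request)) {
                    System.out.println("content-length: " + in.response().contentLength());
                }
            }
        }
    }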
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
index 0cfcb7049..d754581ea 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
@@ -38,15 +38,19 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j2-impl</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>apache-client</artifactId>
+    </dependency>
   </dependencies>

   <build>
     <plugins>

diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
index 38fc1889c..7275f1cba 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
@@ -19,25 +19,32 @@ package org.apache.tika.pipes.pipesiterator.s3;
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;

 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;

-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.S3Object;

 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -66,10 +73,10 @@ public class S3PipesIterator extends PipesIterator implements Initializable {
     private String profile;
     private String bucket;
     private Pattern fileNamePattern = null;
-    private int maxConnections = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
+    private int maxConnections = SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
     private boolean pathStyleAccessEnabled = false;

-    private AmazonS3 s3Client;
+    private S3Client s3Client;

     @Field
     public void setEndpointConfigurationService(String endpointConfigurationService) {
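The same credentialsProvider switch is duplicated in the emitter, the fetcher, and this pipes iterator. For reference, a condensed sketch of the three v2 provider styles next to their v1 counterparts; the profile name and key values are placeholders:

    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;

    public class CredentialsSketch {
        static AwsCredentialsProvider forName(String name) {
            switch (name) {
                case "instance":   // v1: InstanceProfileCredentialsProvider.getInstance()
                    return InstanceProfileCredentialsProvider.builder().build();
                case "profile":    // v1: new ProfileCredentialsProvider(profile)
                    return ProfileCredentialsProvider.builder().profileName("my-profile").build();
                case "key_secret": // v1: new AWSStaticCredentialsProvider(new BasicAWSCredentials(...))
                    return StaticCredentialsProvider.create(
                            AwsBasicCredentials.create("accessKey", "secretKey"));
                default:
                    throw new IllegalArgumentException("unknown provider: " + name);
            }
        }
    }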
@@ -136,7 +143,7 @@ public class S3PipesIterator extends PipesIterator implements Initializable {

     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
-     * e.g. AmazonClientException in a TikaConfigException.
+     * e.g. SdkClientException in a TikaConfigException.
      *
      * @param params params to use for initialization
      * @throws TikaConfigException
@@ -144,31 +151,39 @@ public class S3PipesIterator extends PipesIterator implements Initializable {
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         //params have already been set...ignore them
-        AWSCredentialsProvider provider;
-        if (credentialsProvider.equals("instance")) {
-            provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if (credentialsProvider.equals("profile")) {
-            provider = new ProfileCredentialsProvider(profile);
-        } else if (credentialsProvider.equals("key_secret")) {
-            provider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
-        } else {
-            throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
+        AwsCredentialsProvider provider;
+        switch (credentialsProvider) {
+            case "instance":
+                provider = InstanceProfileCredentialsProvider.builder().build();
+                break;
+            case "profile":
+                provider = ProfileCredentialsProvider.builder().profileName(profile).build();
+                break;
+            case "key_secret":
+                AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
+                provider = StaticCredentialsProvider.create(awsCreds);
+                break;
+            default:
+                throw new TikaConfigException("credentialsProvider must be set and " + "must be either 'instance', 'profile' or 'key_secret'");
         }
-        ClientConfiguration clientConfig = new ClientConfiguration().withMaxConnections(maxConnections);
+        SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(maxConnections).build();
+        S3Configuration clientConfig = S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccessEnabled).build();
         try {
-            AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder
-                    .standard()
-                    .withClientConfiguration(clientConfig)
-                    .withCredentials(provider)
-                    .withPathStyleAccessEnabled(pathStyleAccessEnabled);
+            S3ClientBuilder s3ClientBuilder = S3Client.builder().httpClient(httpClient).
+                    serviceConfiguration(clientConfig).credentialsProvider(provider);
             if (!StringUtils.isBlank(endpointConfigurationService)) {
-                amazonS3ClientBuilder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointConfigurationService, region));
+                try {
+                    s3ClientBuilder.endpointOverride(new URI(endpointConfigurationService)).region(Region.of(region));
+                }
+                catch (URISyntaxException ex) {
+                    throw new TikaConfigException("bad endpointConfigurationService: " + endpointConfigurationService, ex);
+                }
             } else {
-                amazonS3ClientBuilder.withRegion(region);
+                s3ClientBuilder.region(Region.of(region));
             }
-            s3Client = amazonS3ClientBuilder.build();
-        } catch (AmazonClientException e) {
+            s3Client = s3ClientBuilder.build();
+        } catch (SdkClientException e) {
             throw new TikaConfigException("can't initialize s3 pipesiterator", e);
         }
     }
@@ -187,20 +202,27 @@ public class S3PipesIterator extends PipesIterator implements Initializable {
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
-        Matcher fileNameMatcher = null;
+        final Matcher fileNameMatcher;
         if (fileNamePattern != null) {
             fileNameMatcher = fileNamePattern.matcher("");
+        } else {
+            fileNameMatcher = null;
         }
-        for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
-            if (fileNameMatcher != null && !accept(fileNameMatcher, summary.getKey())) {
+
+        ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build();
+        List<S3Object> s3ObjectList = s3Client.listObjectsV2Paginator(listObjectsV2Request).stream().
+                flatMap(resp -> resp.contents().stream()).collect(Collectors.toList());
+        for (S3Object s3Object : s3ObjectList) {
+            String key = s3Object.key();
+            if (fileNameMatcher != null && !accept(fileNameMatcher, key)) {
                 continue;
             }
             long elapsed = System.currentTimeMillis() - start;
-            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), elapsed);
+            LOGGER.debug("adding ({}) {} in {} ms", count, key, elapsed);
             //TODO -- allow user specified metadata as the "id"?
             ParseContext parseContext = new ParseContext();
             parseContext.set(HandlerConfig.class, handlerConfig);
-            tryToAdd(new FetchEmitTuple(summary.getKey(), new FetchKey(fetcherName, summary.getKey()), new EmitKey(emitterName, summary.getKey()), new Metadata(), parseContext,
+            tryToAdd(new FetchEmitTuple(key, new FetchKey(fetcherName, key), new EmitKey(emitterName, key), new Metadata(), parseContext,
                     getOnParseException()));
             count++;
         }
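The listing change above: v1's S3Objects.withPrefix(s3Client, bucket, prefix) iterable is replaced by the ListObjectsV2 paginator, which issues continuation requests transparently. The commit materializes every key into a List before iterating; a sketch of the same paginator used in streaming form instead, with an illustrative bucket and prefix:

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

    public class ListingSketch {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.builder().region(Region.US_EAST_1).build()) {
                ListObjectsV2Request request = ListObjectsV2Request.builder()
                        .bucket("my-bucket") // illustrative bucket
                        .prefix("input/")    // illustrative prefix
                        .build();
                // contents() flattens the paginated responses into one iterable of S3Object
                s3.listObjectsV2Paginator(request).contents()
                        .forEach(obj -> System.out.println(obj.key() + " " + obj.size()));
            }
        }
    }

Streaming the paginator avoids holding the entire listing in memory for large buckets, at the cost of keeping the client call open while entries are processed; collecting into a List, as the commit does, keeps the iteration independent of the client.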
