kfaraz commented on code in PR #18891:
URL: https://github.com/apache/druid/pull/18891#discussion_r2863249315
##########
pom.xml:
##########
@@ -119,7 +119,8 @@
<hadoop.compile.version>3.3.6</hadoop.compile.version>
<graaljs.version>22.3.5</graaljs.version>
<mockito.version>5.14.2</mockito.version>
- <aws.sdk.version>1.12.784</aws.sdk.version>
+ <aws.sdk.v1.version>1.12.784</aws.sdk.v1.version>
Review Comment:
Please add a comment on where v1 is still used and whether there is a plan to eventually get rid of it.
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -229,11 +288,42 @@ static String constructSegmentBasePath(String baseKey, String storageDir)
) + "/";
}
- static AccessControlList grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String bucket)
+ static Grant grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String bucket)
{
- final AccessControlList acl = s3Client.getBucketAcl(bucket);
- acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
- return acl;
+ final String ownerId = s3Client.getBucketAcl(bucket).owner().id();
+ return Grant.builder()
+ .grantee(Grantee.builder()
+ .type(Type.CANONICAL_USER)
+ .id(ownerId)
+ .build())
Review Comment:
nit: formatting here
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java:
##########
@@ -84,6 +84,7 @@
*
* This tests both InputFormat and Parser. Parser is deprecated for Streaming Ingestion,
* and those tests will be removed in the future.
+ *
Review Comment:
Nit: extra newline
##########
cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java:
##########
@@ -56,6 +72,21 @@ public boolean isForceGlobalBucketAccessEnabled()
return forceGlobalBucketAccessEnabled;
}
+ public int getConnectionTimeout()
+ {
+ return connectionTimeout;
+ }
+
+ public int getSocketTimeout()
Review Comment:
Maybe rename these to `getXyzTimeoutMillis()` to avoid ambiguity about the unit.
Other millis-related configs typically use the type `long`, but I guess these
fields will never actually need to be that large.
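Here is a minimal sketch of the suggested rename. It assumes the fields remain `int` milliseconds. The class name `AwsClientTimeoutsSketch` and the two `Duration` helpers are hypothetical and do not come from the PR. The helpers only show where a unit-safe conversion could happen if a client builder expects a `java.time.Duration`.

```java
import java.time.Duration;

/**
 * Hypothetical sketch: the unit ("Millis") is part of each getter name,
 * so callers cannot mistake the values for seconds.
 */
final class AwsClientTimeoutsSketch
{
  // An int holds up to Integer.MAX_VALUE ms (about 24.8 days), which is ample for network timeouts.
  private final int connectionTimeoutMillis;
  private final int socketTimeoutMillis;

  AwsClientTimeoutsSketch(int connectionTimeoutMillis, int socketTimeoutMillis)
  {
    if (connectionTimeoutMillis < 0 || socketTimeoutMillis < 0) {
      throw new IllegalArgumentException("Timeouts must be non-negative");
    }
    this.connectionTimeoutMillis = connectionTimeoutMillis;
    this.socketTimeoutMillis = socketTimeoutMillis;
  }

  public int getConnectionTimeoutMillis()
  {
    return connectionTimeoutMillis;
  }

  public int getSocketTimeoutMillis()
  {
    return socketTimeoutMillis;
  }

  // Hypothetical helpers: convert once, where a Duration-based API needs the value.
  public Duration getConnectionTimeoutDuration()
  {
    return Duration.ofMillis(connectionTimeoutMillis);
  }

  public Duration getSocketTimeoutDuration()
  {
    return Duration.ofMillis(socketTimeoutMillis);
  }
}
```

Because the widening from `int` to `long` happens inside `Duration.ofMillis`, keeping the fields as `int` does not leak into callers.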
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -128,16 +129,18 @@ private Optional<InputStream> streamTaskFile(final long offset, String taskKey)
start = contentLength + offset;
}
- final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
- .withMatchingETagConstraint(ensureQuotated(objectMetadata.getETag()))
- .withRange(start, end);
+ GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
+ .bucket(config.getS3Bucket())
+ .key(taskKey)
+ .ifMatch(ensureQuotated(objectMetadata.eTag()))
+ .range(StringUtils.format("bytes=%d-%d", start, end));
Review Comment:
For the byte-range handling, I think it would be useful to write an `AwsBytesRange`
utility class that takes an optional start and end as inputs and exposes a method
`String getBytesRange()`. That would avoid duplicating the String formatting in
multiple places and make the code less error-prone.
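Here is a minimal sketch of the suggested helper. The class name and `getBytesRange()` come from the comment above. Everything else is an assumption:

- Both offsets are inclusive.
- A null start means "from byte 0".
- A null end means "to the last byte".

The output follows the HTTP `Range` header byte-range syntax, which is the string passed to `GetObjectRequest.Builder#range(String)` in the diff.

```java
/**
 * Hypothetical sketch of the AwsBytesRange helper suggested in the review.
 * Offsets are inclusive; a null start means 0, a null end means "to the end of the object".
 */
final class AwsBytesRange
{
  private final Long start;
  private final Long end;

  AwsBytesRange(Long start, Long end)
  {
    // Validate once here so that callers never build a malformed Range header.
    if ((start != null && start < 0) || (end != null && end < 0)) {
      throw new IllegalArgumentException("Byte offsets must be non-negative");
    }
    if (start != null && end != null && start > end) {
      throw new IllegalArgumentException("start[" + start + "] must not exceed end[" + end + "]");
    }
    this.start = start;
    this.end = end;
  }

  /** Returns the Range header value, e.g. "bytes=0-99" or the open-ended "bytes=100-". */
  String getBytesRange()
  {
    final long first = start == null ? 0L : start;
    return end == null ? "bytes=" + first + "-" : "bytes=" + first + "-" + end;
  }
}
```

With such a class, the call in `S3TaskLogs` could become `.range(new AwsBytesRange(start, end).getBytesRange())`. Suffix ranges (`bytes=-N`) are deliberately not covered, because that code already computes an absolute `start` from `contentLength + offset`.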
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/ITS3ToS3ParallelIndexTest.java:
##########
@@ -40,7 +39,11 @@ public void testS3IndexData(Pair<String, List<?>> s3InputSource) throws Exceptio
@MethodSource("resources")
public void testS3IndexData_withTempCredentials(Pair<String, List<?>> s3InputSource) throws Exception
{
- final S3InputSourceConfig inputSourceConfig = minIOStorageResource.createTempCredentialsForInputSource();
- doTest(s3InputSource, new Pair<>(false, false), "s3", inputSourceConfig);
+ doTestWithEndpointConfig(
Review Comment:
We should probably add a separate test method instead of updating this one.
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -65,6 +64,42 @@ public class S3Utils
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Logger log = new Logger(S3Utils.class);
+ /**
+ * A holder for S3Object with its associated bucket name.
+ * In AWS SDK v2, S3Object doesn't include the bucket name, so we need to track it separately.
+ */
+ public static class S3ObjectWithBucket
Review Comment:
This should be in a separate file and not inside the `S3Utils` class.
##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java:
##########
@@ -616,132 +559,49 @@ public void testSeekUnassigned() throws InterruptedException
recordSupplier.seekToEarliest(Collections.singleton(shard0));
}
-
- @Test
- public void testPollAfterSeek()
Review Comment:
Is this test not applicable anymore with v2?
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java:
##########
@@ -126,62 +127,72 @@ public String getEndpointUrl()
return getContainer().getS3URL();
}
- public AmazonS3 getS3Client()
+ public S3Client getS3Client()
{
ensureRunning();
return s3Client;
}
/**
- * Creates temporary S3 credentials using the {@link AssumeRoleRequest} that
- * can be used for S3 ingestion.
+ * Creates temporary S3 credentials using the AssumeRole STS API that can be
+ * used for S3 ingestion.
*
* @return S3InputSourceConfig with temporary credentials. The
* {@code assumeRoleArn} and {@code assumeRoleExternalId} fields are set to null
- * since Min IO does not support them.
+ * since MinIO does not support them.
*/
public S3InputSourceConfig createTempCredentialsForInputSource()
{
ensureRunning();
- final AWSSecurityTokenService stsClient = createSTSClient();
-
- // assumeRoleArn and assumeRoleExternalId need not be specified since MinIO ignores them
- final AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest()
- .withRoleSessionName("test-session");
+ final StsClient stsClient = createStsClient();
+
+ // SDK v2 requires a non-null roleArn. MinIO does not validate the ARN,
+ // but without an inline policy the resulting session may have no permissions.
+ // An explicit S3 full-access policy ensures the temp credentials work.
+ final String s3FullAccessPolicy =
+ "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"s3:*\"],\"Resource\":[\"*\"]}]}";
+ final AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
+ .roleArn("arn:aws:iam::000000000000:role/test-role")
+ .roleSessionName("test-session")
+ .policy(s3FullAccessPolicy)
+ .build();
- final AssumeRoleResult result = stsClient.assumeRole(assumeRoleRequest);
- final int statusCode = result.getSdkHttpMetadata().getHttpStatusCode();
- if (statusCode < 200 || statusCode >= 300) {
- throw new ISE("AssumeRole request failed with code[%s]: %s", statusCode, result.getAssumedRoleUser());
+ final AssumeRoleResponse result = stsClient.assumeRole(assumeRoleRequest);
+ if (!result.sdkHttpResponse().isSuccessful()) {
+ throw new ISE("AssumeRole request failed with code[%s]", result.sdkHttpResponse().statusCode());
}
- final Credentials credentials = stsClient.assumeRole(assumeRoleRequest).getCredentials();
+ final Credentials credentials = result.credentials();
return new S3InputSourceConfig(
- new DefaultPasswordProvider(credentials.getAccessKeyId()),
- new DefaultPasswordProvider(credentials.getSecretAccessKey()),
+ new DefaultPasswordProvider(credentials.accessKeyId()),
+ new DefaultPasswordProvider(credentials.secretAccessKey()),
null,
null,
- new DefaultPasswordProvider(credentials.getSessionToken())
+ new DefaultPasswordProvider(credentials.sessionToken())
);
}
- private AmazonS3 createS3Client()
+ private S3Client createS3Client()
{
- return AmazonS3Client
- .builder()
- .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(getEndpointUrl(), "us-east-1"))
- .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(getAccessKey(), getSecretKey())))
- .withPathStyleAccessEnabled(true)
+ return S3Client.builder()
Review Comment:
Nit: Please move `.builder()` to the next line in all such places. This
currently does not adhere to the Druid code style.
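Here is a small illustration of the requested layout, mirroring the removed v1 code (`AmazonS3Client` on one line, `.builder()` on the next). The JDK's `java.net.http.HttpClient` stands in for `S3Client` here only so the snippet runs without the AWS SDK on the classpath. The class and method names are hypothetical.

```java
import java.net.http.HttpClient;
import java.time.Duration;

final class BuilderStyleExample
{
  static HttpClient createClient()
  {
    // The type name stays on the first line and the factory call starts the chain,
    // so every builder step sits on its own line at the same indentation.
    return HttpClient
        .newBuilder()
        .connectTimeout(Duration.ofSeconds(10))
        .followRedirects(HttpClient.Redirect.NORMAL)
        .build();
  }
}
```

For the SDK v2 clients in this PR, the same shape would read `S3Client` on one line followed by `.builder()` and the remaining calls, one per line.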
##########
extensions-core/druid-aws-rds-extensions/src/main/java/org/apache/druid/aws/rds/AWSRDSTokenPasswordProvider.java:
##########
@@ -98,18 +99,16 @@ public String getRegion()
public String getPassword()
{
try {
- RdsIamAuthTokenGenerator generator = RdsIamAuthTokenGenerator
- .builder()
- .credentials(awsCredentialsProvider)
- .region(region)
+ RdsUtilities rdsUtilities = RdsUtilities.builder()
Review Comment:
Nit: move `.builder()` to the next line to avoid unnecessary reformatting later.
##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java:
##########
@@ -140,37 +138,34 @@ private static ByteBuffer jb(String timestamp, String dim1, String dim2, String
throw new RuntimeException(e);
}
}
- private static AmazonKinesis kinesis;
- private static ListShardsResult listShardsResult0;
- private static ListShardsResult listShardsResult1;
- private static GetShardIteratorResult getShardIteratorResult0;
- private static GetShardIteratorResult getShardIteratorResult1;
- private static DescribeStreamResult describeStreamResult0;
- private static DescribeStreamResult describeStreamResult1;
- private static StreamDescription streamDescription0;
- private static StreamDescription streamDescription1;
- private static GetRecordsResult getRecordsResult0;
- private static GetRecordsResult getRecordsResult1;
- private static Shard shard0;
- private static Shard shard1;
- private static KinesisRecordSupplier recordSupplier;
+
+ private static Record buildV2Record(ByteBuffer data, String sequenceNumber)
+ {
+ return Record.builder()
+ .data(SdkBytes.fromByteBuffer(data.duplicate()))
+ .sequenceNumber(sequenceNumber)
+ .partitionKey("key")
+ .approximateArrivalTimestamp(Instant.now())
+ .build();
+ }
+
+ private static KinesisClientRecord toKinesisClientRecord(Record v2Record)
Review Comment:
Nit: Calling the record `v2Record` might be confusing since we are no longer
using v1 anywhere (are we?).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.