singhpk234 commented on code in PR #4912:
URL: https://github.com/apache/iceberg/pull/4912#discussion_r885873002
##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -88,23 +97,68 @@ public void seek(long newPos) {
@Override
public int read() throws IOException {
- Preconditions.checkState(!closed, "Cannot read: already closed");
- positionStream();
+ AtomicInteger byteRef = new AtomicInteger(0);
Review Comment:
[question] Do we need an AtomicInteger here? The requirement arises because the
value is mutated inside the lambda passed to the retry Task below, but we could
also use `final int[] byteRef = {0}`. Does that seem reasonable, considering
access is single-threaded?
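To illustrate the two options discussed above, here is a standalone sketch (not the PR's code; names are illustrative) showing that both an `AtomicInteger` and a single-element `final` array let a lambda mutate a value captured from the enclosing method:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LambdaMutationSketch {
  public static void main(String[] args) {
    // Option 1: AtomicInteger (thread-safe, slightly heavier).
    AtomicInteger atomicRef = new AtomicInteger(0);
    Supplier<Integer> readWithAtomic = () -> {
      atomicRef.set(42); // simulate storing the byte read inside the lambda
      return atomicRef.get();
    };

    // Option 2: single-element array, as suggested in the review comment.
    // Adequate when access is single-threaded.
    final int[] byteRef = {0};
    Supplier<Integer> readWithArray = () -> {
      byteRef[0] = 42;
      return byteRef[0];
    };

    System.out.println(readWithAtomic.get()); // 42
    System.out.println(readWithArray.get());  // 42
  }
}
```

Both compile because the lambda captures an effectively-final reference while mutating the object (or array cell) it points to.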
##########
aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java:
##########
@@ -353,6 +353,30 @@ public class AwsProperties implements Serializable {
@Deprecated
public static final boolean CLIENT_ENABLE_ETAG_CHECK_DEFAULT = false;
+ /**
+ * Number of times to retry S3 read operation.
+ */
+ public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
+ public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;
Review Comment:
+1, would be really nice to know.
I think the S3A constants are different: [Code
Pointer](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L122-L129)
1. retry limit: 7
2. retry interval: 500ms
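For context on how a retry limit and a fixed interval (like the S3A defaults noted above) would drive a retried read, here is a minimal generic sketch. The helper name, parameters, and values are assumptions for illustration, not the PR's implementation:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetrySketch {
  // Retry op up to maxRetries times, sleeping intervalMs between attempts.
  static <T> T withRetries(Callable<T> op, int maxRetries, long intervalMs)
      throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return op.call();
      } catch (IOException e) {
        last = e; // transient failure: remember it and retry after the interval
        Thread.sleep(intervalMs);
      }
    }
    throw last; // retries exhausted
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Fails twice, then succeeds; with maxRetries=6 this returns normally.
    int result = withRetries(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient read failure");
      }
      return calls[0];
    }, 6, 1L);
    System.out.println(result); // 3
  }
}
```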
##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
}
private void openStream() throws IOException {
- GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
- .bucket(location.bucket())
- .key(location.key())
- .range(String.format("bytes=%s-", pos));
-
- S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
closeStream();
- stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+ stream = readRange(String.format("bytes=%s-", pos));
}
private void closeStream() throws IOException {
Review Comment:
[minor] Now that we catch the IOException inside `closeServerSideStream`,
`closeStream` no longer actually throws IOException, so we can remove it from
the signatures of `closeStream` / `openStream`.
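To illustrate the suggestion, a minimal standalone sketch (not the PR's code) of the resulting shape, where the helper swallows the IOException so the caller's signature carries no checked exception:

```java
import java.io.IOException;
import java.io.InputStream;

public class CloseSketch {
  private InputStream stream;

  // No "throws IOException" needed: the helper below handles it internally.
  void closeStream() {
    closeServerSideStream(stream);
    stream = null;
  }

  private static void closeServerSideStream(InputStream streamToClose) {
    if (streamToClose != null) {
      try {
        streamToClose.close();
      } catch (IOException e) {
        // ignore failure to close stream, as in the PR
      }
    }
  }

  public static void main(String[] args) {
    CloseSketch s = new CloseSketch();
    s.closeStream(); // safe on a null stream, no checked exception to handle
    System.out.println("closed without checked exception");
  }
}
```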
##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java:
##########
@@ -178,25 +281,56 @@ private void positionStream() throws IOException {
}
private void openStream() throws IOException {
- GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
- .bucket(location.bucket())
- .key(location.key())
- .range(String.format("bytes=%s-", pos));
-
- S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
-
closeStream();
- stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+ stream = readRange(String.format("bytes=%s-", pos));
}
private void closeStream() throws IOException {
- if (stream != null) {
- stream.close();
+ closeServerSideStream(stream);
+ stream = null;
+ }
+
+ private static void closeServerSideStream(InputStream streamToClose) {
+ if (streamToClose != null) {
+ try {
+ if (streamToClose instanceof ResponseInputStream) {
+ // Stated in the ResponseInputStream javadoc:
+ // If it is not desired to read remaining data from the stream,
+ // you can explicitly abort the connection via abort().
+ ((ResponseInputStream<?>) streamToClose).abort();
+ } else {
+ streamToClose.close();
+ }
+ } catch (IOException | AbortedException e) {
+ // ignore failure to abort or close stream
+ }
}
}
- public void setSkipSize(int skipSize) {
- this.skipSize = skipSize;
+ private static boolean shouldRetry(Exception exception) {
+ if (exception instanceof UncheckedIOException) {
+ if (exception.getCause() instanceof EOFException) {
+ return false;
+ }
+ }
+
+ if (exception instanceof AwsServiceException) {
+ switch (((AwsServiceException) exception).statusCode()) {
+ case HttpURLConnection.HTTP_FORBIDDEN:
+ case HttpURLConnection.HTTP_BAD_REQUEST:
+ return false;
+ }
+ }
+
+ if (exception instanceof S3Exception) {
+ switch (((S3Exception) exception).statusCode()) {
+ case HttpURLConnection.HTTP_NOT_FOUND:
+ case 416: // range not satisfied
+ return false;
+ }
+ }
+
+ return true;
Review Comment:
Should we move this into a separate util method? It could then be reused
across all S3 interactions.
Also, any pointers on whether this is the complete list of non-retryable
cases, considering the APIs we use to connect to S3? For example (sample list:
[S3A](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L177-L231))
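A sketch of what that shared util could look like. Since the AWS SDK types (`AwsServiceException`, `S3Exception`) are not available here, a plain status-code parameter stands in for their `statusCode()` accessors; the class and method names are assumptions:

```java
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;

public class S3RetryUtil {
  // Mirrors the decision logic from the diff above: EOF means the range is
  // exhausted, and the listed HTTP statuses are treated as non-retryable.
  static boolean shouldRetry(Exception exception, int statusCode) {
    if (exception instanceof UncheckedIOException
        && exception.getCause() instanceof EOFException) {
      return false;
    }
    switch (statusCode) {
      case HttpURLConnection.HTTP_FORBIDDEN:   // 403
      case HttpURLConnection.HTTP_BAD_REQUEST: // 400
      case HttpURLConnection.HTTP_NOT_FOUND:   // 404
      case 416:                                // Range Not Satisfiable
        return false;
      default:
        return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(shouldRetry(new RuntimeException("throttled"), 503)); // true
    System.out.println(shouldRetry(new RuntimeException("denied"), 403));    // false
    System.out.println(shouldRetry(
        new UncheckedIOException(new EOFException()), 200));                 // false
  }
}
```

Centralizing this would keep the retryable/non-retryable classification consistent if other S3 call sites adopt the same retry loop.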
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]