jfz opened a new pull request, #5282:
URL: https://github.com/apache/iceberg/pull/5282

   This PR fixes [issue #2796](https://github.com/apache/iceberg/issues/2796). 
It avoids the invalid mark reset on retries by passing an input stream factory 
to the S3 API instead of the original input stream.
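
The idea of handing the client a factory rather than a single stream can be sketched in plain Java. This is only an illustration, not the code in this PR: `StreamFactoryRetryDemo`, `upload`, and the simulated failure are hypothetical, and a `Supplier<InputStream>` stands in for whatever factory type the S3 API accepts. Each attempt opens a fresh stream, so no retry depends on `mark`/`reset`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

final class StreamFactoryRetryDemo {

  /**
   * Hypothetical stand-in for an upload with retries. Every attempt asks the
   * factory for a new stream and reads it from the start. Attempts before
   * succeedOnAttempt are treated as failed (for example a simulated 503).
   * Returns the number of bytes read by the successful attempt.
   */
  static int uploadWithRetries(
      Supplier<InputStream> streamFactory, int succeedOnAttempt, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try (InputStream in = streamFactory.get()) {
        byte[] chunk = new byte[8192];
        int total = 0;
        int n;
        while ((n = in.read(chunk)) != -1) {
          total += n;
        }
        if (attempt >= succeedOnAttempt) {
          return total;
        }
        // Simulated failure: fall through and retry with a fresh stream.
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    throw new IllegalStateException("all attempts failed");
  }

  /** Convenience wrapper: the factory re-creates the stream from the same bytes. */
  static int upload(byte[] data, int succeedOnAttempt, int maxAttempts) {
    return uploadWithRetries(() -> new ByteArrayInputStream(data), succeedOnAttempt, maxAttempts);
  }

  public static void main(String[] args) {
    // First attempt "fails" after reading everything; the second one succeeds.
    System.out.println(upload(new byte[129 * 1024], 2, 3));
  }
}
```

Because the second attempt starts from a newly created stream, the amount of data read by the failed first attempt no longer matters.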
   
   Root cause:
   
   > Currently, the underlying InputStream created from the staging files is 
passed to the S3 RequestBody and reused by the S3 client for every retry 
attempt. The S3 client calls `mark` on the underlying input stream once, at the 
beginning, with a hard-coded `read limit` of 128K, and calls `reset` on every 
retry. If the first attempt fails after reading more than 128K, the `mark` is 
invalidated because more than `read limit` bytes have been read. When the 
second attempt then tries to `reset` to the invalidated mark, it breaks the 
mark/reset contract and throws an exception with "Resetting to invalid mark". 
See the Javadoc of `InputStream` for details of the mark/reset contract.
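
The mark/reset failure described above can be reproduced without S3 at all. The following is a minimal sketch, assuming a `java.io.BufferedInputStream` as the marked stream; `MarkResetDemo` and `resetAfterReading` are hypothetical names, and the code is not taken from the AWS SDK or from Iceberg. It marks the stream with a 128K read limit, reads a given number of bytes as a failed first attempt would, and then calls `reset` as a retry would.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class MarkResetDemo {
  // Same value as the 128K read limit mentioned in the root cause.
  static final int READ_LIMIT = 128 * 1024;

  /**
   * Marks a buffered stream, reads bytesToRead bytes, then resets it.
   * Returns "ok" if reset succeeds, otherwise the IOException message.
   */
  static String resetAfterReading(int bytesToRead) {
    byte[] data = new byte[bytesToRead];
    try (InputStream in = new BufferedInputStream(new ByteArrayInputStream(data))) {
      in.mark(READ_LIMIT);
      byte[] chunk = new byte[8192];
      int remaining = bytesToRead;
      while (remaining > 0) {
        int n = in.read(chunk, 0, Math.min(chunk.length, remaining));
        if (n < 0) {
          break;
        }
        remaining -= n;
      }
      in.reset(); // what a retry would do before re-sending the body
      return "ok";
    } catch (IOException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(resetAfterReading(64 * 1024));  // prints "ok"
    System.out.println(resetAfterReading(129 * 1024)); // prints "Resetting to invalid mark"
  }
}
```

Reading 129K, the same size used by `testInvalidMark` in step 2, is just past the read limit, which is why that payload size triggers the failure on the first retry.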
   
   Steps to reproduce:
   
   1. Get the code of [s3mock](https://github.com/adobe/S3Mock) and run the 
server with the change below so that it fails the first request with a 503 but 
succeeds on subsequent requests.
   
   > FileStoreController.putObject:
   > +      LOG.info(" ######## " + count.incrementAndGet());
   > +      if (count.get() < 2) {
   > +        return ResponseEntity.status(503).build();
   > +      }
   >        return ResponseEntity
   >            .ok()
   >            .eTag("\"" + s3Object.getEtag() + "\"")
   > @@ -966,6 +970,7 @@ public class FileStoreController {
   >            "Error persisting object.");
   >      }
   >    }
   > +  private static AtomicInteger count = new AtomicInteger(0);
   
   2. Update the unit test `TestS3OutputStream` as below and run 
`testInvalidMark`:
   
   > -  @ClassRule
   > -  public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
   > -
   > -  private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
   > +//  @ClassRule
   > +//  public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
   > +//
   > +//  private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
   > +  private final S3Client s3 = S3Client.builder()
   > +      .overrideConfiguration(ClientOverrideConfiguration.builder().retryPolicy(RetryPolicy.builder().numRetries(3).build()).build())
   > +      .region(Region.of("us-east-1"))
   > +      .credentialsProvider(
   > +              StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar")))
   > +      .endpointOverride(URI.create("http://localhost:" + 9090))
   > +      .httpClient(UrlConnectionHttpClient.builder().buildWithDefaults(AttributeMap.builder().build()))
   > +      .build();
   > +  @Test
   > +  public void testInvalidMark() {
   > +    byte[] data = randomData(129 * 1024);
   > +    writeAndVerify(s3mock, randomURI(), data, false);
   > +    ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor =
   > +            ArgumentCaptor.forClass(PutObjectRequest.class);
   > +    verify(s3mock, times(1)).putObject(putObjectRequestArgumentCaptor.capture(),
   > +            (RequestBody) any());
   > +    checkPutObjectRequestContent(data, putObjectRequestArgumentCaptor);
   > +    checkTags(putObjectRequestArgumentCaptor);
   > +    reset(s3mock);
   > +  }
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

