jfz opened a new pull request, #5282: URL: https://github.com/apache/iceberg/pull/5282
This PR fixes [issue #2796](https://github.com/apache/iceberg/issues/2796). It avoids an invalid mark reset on retries by passing an input stream factory, rather than the original input stream, to the S3 API.

Root cause:

> Currently, the underlying InputStream created from the staging files is passed to the S3 `RequestBody` and is reused by the S3 client for every retry attempt. The S3 client calls `mark` on the underlying input stream once at the beginning, with a hard-coded `read limit` of 128K, and calls `reset` on every retry. If the first attempt fails after reading more than 128K, the mark is invalidated because more than `read limit` bytes have been read. When the second attempt then tries to `reset` to the invalidated mark, it breaks the mark/reset contract and throws an exception with "Resetting to invalid mark". See the Javadoc of `InputStream` for details of the mark/reset contract.

Steps to reproduce:

1. Get the code of [s3mock](https://github.com/adobe/S3Mock) and run the server with the change below, so that it fails the first request with 503 but succeeds on subsequent requests.

> FileStoreController.putObject:
> \+ LOG.info(" ######## " + count.incrementAndGet());
> \+ if(count.get() < 2) {
> \+ return ResponseEntity.status(503).build();
> \+ }
> return ResponseEntity
> .ok()
> .eTag("\"" + s3Object.getEtag() + "\"")
> @@ -966,6 +970,7 @@ public class FileStoreController {
> "Error persisting object.");
> }
> }
> \+ private static AtomicInteger count = new AtomicInteger(0);

2. Update unit test `TestS3OutputStream` as below and run `testInvalidMark`:

> \- @ClassRule
> \- public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
> \-
> \- private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
> \+// @ClassRule
> \+// public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
> \+//
> \+// private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
> \+ private final S3Client s3 = S3Client.builder()
> \+ .overrideConfiguration(ClientOverrideConfiguration.builder().retryPolicy(RetryPolicy.builder().numRetries(3).build()).build())
> \+ .region(Region.of("us\-east\-1"))
> \+ .credentialsProvider(
> \+ StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar")))
> \+ .endpointOverride(URI.create("http://localhost:" \+ 9090))
> \+ .httpClient(UrlConnectionHttpClient.builder().buildWithDefaults(AttributeMap.builder().build()))
> \+ .build();
> \+ @ Test()
> \+ public void testInvalidMark() {
> \+ byte[] data = randomData(129 * 1024);
> \+ writeAndVerify(s3mock, randomURI(), data, false);
> \+ ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor =
> \+ ArgumentCaptor.forClass(PutObjectRequest.class);
> \+ verify(s3mock, times(1)).putObject(putObjectRequestArgumentCaptor.capture(),
> \+ (RequestBody) any());
> \+ checkPutObjectRequestContent(data, putObjectRequestArgumentCaptor);
> \+ checkTags(putObjectRequestArgumentCaptor);
> \+ reset(s3mock);
> \+ }
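
For readers who want to see the mark/reset failure without S3 or S3Mock, here is a minimal, self-contained sketch using only the JDK. It is not the code from this PR. `MarkResetSketch`, `retryWithSharedStream`, `retryWithStreamFactory`, and the 16-byte `READ_LIMIT` (a small stand-in for the 128K limit) are hypothetical names chosen for illustration, and it assumes the S3 client's buffering behaves like a plain `BufferedInputStream`. The second method sketches the idea behind the fix: each attempt obtains a fresh stream from a factory, so no `reset` is needed.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class MarkResetSketch {
    // Hypothetical stand-in for the 128K read limit, kept small for the demo.
    static final int READ_LIMIT = 16;

    /**
     * Marks one shared stream, reads bytesToRead bytes (the "failed first attempt"),
     * then resets as a retry would. Returns "ok" or the exception message.
     */
    static String retryWithSharedStream(int bytesToRead) {
        byte[] data = new byte[bytesToRead + 1];
        try (InputStream in =
                 new BufferedInputStream(new ByteArrayInputStream(data), READ_LIMIT)) {
            in.mark(READ_LIMIT);
            for (int i = 0; i < bytesToRead; i++) {
                in.read();
            }
            try {
                in.reset(); // fails if the mark was invalidated
                return "ok";
            } catch (IOException e) {
                return e.getMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Uses a fresh stream per attempt instead of mark/reset.
     * Returns the number of bytes the second attempt was able to read.
     */
    static int retryWithStreamFactory(Supplier<InputStream> factory, int bytesReadBeforeFailure) {
        try {
            // First attempt: read some bytes, then pretend the request failed.
            try (InputStream first = factory.get()) {
                for (int i = 0; i < bytesReadBeforeFailure; i++) {
                    first.read();
                }
            }
            // Second attempt: a new stream starts from the beginning.
            try (InputStream second = factory.get()) {
                int count = 0;
                while (second.read() != -1) {
                    count++;
                }
                return count;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(retryWithSharedStream(READ_LIMIT));      // within the read limit
        System.out.println(retryWithSharedStream(READ_LIMIT + 1));  // past the read limit
        byte[] payload = new byte[4 * READ_LIMIT];
        System.out.println(
            retryWithStreamFactory(() -> new ByteArrayInputStream(payload), READ_LIMIT + 1));
    }
}
```

In this sketch, reading one byte past the read limit is enough to invalidate the mark on the shared stream, while the factory-based retry always sees the full payload regardless of how far the first attempt got.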
