This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 26.0.0 in repository https://gitbox.apache.org/repos/asf/druid.git
commit 822e18040d3c65e56d53fcf956afecbe3132156e Author: Laksh Singla <[email protected]> AuthorDate: Tue Apr 18 14:25:20 2023 +0530 Remove maxResultsSize config property from S3OutputConfig (#14101) * "maxResultsSize" has been removed from the S3OutputConfig and a default "chunkSize" of 100MiB is now present. This change primarily affects users who wish to use durable storage for MSQ jobs. (cherry picked from commit 8eb854c845f8eb2d272e3f2c6046e5a146118aa1) --- docs/multi-stage-query/reference.md | 8 +- .../storage/s3/output/RetryableS3OutputStream.java | 4 - .../druid/storage/s3/output/S3OutputConfig.java | 61 ++------------- .../s3/output/S3StorageConnectorProvider.java | 3 +- .../s3/output/RetryableS3OutputStreamTest.java | 44 ----------- .../storage/s3/output/S3OutputConfigTest.java | 86 ---------------------- .../druid/storage/s3/output/S3OutputSerdeTest.java | 7 -- .../storage/s3/output/S3StorageConnectorTest.java | 1 - website/.spelling | 1 + 9 files changed, 13 insertions(+), 202 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index de75eef071..d2f8db55c1 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -365,12 +365,12 @@ The following common service properties control how durable storage behaves: |Parameter |Default | Description | |-------------------|----------------------------------------|----------------------| |`druid.msq.intermediate.storage.bucket` | n/a | The bucket in S3 where you want to store intermediate files. | -| `druid.msq.intermediate.storage.chunkSize` | n/a | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. Druid computes the chunk size automatically if no value is provided.| +|`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. 
The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the number of API calls made to the durable storage, but it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| |`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable durable storage for the cluster.| -| `druid.msq.intermediate.storage.maxTriesOnTransientErrors` | 10 | Optional. Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | -|`druid.msq.intermediate.storage.type` | `s3` if your deep storage is S3 | Required. The type of storage to use. You can either set this to `local` or `s3`. | +|`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the maximum number of times to attempt S3 API calls to avoid failures due to transient errors. | |`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store intermediate stage results. Provide a unique value for the prefix. Don't share the same prefix between clusters. If the location includes other files or directories, then they will get cleaned up as well. | -| `druid.msq.intermediate.storage.tempDir`| | Required. Directory path on the local disk to temporarily store intermediate stage results. | +|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to temporarily store intermediate stage results. | +|`druid.msq.intermediate.storage.type` | `s3` if your deep storage is S3 | Required. The type of storage to use. You can either set this to `local` or `s3`.
| In addition to the common service properties, there are certain properties that you configure on the Overlord specifically to clean up intermediate files: diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java index ba3d461bee..82f5cd812c 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java @@ -33,7 +33,6 @@ import com.google.common.base.Stopwatch; import com.google.common.io.CountingOutputStream; import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; @@ -203,9 +202,6 @@ public class RetryableS3OutputStream extends OutputStream try { if (chunk.length() > 0) { resultsSize += chunk.length(); - if (resultsSize > config.getMaxResultsSize()) { - throw new IOE("Exceeded max results size [%s]", config.getMaxResultsSize()); - } pushStopwatch.start(); pushResults.add(push(chunk)); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java index 922b422a33..35e228f7ef 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java @@ -34,9 +34,6 @@ public class S3OutputConfig { public static final long S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES = 5L * 1024 * 1024; 
public static final long S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES = 5L * 1024 * 1024 * 1024L; - private static final int S3_MULTIPART_UPLOAD_MAX_NUM_PARTS = 10_000; - public static final long S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES = 5L * 1024 * 1024; - public static final long S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES = 5L * 1024 * 1024 * 1024 * 1024; @JsonProperty private String bucket; @@ -48,10 +45,7 @@ public class S3OutputConfig @Nullable @JsonProperty - private HumanReadableBytes chunkSize; - - @JsonProperty - private HumanReadableBytes maxResultsSize = new HumanReadableBytes("100MiB"); + private HumanReadableBytes chunkSize = new HumanReadableBytes("100MiB"); /** * Max number of tries for each upload. @@ -65,11 +59,10 @@ public class S3OutputConfig @JsonProperty(value = "prefix", required = true) String prefix, @JsonProperty(value = "tempDir", required = true) File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, - @JsonProperty("maxResultsSize") HumanReadableBytes maxResultsSize, @JsonProperty("maxRetry") Integer maxRetry ) { - this(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry, true); + this(bucket, prefix, tempDir, chunkSize, maxRetry, true); } @VisibleForTesting @@ -80,8 +73,6 @@ public class S3OutputConfig @Nullable HumanReadableBytes chunkSize, @Nullable - HumanReadableBytes maxResultsSize, - @Nullable Integer maxRetry, boolean validation ) @@ -92,9 +83,6 @@ public class S3OutputConfig if (chunkSize != null) { this.chunkSize = chunkSize; } - if (maxResultsSize != null) { - this.maxResultsSize = maxResultsSize; - } if (maxRetry != null) { this.maxRetry = maxRetry; } @@ -116,21 +104,9 @@ public class S3OutputConfig ); } - // check result size which relies on the s3 multipart upload limits. - // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html for more details. 
- if (maxResultsSize.getBytes() < S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES - || maxResultsSize.getBytes() > S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES) { - throw new IAE( - "maxResultsSize[%d] should be >= [%d] and <= [%d] bytes", - maxResultsSize.getBytes(), - S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES, - S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES - ); - } - //check results size and chunk size are compatible. if (chunkSize != null) { - validateChunkSize(maxResultsSize.getBytes(), chunkSize.getBytes()); + validateChunkSize(chunkSize.getBytes()); } } @@ -151,12 +127,7 @@ public class S3OutputConfig public Long getChunkSize() { - return chunkSize == null ? computeMinChunkSize(getMaxResultsSize()) : chunkSize.getBytes(); - } - - public long getMaxResultsSize() - { - return maxResultsSize.getBytes(); + return chunkSize.getBytes(); } public int getMaxRetry() @@ -164,25 +135,8 @@ public class S3OutputConfig return maxRetry; } - - public static long computeMinChunkSize(long maxResultsSize) - { - return Math.max( - (long) Math.ceil(maxResultsSize / (double) S3_MULTIPART_UPLOAD_MAX_NUM_PARTS), - S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES - ); - } - - private static void validateChunkSize(long maxResultsSize, long chunkSize) + private static void validateChunkSize(long chunkSize) { - if (S3OutputConfig.computeMinChunkSize(maxResultsSize) > chunkSize) { - throw new IAE( - "chunkSize[%d] is too small for maxResultsSize[%d]. 
chunkSize should be at least [%d]", - chunkSize, - maxResultsSize, - S3OutputConfig.computeMinChunkSize(maxResultsSize) - ); - } if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) { throw new IAE( "chunkSize[%d] should be smaller than [%d]", @@ -206,14 +160,13 @@ public class S3OutputConfig && bucket.equals(that.bucket) && prefix.equals(that.prefix) && tempDir.equals(that.tempDir) - && Objects.equals(chunkSize, that.chunkSize) - && maxResultsSize.equals(that.maxResultsSize); + && Objects.equals(chunkSize, that.chunkSize); } @Override public int hashCode() { - return Objects.hash(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry); + return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry); } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index 14eeeb192a..7f4b43a0ed 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -44,11 +44,10 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag @JsonProperty(value = "prefix", required = true) String prefix, @JsonProperty(value = "tempDir", required = true) File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, - @JsonProperty("maxResultsSize") HumanReadableBytes maxResultsSize, @JsonProperty("maxRetry") Integer maxRetry ) { - super(bucket, prefix, tempDir, chunkSize, maxResultsSize, maxRetry); + super(bucket, prefix, tempDir, chunkSize, maxRetry); } @Override diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index f9068b2a1d..1f8eac3bba 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -65,7 +65,6 @@ public class RetryableS3OutputStreamTest private S3OutputConfig config; - private long maxResultsSize; private long chunkSize; @Before @@ -78,7 +77,6 @@ public class RetryableS3OutputStreamTest "TEST", tempDir, HumanReadableBytes.valueOf(chunkSize), - HumanReadableBytes.valueOf(maxResultsSize), 2, false ) @@ -95,12 +93,6 @@ public class RetryableS3OutputStreamTest return chunkSize; } - @Override - public long getMaxResultsSize() - { - return maxResultsSize; - } - @Override public int getMaxRetry() { @@ -112,7 +104,6 @@ public class RetryableS3OutputStreamTest @Test public void testWriteAndHappy() throws IOException { - maxResultsSize = 1000; chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); try (RetryableS3OutputStream out = new RetryableS3OutputStream( @@ -135,7 +126,6 @@ public class RetryableS3OutputStreamTest @Test public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws IOException { - maxResultsSize = 1000; chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES * 3); try (RetryableS3OutputStream out = new RetryableS3OutputStream( @@ -158,7 +148,6 @@ public class RetryableS3OutputStreamTest @Test public void testWriteSmallBufferShouldSucceed() throws IOException { - maxResultsSize = 1000; chunkSize = 128; try (RetryableS3OutputStream out = new RetryableS3OutputStream( config, @@ -175,43 +164,11 @@ public class RetryableS3OutputStreamTest s3.assertCompleted(chunkSize, 600); } - @Test - public void testHitResultsSizeLimit() throws IOException - { - maxResultsSize = 50; - ByteBuffer bb = 
ByteBuffer.allocate(Integer.BYTES); - try (RetryableS3OutputStream out = new RetryableS3OutputStream( - config, - s3, - path, - false - )) { - for (int i = 0; i < 14; i++) { - bb.clear(); - bb.putInt(i); - out.write(bb.array()); - } - - Assert.assertThrows( - "Exceeded max results size [50]", - IOException.class, - () -> { - bb.clear(); - bb.putInt(14); - out.write(bb.array()); - } - ); - } - - s3.assertCancelled(); - } - @Test public void testSuccessToUploadAfterRetry() throws IOException { final TestAmazonS3 s3 = new TestAmazonS3(1); - maxResultsSize = 1000; chunkSize = 10; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); try (RetryableS3OutputStream out = new RetryableS3OutputStream( @@ -236,7 +193,6 @@ public class RetryableS3OutputStreamTest { final TestAmazonS3 s3 = new TestAmazonS3(3); - maxResultsSize = 1000; ByteBuffer bb = ByteBuffer.allocate(Integer.BYTES); try (RetryableS3OutputStream out = new RetryableS3OutputStream( config, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputConfigTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputConfigTest.java index 3e4c864ee3..94d879a2af 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputConfigTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputConfigTest.java @@ -20,7 +20,6 @@ package org.apache.druid.storage.s3.output; import org.apache.druid.java.util.common.HumanReadableBytes; -import org.apache.druid.java.util.common.IAE; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -38,53 +37,9 @@ public class S3OutputConfigTest private static String PREFIX = "PREFIX"; private static int MAX_RETRY_COUNT = 0; - - @Test - public void testTooSmallChunkSize() throws IOException - { - long maxResultsSize = 100_000_000_000L; - long chunkSize = 9000_000L; - - 
expectedException.expect(IAE.class); - expectedException.expectMessage( - "chunkSize[9000000] is too small for maxResultsSize[100000000000]. chunkSize should be at least [10000000]" - ); - new S3OutputConfig( - BUCKET, - PREFIX, - temporaryFolder.newFolder(), - HumanReadableBytes.valueOf(chunkSize), - HumanReadableBytes.valueOf(maxResultsSize), - MAX_RETRY_COUNT, - true - ); - } - - @Test - public void testTooSmallChunkSizeMaxResultsSizeIsNotRetionalToMaxPartNum() throws IOException - { - long maxResultsSize = 274_877_906_944L; - long chunkSize = 2_7487_790; - - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "chunkSize[27487790] is too small for maxResultsSize[274877906944]. chunkSize should be at least [27487791]" - ); - new S3OutputConfig( - BUCKET, - PREFIX, - temporaryFolder.newFolder(), - HumanReadableBytes.valueOf(chunkSize), - HumanReadableBytes.valueOf(maxResultsSize), - MAX_RETRY_COUNT, - true - ); - } - @Test public void testTooLargeChunkSize() throws IOException { - long maxResultsSize = 1024L * 1024 * 1024 * 1024; long chunkSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES + 1; expectedException.expect(IllegalArgumentException.class); @@ -96,49 +51,8 @@ public class S3OutputConfigTest PREFIX, temporaryFolder.newFolder(), HumanReadableBytes.valueOf(chunkSize), - HumanReadableBytes.valueOf(maxResultsSize), - MAX_RETRY_COUNT, - true - ); - } - - @Test - public void testResultsTooLarge() throws IOException - { - long maxResultsSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_OBJECT_SIZE_BYTES + 1; - - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "maxResultsSize[5497558138881] should be >= " - ); - new S3OutputConfig( - BUCKET, - PREFIX, - temporaryFolder.newFolder(), - null, - HumanReadableBytes.valueOf(maxResultsSize), MAX_RETRY_COUNT, true ); } - - @Test - public void testResultsTooSmall() throws IOException - { - long maxResultsSize = 
S3OutputConfig.S3_MULTIPART_UPLOAD_MIN_OBJECT_SIZE_BYTES - 1; - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "maxResultsSize[5242879] should be >= " - ); - new S3OutputConfig( - BUCKET, - PREFIX, - temporaryFolder.newFolder(), - null, - HumanReadableBytes.valueOf(maxResultsSize), - MAX_RETRY_COUNT, - true - ); - } - } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java index c9465d1a36..72ea888615 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java @@ -50,7 +50,6 @@ public class S3OutputSerdeTest + " \"prefix\": \"abc\",\n" + " \"tempDir\": \"/tmp\",\n" + " \"chunkSize\":104857600,\n" - + " \"maxResultsSize\":209715200,\n" + " \"maxRetry\": 2\n" + "}\n"); @@ -59,7 +58,6 @@ public class S3OutputSerdeTest "abc", new File("/tmp"), HumanReadableBytes.valueOf(HumanReadableBytes.parse("100Mib")), - HumanReadableBytes.valueOf(HumanReadableBytes.parse("200Mib")), 2 ); @@ -78,7 +76,6 @@ public class S3OutputSerdeTest + " \"bucket\": \"TEST\",\n" + " \"tempDir\": \"/tmp\",\n" + " \"chunkSize\":104857600,\n" - + " \"maxResultsSize\":209715200,\n" + " \"maxRetry\": 2\n" + "}\n"); expectedException.expect(MismatchedInputException.class); @@ -93,7 +90,6 @@ public class S3OutputSerdeTest + " \"prefix\": \"abc\",\n" + " \"tempDir\": \"/tmp\",\n" + " \"chunkSize\":104857600,\n" - + " \"maxResultsSize\":209715200,\n" + " \"maxRetry\": 2\n" + "}\n"); expectedException.expect(MismatchedInputException.class); @@ -108,7 +104,6 @@ public class S3OutputSerdeTest + " \"prefix\": \"abc\",\n" + " \"bucket\": \"TEST\",\n" + " \"chunkSize\":104857600,\n" - + " \"maxResultsSize\":209715200,\n" + " \"maxRetry\": 
2\n" + "}\n"); expectedException.expect(MismatchedInputException.class); @@ -130,7 +125,6 @@ public class S3OutputSerdeTest "abc", new File("/tmp"), null, - null, null ); Assert.assertEquals(s3OutputConfig, MAPPER.readValue(json, S3OutputConfig.class)); @@ -146,7 +140,6 @@ public class S3OutputSerdeTest + " \"bucket\": \"TEST\",\n" + " \"tempDir\": \"/tmp\",\n" + " \"chunkSize\":104,\n" - + " \"maxResultsSize\":209715200,\n" + " \"maxRetry\": 2\n" + "}\n"); expectedException.expect(ValueInstantiationException.class); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java index 6deda73e8e..a1d4250811 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java @@ -80,7 +80,6 @@ public class S3StorageConnectorTest temporaryFolder.newFolder(), null, null, - null, true ), SERVICE); } diff --git a/website/.spelling b/website/.spelling index bf846ba207..e31d8c4565 100644 --- a/website/.spelling +++ b/website/.spelling @@ -19,6 +19,7 @@ # one word per line, to define a file override use ' - filename' # where filename is relative to this configuration file 1M +100MiB 32-bit 500MiB 64-bit --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
