[ https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513605#comment-16513605 ]
ASF GitHub Bot commented on FLINK-9560:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6149#discussion_r195681277

--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java ---

```diff
@@ -122,6 +122,42 @@ public void testLimitingInputStreams() throws Exception {
 		}
 	}
 
+	@Test
+	public void testLimitingRateLimitingStream() throws Exception {
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+			LocalFileSystem.getSharedInstance(),
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			0,
+			0,
+			10000,  // limit writes to 10 kbytes/s
+			10000); // limit reads to 10 kbytes/s
+
+		File file = tempFolder.newFile();
+		Path path = new Path(file.toURI());
+
+		long durationWrite = System.currentTimeMillis();
+		try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
+			final Random rnd = new Random();
+			final byte[] data = new byte[100];
+			for (int i = 0; i < (1000 + 10); i++) {
+				rnd.nextBytes(data);
+				stream.write(data);
+			}
+		}
+		durationWrite = System.currentTimeMillis() - durationWrite;
+
+		long durationRead = System.currentTimeMillis();
+		final byte[] data = new byte[100];
+		try (FSDataInputStream stream = limitedFs.open(path)) {
+			//noinspection StatementWithEmptyBody
+			while (stream.read(data) != -1) {}
+		}
+		durationRead = System.currentTimeMillis() - durationRead;
+
+		file.delete();
+		assertTrue(durationWrite > 10000);
+		assertTrue(durationRead > 8000); // less stability with read limiter than write
```

--- End diff --

I have debugged it, and my guess is that this instability comes from bursty reads (`org.apache.flink.shaded.guava18.com.google.common.util.concurrent.SmoothRateLimiter.SmoothBursty`). The default `RateLimiter` allows a maximum burst duration of 1 second, and these "burst credits" accumulate for reads while this test is still writing.
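To make the burst effect concrete, here is a small stand-alone sketch (hypothetical code, not the actual Guava or Flink implementation; the class name and method names are invented for illustration). Like `SmoothBursty`, it stores idle capacity up to a fixed burst window, so a caller that was idle can consume up to one second's worth of permits without waiting. Time is passed in explicitly to keep the example deterministic.

```java
// Hypothetical sketch of SmoothBursty-style "burst credit" accumulation.
public class BurstyLimiterSketch {

	private final double permitsPerSecond;
	private final double maxBurstSeconds; // SmoothBursty's default is 1 second
	private double storedPermits;         // the accumulated "burst credits"
	private long lastRefillNanos;

	public BurstyLimiterSketch(double permitsPerSecond, double maxBurstSeconds) {
		this.permitsPerSecond = permitsPerSecond;
		this.maxBurstSeconds = maxBurstSeconds;
		this.storedPermits = 0.0;
		this.lastRefillNanos = 0L;
	}

	/** Credits permits for the idle time since the last call, capped at one burst. */
	private void refill(long nowNanos) {
		double elapsedSeconds = (nowNanos - lastRefillNanos) / 1e9;
		storedPermits = Math.min(
			permitsPerSecond * maxBurstSeconds,
			storedPermits + elapsedSeconds * permitsPerSecond);
		lastRefillNanos = nowNanos;
	}

	/** Returns how many seconds the caller would have to sleep to acquire the permits. */
	public double acquireDelaySeconds(double permits, long nowNanos) {
		refill(nowNanos);
		double fromStored = Math.min(permits, storedPermits);
		storedPermits -= fromStored;
		// Stored (burst) permits are free; only the remainder is paced.
		return (permits - fromStored) / permitsPerSecond;
	}

	public static void main(String[] args) {
		BurstyLimiterSketch limiter = new BurstyLimiterSketch(1000.0, 1.0);
		// After 5 idle seconds only 1 second of credit (1000 permits) is stored,
		// so the first 1000 bytes read are "free" ...
		System.out.println(limiter.acquireDelaySeconds(1000.0, 5_000_000_000L)); // 0.0
		// ... while the next 1000 bytes are paced at the configured rate.
		System.out.println(limiter.acquireDelaySeconds(1000.0, 5_000_000_000L)); // 1.0
	}
}
```

This is exactly why the read assertion was unstable: while the test spends roughly two seconds writing, the read limiter silently accumulates a full second of credits, so the read phase can finish up to `burstData / dataRate` seconds early.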
I have modified this test to take that into account (and added a test case for single-byte writes/reads), please check the code below:

```java
@Test
public void testLimitingRateLimitingStream() throws Exception {
	testLimitingRateLimitingStream(10);
}

@Test
public void testLimitingRateLimitingStreamSingleByteWrites() throws Exception {
	testLimitingRateLimitingStream(1);
}

private void testLimitingRateLimitingStream(int chunkSize) throws Exception {
	long dataRate = 1000; // limit reads and writes to 1000 bytes/s
	// Allowed burst duration in SmoothBursty when creating the RateLimiter.
	// This value is copied from `RateLimiter#create(SleepingStopwatch, double)`.
	long burstDurationSeconds = 1;
	long burstData = dataRate * burstDurationSeconds;
	long dataToWrite = 2000;
	assertTrue(
		"Because of burst writes/reads, dataToWrite must be larger than burstData",
		dataToWrite > burstData);

	final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
		LocalFileSystem.getSharedInstance(),
		Integer.MAX_VALUE,
		Integer.MAX_VALUE,
		Integer.MAX_VALUE,
		0,
		0,
		dataRate,
		dataRate);

	File file = tempFolder.newFile();
	Path path = new Path(file.toURI());

	long durationWrite = System.currentTimeMillis();
	try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
		final byte[] data = new byte[chunkSize];
		for (int i = 0; i < dataToWrite; i += chunkSize) {
			if (chunkSize == 1) {
				stream.write(42);
			} else {
				stream.write(data);
			}
		}
	}
	durationWrite = System.currentTimeMillis() - durationWrite;

	long durationRead = System.currentTimeMillis();
	try (FSDataInputStream stream = limitedFs.open(path)) {
		final byte[] data = new byte[chunkSize];
		int result = 0;
		while (result != -1) {
			result = chunkSize == 1 ? stream.read() : stream.read(data);
		}
	}
	durationRead = System.currentTimeMillis() - durationRead;

	file.delete();

	// The first burstData bytes can pass unthrottled, so only the remainder
	// is rate limited; allow 10% slack for timing jitter.
	long expectedDuration = (long) ((dataToWrite - burstData) * 1000 / dataRate * 0.9);
	assertThat(durationWrite, greaterThan(expectedDuration));
	assertThat(durationRead, greaterThan(expectedDuration));
}
```

> RateLimiting for FileSystem
> ---------------------------
>
>                 Key: FLINK-9560
>                 URL: https://issues.apache.org/jira/browse/FLINK-9560
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystem
>    Affects Versions: 1.5.0
>            Reporter: Etienne CARRIERE
>            Priority: Major
>
> *Pain*: On our system we see that during a checkpoint, all the bandwidth is taken up by sending the checkpoint to object storage (S3 in our case).
> *Proposal*: Following the limitations already added to FileSystem (mostly the number of connections, in tickets FLINK-8125/FLINK-8198/FLINK-9468), I propose to add rate limiting "per FileSystem".
> *Proposed implementation*: Modify LimitedConnectionsFileSystem to add a rate limiter on both the InputStream and the OutputStream.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)