Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6149#discussion_r195681277
--- Diff: flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java ---
@@ -122,6 +122,42 @@ public void testLimitingInputStreams() throws Exception {
		}
	}
+	@Test
+	public void testLimitingRateLimitingStream() throws Exception {
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,
+				Integer.MAX_VALUE,
+				Integer.MAX_VALUE,
+				0,
+				0,
+				10000, // limit writes to 10 kbytes/s
+				10000); // limit reads to 10 kbytes/s
+		File file = tempFolder.newFile();
+		Path path = new Path(file.toURI());
+
+		long durationWrite = System.currentTimeMillis();
+		try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
+			final Random rnd = new Random();
+			final byte[] data = new byte[100];
+			for (int i = 0; i < (1000 + 10); i++) {
+				rnd.nextBytes(data);
+				stream.write(data);
+			}
+		}
+		durationWrite = System.currentTimeMillis() - durationWrite;
+
+		long durationRead = System.currentTimeMillis();
+		final byte[] data = new byte[100];
+		try (FSDataInputStream stream = limitedFs.open(path)) {
+			//noinspection StatementWithEmptyBody
+			while (stream.read(data) != -1) {}
+		}
+		durationRead = System.currentTimeMillis() - durationRead;
+		file.delete();
+
+		assertTrue(durationWrite > 10000);
+		assertTrue(durationRead > 8000); // less stable with the read limiter than with the write limiter
--- End diff ---
I have debugged it, and my guess is that this instability comes from bursty reads
(`org.apache.flink.shaded.guava18.com.google.common.util.concurrent.SmoothRateLimiter.SmoothBursty`).
The default `RateLimiter` allows a maximum burst duration of 1 second, and these
"burst credits" accumulate for reads while the test is writing. I have modified
the test to take that into account (and added a test case for single-byte
writes/reads), please check the code below:
```java
@Test
public void testLimitingRateLimitingStream() throws Exception {
	testLimitingRateLimitingStream(10);
}

@Test
public void testLimitingRateLimitingStreamSingleByteWrites() throws Exception {
	testLimitingRateLimitingStream(1);
}

private void testLimitingRateLimitingStream(int chunkSize) throws Exception {
	long dataRate = 1000; // limit writes and reads to 1000 bytes/s
	// Allowed burst duration in SmoothBursty when creating the RateLimiter.
	// This value is copied from `RateLimiter#create(SleepingStopwatch, double)`.
	long burstDurationSeconds = 1;
	long burstData = dataRate * burstDurationSeconds;
	long dataToWrite = 2000;
	assertTrue(
		"Because of burst writes/reads, dataToWrite must be larger than burstData",
		dataToWrite > burstData);

	final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
			LocalFileSystem.getSharedInstance(),
			Integer.MAX_VALUE,
			Integer.MAX_VALUE,
			Integer.MAX_VALUE,
			0,
			0,
			dataRate,
			dataRate);
	File file = tempFolder.newFile();
	Path path = new Path(file.toURI());

	long durationWrite = System.currentTimeMillis();
	try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
		final byte[] data = new byte[chunkSize];
		for (int i = 0; i < dataToWrite; i += chunkSize) {
			if (chunkSize == 1) {
				stream.write(42);
			}
			else {
				stream.write(data);
			}
		}
	}
	durationWrite = System.currentTimeMillis() - durationWrite;

	long durationRead = System.currentTimeMillis();
	try (FSDataInputStream stream = limitedFs.open(path)) {
		final byte[] data = new byte[chunkSize];
		int result = 0;
		while (result != -1) {
			result = chunkSize == 1 ? stream.read() : stream.read(data);
		}
	}
	durationRead = System.currentTimeMillis() - durationRead;
	file.delete();

	// Expect at least the time needed for the non-burst portion of the data,
	// with a 10% tolerance for timing jitter.
	long expectedDuration = (long) (((dataToWrite - burstData) * 1000) / dataRate * 0.9);
	assertThat(durationWrite, greaterThan(expectedDuration));
	assertThat(durationRead, greaterThan(expectedDuration));
}
```
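
For reference, the burst-credit mechanism described above can be sketched as a token bucket whose stored permits are capped at one second's worth of data. This is an illustrative model only, not Guava's actual `SmoothBursty` implementation; all names here are made up:

```java
import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch (NOT Guava's SmoothBursty code): a token bucket that
 * accumulates "burst credits" while idle, capped at maxBurstSeconds worth of
 * permits. While the test is writing, the read limiter sits idle and fills
 * up to its cap, so the first ~1 second of reads is effectively free.
 */
public class BurstyTokenBucket {
	private final double permitsPerSecond;
	private final double maxBurstPermits; // cap on stored burst credits
	private double storedPermits;
	private long lastUpdateNanos;

	public BurstyTokenBucket(double permitsPerSecond, double maxBurstSeconds, long nowNanos) {
		this.permitsPerSecond = permitsPerSecond;
		this.maxBurstPermits = permitsPerSecond * maxBurstSeconds;
		this.storedPermits = 0.0;
		this.lastUpdateNanos = nowNanos;
	}

	/** Returns how long (in nanos) acquiring {@code permits} at {@code nowNanos} must wait. */
	public long waitNanosFor(double permits, long nowNanos) {
		// Refill stored credits for the idle time, up to the burst cap.
		double idleSeconds = (nowNanos - lastUpdateNanos) / 1e9;
		storedPermits = Math.min(maxBurstPermits, storedPermits + idleSeconds * permitsPerSecond);
		lastUpdateNanos = nowNanos;

		// Stored credits are spent for free; only the remainder is rate limited.
		double fromStored = Math.min(permits, storedPermits);
		storedPermits -= fromStored;
		double freshPermits = permits - fromStored;
		return (long) (freshPermits / permitsPerSecond * TimeUnit.SECONDS.toNanos(1));
	}

	public static void main(String[] args) {
		// 1000 permits/s with a 1 s burst cap, as in the test above.
		BurstyTokenBucket bucket = new BurstyTokenBucket(1000, 1, 0L);
		// After 5 idle seconds the stored credits are capped at 1000 permits,
		// so reading 2000 bytes waits only for the uncovered 1000 bytes: 1 s.
		long waitNanos = bucket.waitNanosFor(2000, 5_000_000_000L);
		System.out.println("wait = " + waitNanos + " ns"); // prints "wait = 1000000000 ns"
	}
}
```

This is why the modified test subtracts `burstData` from `dataToWrite` before computing the expected duration: the burst-covered portion of the transfer incurs no wait at all.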
---