Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5105#discussion_r156923552
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
---
@@ -40,37 +39,31 @@
private final BufferRecycler bufferRecycler;
- private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+ private final int poolSize;
- public TestBufferFactory() {
- this(BUFFER_SIZE, RECYCLER);
- }
-
- public TestBufferFactory(int bufferSize) {
- this(bufferSize, RECYCLER);
- }
+ private int numberOfCreatedBuffers = 0;
- public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler)
{
+ public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler
bufferRecycler) {
checkArgument(bufferSize > 0);
+ this.poolSize = poolSize;
this.bufferSize = bufferSize;
this.bufferRecycler = checkNotNull(bufferRecycler);
}
- public Buffer create() {
- numberOfCreatedBuffers.incrementAndGet();
+ public synchronized Buffer create() {
+ if (numberOfCreatedBuffers >= poolSize) {
+ return null;
+ }
+ numberOfCreatedBuffers++;
return new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
bufferRecycler);
}
- public Buffer createFrom(MemorySegment segment) {
- return new Buffer(segment, bufferRecycler);
- }
-
- public int getNumberOfCreatedBuffers() {
- return numberOfCreatedBuffers.get();
+ public synchronized int getNumberOfCreatedBuffers() {
+ return numberOfCreatedBuffers;
}
- public int getBufferSize() {
+ public synchronized int getBufferSize() {
--- End diff --
I would leave it just for the sake of having all methods `synchronized` so
that you don't have to think which one are and which one should be
`synchronized` (when adding features or refactoring this class in the future)
---