This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 30e8b3de05c [FLINK-32896] [Runtime/Coordination] Incorrect
`Map.computeIfAbsent(..., ...::new)` usage which misinterprets key as initial
capacity
30e8b3de05c is described below
commit 30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5
Author: tzy123-123 <[email protected]>
AuthorDate: Thu Oct 12 12:45:33 2023 -0400
[FLINK-32896] [Runtime/Coordination] Incorrect `Map.computeIfAbsent(...,
...::new)` usage which misinterprets key as initial capacity
[FLINK-32896] [Runtime/Coordination] Incorrect `Map.computeIfAbsent(...,
...::new)` usage which misinterprets key as initial capacity
---
docs/themes/book | 2 +-
.../partition/hybrid/HsFileDataIndexImpl.java | 2 +-
.../partition/hybrid/HsSpillingStrategy.java | 24 ++++++++++++++++------
.../partition/hybrid/HsSpillingStrategyUtils.java | 2 +-
.../file/ProducerMergedPartitionFileIndex.java | 2 +-
.../hybrid/TestingSpillingInfoProvider.java | 8 +++++---
.../SourceCoordinatorConcurrentAttemptsTest.java | 4 +++-
7 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/docs/themes/book b/docs/themes/book
index a486adf8462..3f1bcccbfb2 160000
--- a/docs/themes/book
+++ b/docs/themes/book
@@ -1 +1 @@
-Subproject commit a486adf8462c0abfc9034436ddd72927d6656809
+Subproject commit 3f1bcccbfb247da44ab5410a97576c0bf6da103b
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 25012ffa8f1..d4c0fed0a0b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -160,7 +160,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex
{
checkArgument(firstBufferInRegion.subpartitionId ==
lastBufferInRegion.subpartitionId);
checkArgument(firstBufferInRegion.bufferIndex <=
lastBufferInRegion.bufferIndex);
internalRegionsBySubpartition
- .computeIfAbsent(firstBufferInRegion.subpartitionId,
ArrayList::new)
+ .computeIfAbsent(firstBufferInRegion.subpartitionId, k -> new
ArrayList<>())
.add(
new InternalRegion(
firstBufferInRegion.bufferIndex,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 6d3a15d427a..70a1c0b985e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -125,36 +125,48 @@ public interface HsSpillingStrategy {
private Builder() {}
public Builder addBufferToSpill(BufferIndexAndChannel buffer) {
- bufferToSpill.computeIfAbsent(buffer.getChannel(),
ArrayList::new).add(buffer);
+ bufferToSpill
+ .computeIfAbsent(buffer.getChannel(), k -> new
ArrayList<>())
+ .add(buffer);
return this;
}
public Builder addBufferToSpill(
int subpartitionId, List<BufferIndexAndChannel> buffers) {
- bufferToSpill.computeIfAbsent(subpartitionId,
ArrayList::new).addAll(buffers);
+ bufferToSpill
+ .computeIfAbsent(subpartitionId, k -> new
ArrayList<>())
+ .addAll(buffers);
return this;
}
public Builder addBufferToSpill(
int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
- bufferToSpill.computeIfAbsent(subpartitionId,
ArrayList::new).addAll(buffers);
+ bufferToSpill
+ .computeIfAbsent(subpartitionId, k -> new
ArrayList<>())
+ .addAll(buffers);
return this;
}
public Builder addBufferToRelease(BufferIndexAndChannel buffer) {
- bufferToRelease.computeIfAbsent(buffer.getChannel(),
ArrayList::new).add(buffer);
+ bufferToRelease
+ .computeIfAbsent(buffer.getChannel(), k -> new
ArrayList<>())
+ .add(buffer);
return this;
}
public Builder addBufferToRelease(
int subpartitionId, List<BufferIndexAndChannel> buffers) {
- bufferToRelease.computeIfAbsent(subpartitionId,
ArrayList::new).addAll(buffers);
+ bufferToRelease
+ .computeIfAbsent(subpartitionId, k -> new
ArrayList<>())
+ .addAll(buffers);
return this;
}
public Builder addBufferToRelease(
int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
- bufferToRelease.computeIfAbsent(subpartitionId,
ArrayList::new).addAll(buffers);
+ bufferToRelease
+ .computeIfAbsent(subpartitionId, k -> new
ArrayList<>())
+ .addAll(buffers);
return this;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
index c166f4a1c4b..5e3786daf0c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
@@ -73,7 +73,7 @@ public class HsSpillingStrategyUtils {
BufferConsumptionPriorityIterator
bufferConsumptionPriorityIterator = heap.poll();
BufferIndexAndChannel bufferIndexAndChannel =
bufferConsumptionPriorityIterator.next();
subpartitionToHighPriorityBuffers
- .computeIfAbsent(bufferIndexAndChannel.getChannel(),
ArrayList::new)
+ .computeIfAbsent(bufferIndexAndChannel.getChannel(), k ->
new ArrayList<>())
.add(bufferIndexAndChannel);
// if this iterator has next, re-added it.
if (bufferConsumptionPriorityIterator.hasNext()) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
index d6a08da5fb7..3046683f475 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
@@ -168,7 +168,7 @@ public class ProducerMergedPartitionFileIndex {
checkArgument(firstBufferInRegion.getBufferIndex() <=
lastBufferInRegion.getBufferIndex());
subpartitionRegionMap
- .computeIfAbsent(firstBufferInRegion.getSubpartitionId(),
ArrayList::new)
+ .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), k ->
new ArrayList<>())
.add(
new FixedSizeRegion(
firstBufferInRegion.getBufferIndex(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
index 9980f4176fa..53b9df0f55a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
@@ -198,14 +198,16 @@ public class TestingSpillingInfoProvider implements
HsSpillingInfoProvider {
public Builder addSubpartitionBuffers(
int subpartitionId, List<BufferIndexAndChannel>
subpartitionBuffers) {
- allBuffers.computeIfAbsent(subpartitionId,
ArrayList::new).addAll(subpartitionBuffers);
+ allBuffers
+ .computeIfAbsent(subpartitionId, k -> new ArrayList<>())
+ .addAll(subpartitionBuffers);
return this;
}
public Builder addSpillBuffers(
int subpartitionId, List<Integer>
subpartitionSpillBufferIndexes) {
spillBufferIndexes
- .computeIfAbsent(subpartitionId, HashSet::new)
+ .computeIfAbsent(subpartitionId, k -> new HashSet<>())
.addAll(subpartitionSpillBufferIndexes);
return this;
}
@@ -213,7 +215,7 @@ public class TestingSpillingInfoProvider implements
HsSpillingInfoProvider {
public Builder addConsumedBuffers(
int subpartitionId, List<Integer>
subpartitionConsumedBufferIndexes) {
consumedBufferIndexes
- .computeIfAbsent(subpartitionId, HashSet::new)
+ .computeIfAbsent(subpartitionId, k -> new HashSet<>())
.addAll(subpartitionConsumedBufferIndexes);
return this;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
index 65a82027c13..3f3d0a1f63e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
@@ -266,7 +266,9 @@ class SourceCoordinatorConcurrentAttemptsTest extends
SourceCoordinatorTestBase
@Override
public void handleSourceEvent(int subtaskId, int attemptNumber,
SourceEvent sourceEvent) {
- sourceEvents.computeIfAbsent(subtaskId,
HashMap::new).put(attemptNumber, sourceEvent);
+ sourceEvents
+ .computeIfAbsent(subtaskId, k -> new HashMap<>())
+ .put(attemptNumber, sourceEvent);
handleSourceEvent(subtaskId, sourceEvent);
}