This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 30e8b3de05c [FLINK-32896] [Runtime/Coordination] Incorrect 
`Map.computeIfAbsent(..., ...::new)` usage which misinterprets key as initial 
capacity
30e8b3de05c is described below

commit 30e8b3de05c1d6b75d8f27b9188a1d34f1589ac5
Author: tzy123-123 <[email protected]>
AuthorDate: Thu Oct 12 12:45:33 2023 -0400

    [FLINK-32896] [Runtime/Coordination] Incorrect `Map.computeIfAbsent(..., 
...::new)` usage which misinterprets key as initial capacity
    
    [FLINK-32896] [Runtime/Coordination] Incorrect `Map.computeIfAbsent(..., 
...::new)` usage which misinterprets key as initial capacity
---
 docs/themes/book                                   |  2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java      |  2 +-
 .../partition/hybrid/HsSpillingStrategy.java       | 24 ++++++++++++++++------
 .../partition/hybrid/HsSpillingStrategyUtils.java  |  2 +-
 .../file/ProducerMergedPartitionFileIndex.java     |  2 +-
 .../hybrid/TestingSpillingInfoProvider.java        |  8 +++++---
 .../SourceCoordinatorConcurrentAttemptsTest.java   |  4 +++-
 7 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/docs/themes/book b/docs/themes/book
index a486adf8462..3f1bcccbfb2 160000
--- a/docs/themes/book
+++ b/docs/themes/book
@@ -1 +1 @@
-Subproject commit a486adf8462c0abfc9034436ddd72927d6656809
+Subproject commit 3f1bcccbfb247da44ab5410a97576c0bf6da103b
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 25012ffa8f1..d4c0fed0a0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -160,7 +160,7 @@ public class HsFileDataIndexImpl implements HsFileDataIndex 
{
         checkArgument(firstBufferInRegion.subpartitionId == 
lastBufferInRegion.subpartitionId);
         checkArgument(firstBufferInRegion.bufferIndex <= 
lastBufferInRegion.bufferIndex);
         internalRegionsBySubpartition
-                .computeIfAbsent(firstBufferInRegion.subpartitionId, 
ArrayList::new)
+                .computeIfAbsent(firstBufferInRegion.subpartitionId, k -> new 
ArrayList<>())
                 .add(
                         new InternalRegion(
                                 firstBufferInRegion.bufferIndex,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 6d3a15d427a..70a1c0b985e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -125,36 +125,48 @@ public interface HsSpillingStrategy {
             private Builder() {}
 
             public Builder addBufferToSpill(BufferIndexAndChannel buffer) {
-                bufferToSpill.computeIfAbsent(buffer.getChannel(), 
ArrayList::new).add(buffer);
+                bufferToSpill
+                        .computeIfAbsent(buffer.getChannel(), k -> new 
ArrayList<>())
+                        .add(buffer);
                 return this;
             }
 
             public Builder addBufferToSpill(
                     int subpartitionId, List<BufferIndexAndChannel> buffers) {
-                bufferToSpill.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
+                bufferToSpill
+                        .computeIfAbsent(subpartitionId, k -> new 
ArrayList<>())
+                        .addAll(buffers);
                 return this;
             }
 
             public Builder addBufferToSpill(
                     int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
-                bufferToSpill.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
+                bufferToSpill
+                        .computeIfAbsent(subpartitionId, k -> new 
ArrayList<>())
+                        .addAll(buffers);
                 return this;
             }
 
             public Builder addBufferToRelease(BufferIndexAndChannel buffer) {
-                bufferToRelease.computeIfAbsent(buffer.getChannel(), 
ArrayList::new).add(buffer);
+                bufferToRelease
+                        .computeIfAbsent(buffer.getChannel(), k -> new 
ArrayList<>())
+                        .add(buffer);
                 return this;
             }
 
             public Builder addBufferToRelease(
                     int subpartitionId, List<BufferIndexAndChannel> buffers) {
-                bufferToRelease.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
+                bufferToRelease
+                        .computeIfAbsent(subpartitionId, k -> new 
ArrayList<>())
+                        .addAll(buffers);
                 return this;
             }
 
             public Builder addBufferToRelease(
                     int subpartitionId, Deque<BufferIndexAndChannel> buffers) {
-                bufferToRelease.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(buffers);
+                bufferToRelease
+                        .computeIfAbsent(subpartitionId, k -> new 
ArrayList<>())
+                        .addAll(buffers);
                 return this;
             }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
index c166f4a1c4b..5e3786daf0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
@@ -73,7 +73,7 @@ public class HsSpillingStrategyUtils {
             BufferConsumptionPriorityIterator 
bufferConsumptionPriorityIterator = heap.poll();
             BufferIndexAndChannel bufferIndexAndChannel = 
bufferConsumptionPriorityIterator.next();
             subpartitionToHighPriorityBuffers
-                    .computeIfAbsent(bufferIndexAndChannel.getChannel(), 
ArrayList::new)
+                    .computeIfAbsent(bufferIndexAndChannel.getChannel(), k -> 
new ArrayList<>())
                     .add(bufferIndexAndChannel);
             // if this iterator has next, re-added it.
             if (bufferConsumptionPriorityIterator.hasNext()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
index d6a08da5fb7..3046683f475 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
@@ -168,7 +168,7 @@ public class ProducerMergedPartitionFileIndex {
         checkArgument(firstBufferInRegion.getBufferIndex() <= 
lastBufferInRegion.getBufferIndex());
 
         subpartitionRegionMap
-                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), 
ArrayList::new)
+                .computeIfAbsent(firstBufferInRegion.getSubpartitionId(), k -> 
new ArrayList<>())
                 .add(
                         new FixedSizeRegion(
                                 firstBufferInRegion.getBufferIndex(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
index 9980f4176fa..53b9df0f55a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
@@ -198,14 +198,16 @@ public class TestingSpillingInfoProvider implements 
HsSpillingInfoProvider {
 
         public Builder addSubpartitionBuffers(
                 int subpartitionId, List<BufferIndexAndChannel> 
subpartitionBuffers) {
-            allBuffers.computeIfAbsent(subpartitionId, 
ArrayList::new).addAll(subpartitionBuffers);
+            allBuffers
+                    .computeIfAbsent(subpartitionId, k -> new ArrayList<>())
+                    .addAll(subpartitionBuffers);
             return this;
         }
 
         public Builder addSpillBuffers(
                 int subpartitionId, List<Integer> 
subpartitionSpillBufferIndexes) {
             spillBufferIndexes
-                    .computeIfAbsent(subpartitionId, HashSet::new)
+                    .computeIfAbsent(subpartitionId, k -> new HashSet<>())
                     .addAll(subpartitionSpillBufferIndexes);
             return this;
         }
@@ -213,7 +215,7 @@ public class TestingSpillingInfoProvider implements 
HsSpillingInfoProvider {
         public Builder addConsumedBuffers(
                 int subpartitionId, List<Integer> 
subpartitionConsumedBufferIndexes) {
             consumedBufferIndexes
-                    .computeIfAbsent(subpartitionId, HashSet::new)
+                    .computeIfAbsent(subpartitionId, k -> new HashSet<>())
                     .addAll(subpartitionConsumedBufferIndexes);
             return this;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
index 65a82027c13..3f3d0a1f63e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorConcurrentAttemptsTest.java
@@ -266,7 +266,9 @@ class SourceCoordinatorConcurrentAttemptsTest extends 
SourceCoordinatorTestBase
 
         @Override
         public void handleSourceEvent(int subtaskId, int attemptNumber, 
SourceEvent sourceEvent) {
-            sourceEvents.computeIfAbsent(subtaskId, 
HashMap::new).put(attemptNumber, sourceEvent);
+            sourceEvents
+                    .computeIfAbsent(subtaskId, k -> new HashMap<>())
+                    .put(attemptNumber, sourceEvent);
             handleSourceEvent(subtaskId, sourceEvent);
         }
 

Reply via email to