This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd28a2e8bf [hotfix] peekNextBufferSubpartitionId shouldn't throw UnsupportedDataTypeException
8dd28a2e8bf is described below

commit 8dd28a2e8bf10ec278ff90c99563468e294a135a
Author: Weijie Guo <res...@163.com>
AuthorDate: Fri Jan 19 13:32:10 2024 +0800

    [hotfix] peekNextBufferSubpartitionId shouldn't throw UnsupportedDataTypeException
---
 .../partition/consumer/RecoveredInputChannel.java  |  3 +--
 .../tiered/netty/TestingNettyConnectionReader.java | 23 ++++++++++++++-----
 .../tiered/netty/TestingTierConsumerAgent.java     | 26 +++++++++++++++++-----
 3 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 06ae4258da4..1f41a099931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.activation.UnsupportedDataTypeException;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -193,7 +192,7 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
 
     @Override
     protected int peekNextBufferSubpartitionIdInternal() throws IOException {
-        throw new UnsupportedDataTypeException();
+        throw new UnsupportedOperationException();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
index 4b9358eb319..5fc1855ca04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
@@ -20,24 +20,28 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
-import javax.activation.UnsupportedDataTypeException;
-
 import java.io.IOException;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /** Test implementation for {@link NettyConnectionReader}. */
 public class TestingNettyConnectionReader implements NettyConnectionReader {
 
     private final Function<Integer, Buffer> readBufferFunction;
 
-    private TestingNettyConnectionReader(Function<Integer, Buffer> readBufferFunction) {
+    private final Supplier<Integer> peekNextBufferSubpartitionIdSupplier;
+
+    private TestingNettyConnectionReader(
+            Function<Integer, Buffer> readBufferFunction,
+            Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
         this.readBufferFunction = readBufferFunction;
+        this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier;
     }
 
     @Override
     public int peekNextBufferSubpartitionId() throws IOException {
-        throw new UnsupportedDataTypeException();
+        return peekNextBufferSubpartitionIdSupplier.get();
     }
 
     @Override
@@ -50,6 +54,8 @@ public class TestingNettyConnectionReader implements NettyConnectionReader {
 
         private Function<Integer, Buffer> readBufferFunction = segmentId -> null;
 
+        private Supplier<Integer> peekNextBufferSubpartitionIdSupplier = () -> -1;
+
         public Builder() {}
 
         public Builder setReadBufferFunction(Function<Integer, Buffer> readBufferFunction) {
@@ -57,8 +63,15 @@ public class TestingNettyConnectionReader implements NettyConnectionReader {
             return this;
         }
 
+        public Builder setPeekNextBufferSubpartitionIdSupplier(
+                Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
+            this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier;
+            return this;
+        }
+
         public TestingNettyConnectionReader build() {
-            return new TestingNettyConnectionReader(readBufferFunction);
+            return new TestingNettyConnectionReader(
+                    readBufferFunction, peekNextBufferSubpartitionIdSupplier);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
index cecc81f1bae..f0f2f69168c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
@@ -25,10 +25,9 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 
-import javax.activation.UnsupportedDataTypeException;
-
 import java.io.IOException;
 import java.util.Optional;
+import java.util.function.BiFunction;
 import java.util.function.Supplier;
 
 /** Test implementation for {@link TierConsumerAgent}. */
@@ -42,15 +41,21 @@ public class TestingTierConsumerAgent implements TierConsumerAgent {
 
     private final Runnable closeNotifier;
 
+    private final BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
+            peekNextBufferSubpartitionIdFunction;
+
     private TestingTierConsumerAgent(
             Runnable startNotifier,
             Supplier<Buffer> bufferSupplier,
             Runnable availabilityNotifierRegistrationRunnable,
-            Runnable closeNotifier) {
+            Runnable closeNotifier,
+            BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
+                    peekNextBufferSubpartitionIdFunction) {
         this.startNotifier = startNotifier;
         this.bufferSupplier = bufferSupplier;
         this.availabilityNotifierRegistrationRunnable = availabilityNotifierRegistrationRunnable;
         this.closeNotifier = closeNotifier;
+        this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction;
     }
 
     @Override
@@ -62,7 +67,7 @@ public class TestingTierConsumerAgent implements TierConsumerAgent {
     public int peekNextBufferSubpartitionId(
             TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet indexSet)
             throws IOException {
-        throw new UnsupportedDataTypeException();
+        return peekNextBufferSubpartitionIdFunction.apply(partitionId, indexSet);
     }
 
     @Override
@@ -95,6 +100,9 @@ public class TestingTierConsumerAgent implements TierConsumerAgent {
 
         private Runnable closeNotifier = () -> {};
 
+        private BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
+                peekNextBufferSubpartitionIdFunction = (ignore1, ignore2) -> -1;
+
         public Builder() {}
 
         public Builder setStartNotifier(Runnable startNotifier) {
@@ -119,12 +127,20 @@ public class TestingTierConsumerAgent implements TierConsumerAgent {
             return this;
         }
 
+        public Builder setPeekNextBufferSubpartitionIdFunction(
+                BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
+                        peekNextBufferSubpartitionIdFunction) {
+            this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction;
+            return this;
+        }
+
         public TestingTierConsumerAgent build() {
             return new TestingTierConsumerAgent(
                     startNotifier,
                     bufferSupplier,
                     availabilityNotifierRegistrationRunnable,
-                    closeNotifier);
+                    closeNotifier,
+                    peekNextBufferSubpartitionIdFunction);
         }
     }
 }

Reply via email to