This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd28a2e8bf [hotfix] peekNextBufferSubpartitionId shouldn't throw
UnsupportedDataTypeException
8dd28a2e8bf is described below
commit 8dd28a2e8bf10ec278ff90c99563468e294a135a
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Jan 19 13:32:10 2024 +0800
[hotfix] peekNextBufferSubpartitionId shouldn't throw
UnsupportedDataTypeException
---
.../partition/consumer/RecoveredInputChannel.java | 3 +--
.../tiered/netty/TestingNettyConnectionReader.java | 23 ++++++++++++++-----
.../tiered/netty/TestingTierConsumerAgent.java | 26 +++++++++++++++++-----
3 files changed, 40 insertions(+), 12 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 06ae4258da4..1f41a099931 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.activation.UnsupportedDataTypeException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -193,7 +192,7 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
@Override
protected int peekNextBufferSubpartitionIdInternal() throws IOException {
- throw new UnsupportedDataTypeException();
+ throw new UnsupportedOperationException();
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
index 4b9358eb319..5fc1855ca04 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java
@@ -20,24 +20,28 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import javax.activation.UnsupportedDataTypeException;
-
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
+import java.util.function.Supplier;
/** Test implementation for {@link NettyConnectionReader}. */
public class TestingNettyConnectionReader implements NettyConnectionReader {
private final Function<Integer, Buffer> readBufferFunction;
- private TestingNettyConnectionReader(Function<Integer, Buffer>
readBufferFunction) {
+ private final Supplier<Integer> peekNextBufferSubpartitionIdSupplier;
+
+ private TestingNettyConnectionReader(
+ Function<Integer, Buffer> readBufferFunction,
+ Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
this.readBufferFunction = readBufferFunction;
+ this.peekNextBufferSubpartitionIdSupplier =
peekNextBufferSubpartitionIdSupplier;
}
@Override
public int peekNextBufferSubpartitionId() throws IOException {
- throw new UnsupportedDataTypeException();
+ return peekNextBufferSubpartitionIdSupplier.get();
}
@Override
@@ -50,6 +54,8 @@ public class TestingNettyConnectionReader implements
NettyConnectionReader {
private Function<Integer, Buffer> readBufferFunction = segmentId ->
null;
+ private Supplier<Integer> peekNextBufferSubpartitionIdSupplier = () ->
-1;
+
public Builder() {}
public Builder setReadBufferFunction(Function<Integer, Buffer>
readBufferFunction) {
@@ -57,8 +63,15 @@ public class TestingNettyConnectionReader implements
NettyConnectionReader {
return this;
}
+ public Builder setPeekNextBufferSubpartitionIdSupplier(
+ Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
+ this.peekNextBufferSubpartitionIdSupplier =
peekNextBufferSubpartitionIdSupplier;
+ return this;
+ }
+
public TestingNettyConnectionReader build() {
- return new TestingNettyConnectionReader(readBufferFunction);
+ return new TestingNettyConnectionReader(
+ readBufferFunction, peekNextBufferSubpartitionIdSupplier);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
index cecc81f1bae..f0f2f69168c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
@@ -25,10 +25,9 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
-import javax.activation.UnsupportedDataTypeException;
-
import java.io.IOException;
import java.util.Optional;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
/** Test implementation for {@link TierConsumerAgent}. */
@@ -42,15 +41,21 @@ public class TestingTierConsumerAgent implements
TierConsumerAgent {
private final Runnable closeNotifier;
+ private final BiFunction<TieredStoragePartitionId,
ResultSubpartitionIndexSet, Integer>
+ peekNextBufferSubpartitionIdFunction;
+
private TestingTierConsumerAgent(
Runnable startNotifier,
Supplier<Buffer> bufferSupplier,
Runnable availabilityNotifierRegistrationRunnable,
- Runnable closeNotifier) {
+ Runnable closeNotifier,
+ BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet,
Integer>
+ peekNextBufferSubpartitionIdFunction) {
this.startNotifier = startNotifier;
this.bufferSupplier = bufferSupplier;
this.availabilityNotifierRegistrationRunnable =
availabilityNotifierRegistrationRunnable;
this.closeNotifier = closeNotifier;
+ this.peekNextBufferSubpartitionIdFunction =
peekNextBufferSubpartitionIdFunction;
}
@Override
@@ -62,7 +67,7 @@ public class TestingTierConsumerAgent implements
TierConsumerAgent {
public int peekNextBufferSubpartitionId(
TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet
indexSet)
throws IOException {
- throw new UnsupportedDataTypeException();
+ return peekNextBufferSubpartitionIdFunction.apply(partitionId,
indexSet);
}
@Override
@@ -95,6 +100,9 @@ public class TestingTierConsumerAgent implements
TierConsumerAgent {
private Runnable closeNotifier = () -> {};
+ private BiFunction<TieredStoragePartitionId,
ResultSubpartitionIndexSet, Integer>
+ peekNextBufferSubpartitionIdFunction = (ignore1, ignore2) ->
-1;
+
public Builder() {}
public Builder setStartNotifier(Runnable startNotifier) {
@@ -119,12 +127,20 @@ public class TestingTierConsumerAgent implements
TierConsumerAgent {
return this;
}
+ public Builder setPeekNextBufferSubpartitionIdFunction(
+ BiFunction<TieredStoragePartitionId,
ResultSubpartitionIndexSet, Integer>
+ peekNextBufferSubpartitionIdFunction) {
+ this.peekNextBufferSubpartitionIdFunction =
peekNextBufferSubpartitionIdFunction;
+ return this;
+ }
+
public TestingTierConsumerAgent build() {
return new TestingTierConsumerAgent(
startNotifier,
bufferSupplier,
availabilityNotifierRegistrationRunnable,
- closeNotifier);
+ closeNotifier,
+ peekNextBufferSubpartitionIdFunction);
}
}
}