This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b6243bb0ba [FLINK-5279] Print state name and type in error message
when trying to access keyed state in non-keyed operator
7b6243bb0ba is described below
commit 7b6243bb0ba55aafad1ca5a17bc457d229763433
Author: Zakelly <[email protected]>
AuthorDate: Sat Sep 2 23:09:36 2023 +0800
[FLINK-5279] Print state name and type in error message when trying to
access keyed state in non-keyed operator
---
.../api/operators/StreamingRuntimeContext.java | 4 +++-
.../co/CoBroadcastWithNonKeyedOperatorTest.java | 22 ++++++++++++----------
2 files changed, 15 insertions(+), 11 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 97dd107f352..b9fb0c003a8 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -237,7 +237,9 @@ public class StreamingRuntimeContext extends
AbstractRuntimeUDFContext {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkNotNull(
keyedStateStore,
- "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
+ String.format(
+ "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+ stateDescriptor.getName(), stateDescriptor.getType()));
return keyedStateStore;
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
index dae80d11991..e33a4d25021 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
@@ -458,15 +458,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
boolean exceptionThrown = false;
+ final ValueStateDescriptor<String> valueState =
+ new ValueStateDescriptor<>("any",
BasicTypeInfo.STRING_TYPE_INFO);
+
try (TwoInputStreamOperatorTestHarness<String, Integer, String>
testHarness =
getInitializedTestHarness(
new BroadcastProcessFunction<String, Integer,
String>() {
private static final long serialVersionUID =
-1725365436500098384L;
- private final ValueStateDescriptor<String>
valueState =
- new ValueStateDescriptor<>(
- "any",
BasicTypeInfo.STRING_TYPE_INFO);
-
@Override
public void processBroadcastElement(
Integer value, Context ctx,
Collector<String> out)
@@ -488,7 +487,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
testHarness.processElement2(new StreamRecord<>(5, 12L));
} catch (NullPointerException e) {
Assert.assertEquals(
- "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+ String.format(
+ "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+ valueState.getName(), valueState.getType()),
e.getMessage());
exceptionThrown = true;
}
@@ -503,15 +504,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
boolean exceptionThrown = false;
+ final ValueStateDescriptor<String> valueState =
+ new ValueStateDescriptor<>("any",
BasicTypeInfo.STRING_TYPE_INFO);
+
try (TwoInputStreamOperatorTestHarness<String, Integer, String>
testHarness =
getInitializedTestHarness(
new BroadcastProcessFunction<String, Integer,
String>() {
private static final long serialVersionUID =
-1725365436500098384L;
- private final ValueStateDescriptor<String>
valueState =
- new ValueStateDescriptor<>(
- "any",
BasicTypeInfo.STRING_TYPE_INFO);
-
@Override
public void processBroadcastElement(
Integer value, Context ctx,
Collector<String> out)
@@ -533,7 +533,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
testHarness.processElement1(new StreamRecord<>("5", 12L));
} catch (NullPointerException e) {
Assert.assertEquals(
- "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+ String.format(
+ "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+ valueState.getName(), valueState.getType()),
e.getMessage());
exceptionThrown = true;
}