Chris Schneider created FLINK-9262:
--------------------------------------
Summary: KeyedOneInputStreamOperatorTestHarness throws NPE
creating snapshot
Key: FLINK-9262
URL: https://issues.apache.org/jira/browse/FLINK-9262
Project: Flink
Issue Type: Bug
Components: Streaming, Tests
Affects Versions: 1.4.0
Environment: macOS High Sierra 10.13.4
(ancient) Eclipse Luna v.4.4.1
JRE System Library [Java SE 8 [1.8.0_131]]
Java 8 Update 171 build 11
Reporter: Chris Schneider
Although KeyedOneInputStreamOperatorTestHarness and other
AbstractStreamOperatorTestHarness subclasses are not yet part of the public
Flink API, we have been trying to use them to unit test our map
functions. The following code throws a NullPointerException when it tries to
take a snapshot on Flink 1.4.0 (even after applying [the
fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
for [FLINK-8268|https://issues.apache.org/jira/browse/FLINK-8268]), but
appears to work properly on Flink 1.5-SNAPSHOT:
{code:java}
package com.scaleunlimited.flinkcrawler.functions;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;
/**
 * Minimal reproduction for the reported failure: a keyed flat-map operator is
 * driven through {@link KeyedOneInputStreamOperatorTestHarness}, fed ten
 * records, and then asked for a snapshot.
 */
public class FlinkIssueTest {

    /** Flat-map function that forwards every input string unchanged. */
    @SuppressWarnings("serial")
    private static class MyProcessFunction extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String input, Collector<String> collector) throws Exception {
            collector.collect(input);
        }
    }

    /**
     * Key selector that uses the record itself as its key.
     *
     * <p>Deliberately not generic: declaring a type parameter named
     * {@code String} would shadow {@code java.lang.String} inside the class
     * and require suppressing the "hiding" warning.
     */
    @SuppressWarnings("serial")
    private static class MyKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String input) throws Exception {
            return input;
        }
    }

    /**
     * Sets up and opens the harness, pushes ten distinct URL strings through
     * the operator, then requests a snapshot.
     *
     * @throws Throwable if any harness call fails; the report shows the
     *     snapshot call failing with a NullPointerException on Flink 1.4.0
     */
    @Test
    public void test() throws Throwable {
        // NOTE(review): the three trailing ints are presumably maxParallelism,
        // numSubtasks and subtaskIndex -- confirm against the harness constructor.
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
                new StreamFlatMap<>(new MyProcessFunction()),
                new MyKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);

        testHarness.setup();
        testHarness.open();

        // Each record is its own key, so this produces ten distinct keys.
        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            testHarness.processElement(new StreamRecord<>(urlString));
        }

        // This is the call that fails in the reported stack trace.
        testHarness.snapshot(0L, 0L);
    }
}
{code}
Output:
{noformat}
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
at
com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 25 more
{noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)