Chris Schneider created FLINK-9262: -------------------------------------- Summary: KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot Key: FLINK-9262 URL: https://issues.apache.org/jira/browse/FLINK-9262 Project: Flink Issue Type: Bug Components: Streaming, Tests Affects Versions: 1.4.0 Environment: macOS High Sierra 10.13.4
(ancient) Eclipse Luna v.4.4.1 JRE System Library [Java SE 8 [1.8.0_131]] Java 8 Update 171 build 11 Reporter: Chris Schneider Although KeyedOneInputStreamOperatorTestHarness and other AbstractStreamOperatorTestHarness subclasses are not yet part of the public Flink API, we have been trying to make use of them for unit testing our map functions. The following code throws an NPE when attempting to take a snapshot on Flink 1.4.0 (even after applying [the fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80] for [FLINK-8268|https://issues.apache.org/jira/browse/FLINK-8268]), but appears to work properly on Flink 1.5-SNAPSHOT: {code:java} package com.scaleunlimited.flinkcrawler.functions; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.operators.StreamFlatMap; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.Test; public class FlinkIssueTest { @SuppressWarnings("serial") private static class MyProcessFunction extends RichFlatMapFunction<String, String> { @Override public void flatMap(String input, Collector<String> collector) throws Exception { collector.collect(input); } } @SuppressWarnings({ "serial", "hiding" }) private static class MyKeySelector<String> implements KeySelector<String, String> { @Override public String getKey(String input) throws Exception { return input; } } @Test public void test() throws Throwable { KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<String, String, String>( new StreamFlatMap<>(new MyProcessFunction()), new MyKeySelector<String>(), BasicTypeInfo.STRING_TYPE_INFO, 1, 1, 0); testHarness.setup(); 
testHarness.open(); for (int i = 0; i < 10; i++) { String urlString = String.format("https://domain-%d.com/page1", i); testHarness.processElement(new StreamRecord<>(urlString)); } testHarness.snapshot(0L, 0L); } } {code} Output: {noformat} java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1). at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) at com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) ... 25 more {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)