[
https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504725#comment-16504725
]
Aljoscha Krettek commented on FLINK-9262:
-----------------------------------------
What dependencies do you have set in your pom?
I think you need
{code}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
{code}
> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> -------------------------------------------------------------------
>
> Key: FLINK-9262
> URL: https://issues.apache.org/jira/browse/FLINK-9262
> Project: Flink
> Issue Type: Bug
> Components: Streaming, Tests
> Affects Versions: 1.4.0
> Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
> Reporter: Chris Schneider
> Priority: Blocker
>
> Although KeyedOneInputStreamOperatorTestHarness and other
> AbstractStreamOperatorTestHarness subclasses are not yet part of the public
> Flink API, we have been trying to make use of them for unit testing our map
> functions. The following code throws NPE from the attempt to collect a
> snapshot on Flink 1.4.0 (even after applying [the
> fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
> for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>
> @SuppressWarnings("serial")
> private static class MyProcessFunction extends
> RichFlatMapFunction<String, String> {
> @Override
> public void flatMap(String input, Collector<String> collector) throws
> Exception {
> collector.collect(input);
> }
> }
>
> @SuppressWarnings({
> "serial", "hiding"
> })
> private static class MyKeySelector<String> implements KeySelector<String,
> String> {
> @Override
> public String getKey(String input) throws Exception {
> return input;
> }
> }
> @Test
> public void test() throws Throwable {
> KeyedOneInputStreamOperatorTestHarness<String, String, String>
> testHarness =
> new KeyedOneInputStreamOperatorTestHarness<String, String,
> String>(
> new StreamFlatMap<>(new MyProcessFunction()),
> new MyKeySelector<String>(),
> BasicTypeInfo.STRING_TYPE_INFO,
> 1,
> 1,
> 0);
> testHarness.setup();
> testHarness.open();
>
> for (int i = 0; i < 10; i++) {
> String urlString = String.format("https://domain-%d.com/page1",
> i);
> testHarness.processElement(new StreamRecord<>(urlString));
> }
> testHarness.snapshot(0L, 0L);
> }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask
> (1/1).
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> at
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
> at
> com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> ... 25 more
> {noformat}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)