[ 
https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541777#comment-16541777
 ] 

Chris Schneider commented on FLINK-9262:
----------------------------------------

I inadvertently left that "If I try with" text at the bottom of my previous 
comment. I'm unable to remove it now given the that the issue has been closed, 
so please ignore it.

> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> -------------------------------------------------------------------
>
>                 Key: FLINK-9262
>                 URL: https://issues.apache.org/jira/browse/FLINK-9262
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming, Tests
>    Affects Versions: 1.4.0
>         Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1 
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
>            Reporter: Chris Schneider
>            Priority: Blocker
>
> Although KeyedOneInputStreamOperatorTestHarness and other 
> AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
> Flink API, we have been trying to make use of them for unit testing our map 
> functions. The following code throws NPE from the attempt to collect a 
> snapshot on Flink 1.4.0 (even after applying [the 
> fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
>  for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>     
>     @SuppressWarnings("serial")
>     private static class MyProcessFunction extends 
> RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String input, Collector<String> collector) throws 
> Exception {
>             collector.collect(input);
>         }
>     }
>     
>     @SuppressWarnings({
>             "serial", "hiding"
>     })
>     private static class MyKeySelector<String> implements KeySelector<String, 
> String> {
>         @Override
>         public String getKey(String input) throws Exception {
>             return input;
>         }
>     }
>     @Test
>     public void test() throws Throwable {
>         KeyedOneInputStreamOperatorTestHarness<String, String, String> 
> testHarness =
>             new KeyedOneInputStreamOperatorTestHarness<String, String, 
> String>(
>                 new StreamFlatMap<>(new MyProcessFunction()),
>                 new MyKeySelector<String>(),
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 1,
>                 1,
>                 0);
>         testHarness.setup();
>         testHarness.open();
>         
>         for (int i = 0; i < 10; i++) {
>             String urlString = String.format("https://domain-%d.com/page1";, 
> i);
>             testHarness.processElement(new StreamRecord<>(urlString));
>         }
>         testHarness.snapshot(0L, 0L);
>     }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at 
> com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at 
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>     at 
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 25 more
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to