[
https://issues.apache.org/jira/browse/FLINK-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748461#comment-15748461
]
ASF GitHub Bot commented on FLINK-5335:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3005#discussion_r92403016
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ListCheckpointedTest {
+
+ @Test
+ public void testUDFReturningNull() throws Exception {
+ TestUserFunction userFunction = new TestUserFunction(null);
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new
TestOperator(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+ @Test
+ public void testUDFReturningEmpty() throws Exception {
+ TestUserFunction userFunction = new
TestUserFunction(Collections.<Integer>emptyList());
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new
TestOperator(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+ @Test
+ public void testUDFReturningData() throws Exception {
+ TestUserFunction userFunction = new
TestUserFunction(Arrays.asList(1, 2, 3));
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new
TestOperator(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+
+ private static class TestUserFunction extends AbstractRichFunction
implements ListCheckpointed<Integer> {
+
+ private final List<Integer> expected;
+ private boolean restored;
+
+ public TestUserFunction(List<Integer> expected) {
+ this.expected = expected;
+ this.restored = false;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long
timestamp) throws Exception {
+ return expected;
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (null != expected) {
+ Assert.assertEquals(expected, state);
+ } else {
+ Assert.assertTrue(state.isEmpty());
+ }
+ restored = true;
+ }
+
+ public boolean isRestored() {
+ return restored;
+ }
+ }
+
+ private static class TestOperator extends
AbstractUdfStreamOperator<Integer, TestUserFunction> {
--- End diff --
I think you can make the test UDF a `RichMapFunction` and then you don't
need the `TestOperator`.
> Allow ListCheckpointed user functions to return null
> ----------------------------------------------------
>
> Key: FLINK-5335
> URL: https://issues.apache.org/jira/browse/FLINK-5335
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Currently, it is not allowed to return null as result for the methods in
> {{ListCheckpointed}}. From a usability perspective, I think it is nicer to
> allow this.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)