[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993064#comment-15993064 ]
ASF GitHub Bot commented on FLINK-5969:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3778#discussion_r114343238
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java ---
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class ContinuousFileProcessingFrom12MigrationTest {
+
+ private static final int LINES_PER_FILE = 10;
+
+ private static final long INTERVAL = 100;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ /**
+ * Manually run this to write binary snapshot data. Remove @Ignore to run.
+ */
+ @Ignore
+ @Test
+ public void writeReaderSnapshot() throws Exception {
+
+ File testFolder = tempFolder.newFolder();
+
+ TimestampedFileInputSplit split1 =
+ new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+ TimestampedFileInputSplit split2 =
+ new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+ TimestampedFileInputSplit split3 =
+ new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+ TimestampedFileInputSplit split4 =
+ new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+ final OneShotLatch latch = new OneShotLatch();
--- End diff ---
Yes, it's intended: it ensures that the reader never actually does any
reading. I copied this straight from the old tests, but I agree that it's not
very apparent. I'll add a comment.
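For illustration, here is a minimal sketch of how the latch can keep the reader idle. This assumes Flink's test-util {{OneShotLatch}} from the imports above; the class name {{BlockingFileInputFormat}} is illustrative, not necessarily what the PR uses. The input format blocks in {{open()}} until the test triggers the latch, so the reader operator never produces a record.
{code}
private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {

	private final OneShotLatch latch;

	BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
		super(filePath);
		this.latch = latch;
	}

	@Override
	public void open(FileInputSplit fileSplit) throws IOException {
		// Block the reading thread until the test explicitly triggers the
		// latch, so the operator under test never gets past open().
		try {
			latch.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	@Override
	public boolean reachedEnd() throws IOException {
		// Report end-of-input immediately in case open() ever returns.
		return true;
	}

	@Override
	public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
		return null;
	}
}
{code}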
> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}
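As a rough illustration of the restore side these migrated tests share, here is a hedged sketch built from the 1.3 test utilities visible in the diff above ({{OperatorSnapshotUtil}}, {{OneInputStreamOperatorTestHarness}}); the method name, snapshot file name, and the {{format}} parameter are placeholders, not the actual test code. A previously written 1.2 binary snapshot is read back, fed into a fresh operator via the test harness, and the restored behaviour is then verified.
{code}
private void checkRestoreFrom12Snapshot(FileInputFormat<String> format) throws Exception {
	// Read a binary snapshot that was written against Flink 1.2
	// (e.g. by an @Ignore'd writer method like writeReaderSnapshot() above).
	OperatorStateHandles snapshot = OperatorSnapshotUtil.readStateHandle(
			OperatorSnapshotUtil.getResourceFilename("reader-migration-test-flink1.2-snapshot"));

	// Create a fresh reader operator and initialize it from the old state.
	OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness =
			new OneInputStreamOperatorTestHarness<>(new ContinuousFileReaderOperator<String>(format));
	harness.setup();
	harness.initializeState(snapshot);
	harness.open();

	// ... push splits with harness.processElement(...) and compare the
	// emitted records against the expected output.
}
{code}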