This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63f7f3b311a5fe8496014c54243c576e37559b68
Author: Yun Gao <[email protected]>
AuthorDate: Fri Nov 27 19:40:27 2020 +0800

    [FLINK-20337] Add migration test for deserializing existing 
StreamingFileSink state with FileSink serializer
    
    This re-uses the generated test data from BucketStateSerializerTest.
---
 .../flink-connector-migration-test/pom.xml         |  58 +++++
 ...leWriterBucketStateSerializerMigrationTest.java | 285 +++++++++++++++++++++
 .../bucket-state-migration-test/empty-v1/snapshot  | Bin 0 -> 128 bytes
 .../bucket-state-migration-test/empty-v2/snapshot  | Bin 0 -> 128 bytes
 ...inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 |   2 +
 ...inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 |   2 +
 ...inprogress.6729a640-0585-4785-a652-89802950c663 |   2 +
 ...inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d |   2 +
 ...inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac |   2 +
 .../full-no-in-progress-v1-template/snapshot       | Bin 0 -> 1537 bytes
 ...inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec |   2 +
 ...inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec |   2 +
 ...inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b |   2 +
 ...inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 |   2 +
 ...inprogress.666acf3e-935c-4621-8171-f7c897496524 |   2 +
 .../full-no-in-progress-v2-template/snapshot       | Bin 0 -> 1597 bytes
 ...inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 |   2 +
 ...inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c |   2 +
 ...inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 |   2 +
 ...inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 |   2 +
 ...inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 |   2 +
 ...inprogress.32f5a28f-20e1-48da-9951-10e795133d64 |   1 +
 .../full-v1-template/snapshot                      | Bin 0 -> 1613 bytes
 ...inprogress.9731063e-2b28-4701-8cc1-e706480b8022 |   2 +
 ...inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 |   2 +
 ...inprogress.6a837aa3-4736-4098-a878-fdeffe227628 |   2 +
 ...inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c |   2 +
 ...inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 |   2 +
 ...inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a |   1 +
 .../full-v2-template/snapshot                      | Bin 0 -> 1685 bytes
 ...inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 |   1 +
 .../only-in-progress-v1/snapshot                   | Bin 0 -> 404 bytes
 ...inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef |   1 +
 .../only-in-progress-v2/snapshot                   | Bin 0 -> 416 bytes
 .../src/test/resources/log4j2-test.properties      |  29 +++
 flink-connectors/pom.xml                           |   1 +
 36 files changed, 417 insertions(+)

diff --git a/flink-connectors/flink-connector-migration-test/pom.xml 
b/flink-connectors/flink-connector-migration-test/pom.xml
new file mode 100644
index 0000000..9a16ada
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.12-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-file-migration-test</artifactId>
+       <name>Flink : Connectors : Migration Test</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-files</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
 
b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
new file mode 100644
index 0000000..4cce402
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.iterableWithSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileWriterBucketStateSerializer} that verify we can 
still read snapshots
+ * taken from old {@link StreamingFileSink}. We keep snapshots for all 
previous versions in version
+ * control (including the current version). The tests verify that the current 
version of the
+ * serializer can still read data from all previous versions.
+ *
+ * <p>This is a mirror of {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateSerializerTest}
+ * that verifies we can restore the same snapshots. We therefore have the same 
"previous versions"
+ * as that other test. The generated test data from {@code 
BucketStateSerializerTest} has been
+ * copied to be reused in this test to ensure we can restore the same bytes.
+ */
+@RunWith(Parameterized.class)
+public class FileWriterBucketStateSerializerMigrationTest {
+
+       private static final int CURRENT_VERSION = 2;
+
+       @Parameterized.Parameters(name = "Previous Version = {0}")
+       public static Collection<Integer> previousVersions() {
+               return Arrays.asList(1, 2);
+       }
+
+       @Parameterized.Parameter
+       public Integer previousVersion;
+
+       private static final String IN_PROGRESS_CONTENT = "writing";
+       private static final String PENDING_CONTENT = "wrote";
+
+       private static final String BUCKET_ID = "test-bucket";
+
+       private static final java.nio.file.Path BASE_PATH = 
Paths.get("src/test/resources/")
+                       .resolve("bucket-state-migration-test");
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private final BucketStateGenerator generator = new BucketStateGenerator(
+                       BUCKET_ID,
+                       IN_PROGRESS_CONTENT,
+                       PENDING_CONTENT,
+                       BASE_PATH,
+                       CURRENT_VERSION);
+
+       @Test
+       @Ignore
+       public void prepareDeserializationEmpty() throws IOException {
+               generator.prepareDeserializationEmpty();
+       }
+
+       @Test
+       public void testSerializationEmpty() throws IOException {
+
+               final String scenarioName = "empty";
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
+               final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
+               final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
+               final FileWriterBucketState recoveredState = 
readBucketState(scenarioName, previousVersion);
+
+               final FileWriterBucket<String> bucket = 
restoreBucket(recoveredState);
+
+               Assert.assertEquals(testBucketPath, bucket.getBucketPath());
+               Assert.assertNull(bucket.getInProgressPart());
+               Assert.assertTrue(bucket.getPendingFiles().isEmpty());
+       }
+
+       @Test
+       @Ignore
+       public void prepareDeserializationOnlyInProgress() throws IOException {
+               generator.prepareDeserializationOnlyInProgress();
+       }
+
+       @Test
+       public void testSerializationOnlyInProgress() throws IOException {
+
+               final String scenarioName = "only-in-progress";
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
+               final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
+
+               final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
+
+               final FileWriterBucketState recoveredState = 
readBucketState(scenarioName, previousVersion);
+
+               final FileWriterBucket<String> bucket = 
restoreBucket(recoveredState);
+
+               Assert.assertEquals(testBucketPath, bucket.getBucketPath());
+
+               // check that the correct in-progress file writer is restored
+               Assert.assertEquals(8, bucket.getInProgressPart().getSize());
+
+               long numFiles = Files.list(Paths.get(testBucketPath.toString()))
+                               .map(file -> {
+                                       assertThat(
+                                                       
file.getFileName().toString(),
+                                                       
startsWith(".part-0-0.inprogress"));
+                                       return 1;
+                               })
+                               .count();
+
+               assertThat(numFiles, is(1L));
+       }
+
+       @Test
+       @Ignore
+       public void prepareDeserializationFull() throws IOException {
+               generator.prepareDeserializationFull();
+       }
+
+       @Test
+       public void testSerializationFull() throws IOException {
+               testDeserializationFull(true, "full");
+       }
+
+       @Test
+       @Ignore
+       public void prepareDeserializationNullInProgress() throws IOException {
+               generator.prepareDeserializationNullInProgress();
+       }
+
+       @Test
+       public void testSerializationNullInProgress() throws IOException {
+               testDeserializationFull(false, "full-no-in-progress");
+       }
+
+       private void testDeserializationFull(final boolean withInProgress, 
final String scenarioName) throws IOException {
+
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
+               try {
+                       final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
+                       final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
+                       // restore the state
+                       final FileWriterBucketState recoveredState = 
readBucketStateFromTemplate(scenarioName, previousVersion);
+                       final int noOfPendingCheckpoints = 5;
+
+                       // there are 5 checkpoints that have not completed yet.
+                       final Map<Long, 
List<InProgressFileWriter.PendingFileRecoverable>>
+                                       pendingFileRecoverables = 
recoveredState.getPendingFileRecoverablesPerCheckpoint();
+                       Assert.assertEquals(5L, pendingFileRecoverables.size());
+
+                       final Set<String> beforeRestorePaths = 
Files.list(outputPath.resolve(BUCKET_ID))
+                                       .map(file -> 
file.getFileName().toString())
+                                       .collect(Collectors.toSet());
+
+                       // before restoring, all files have "inprogress" in their names
+                       for (int i = 0; i < noOfPendingCheckpoints; i++) {
+                               final String part = ".part-0-" + i + 
".inprogress";
+                               assertThat(beforeRestorePaths, 
hasItem(startsWith(part)));
+                       }
+
+                       final FileWriterBucket<String> bucket = 
restoreBucket(recoveredState);
+                       Assert.assertEquals(testBucketPath, 
bucket.getBucketPath());
+                       Assert.assertEquals(noOfPendingCheckpoints, 
bucket.getPendingFiles().size());
+
+                       // simulate committing the recovered pending files on 
the first checkpoint
+                       bucket.snapshotState();
+                       List<FileSinkCommittable> committables = 
bucket.prepareCommit(false);
+                       FileCommitter committer = new 
FileCommitter(createBucketWriter());
+                       committer.commit(committables);
+
+                       final Set<String> afterRestorePaths = 
Files.list(outputPath.resolve(BUCKET_ID))
+                                       .map(file -> 
file.getFileName().toString())
+                                       .collect(Collectors.toSet());
+
+                       // after restoring, all pending files are committed.
+                       // there is no "inprogress" in the file name of the 
committed files.
+                       for (int i = 0; i < noOfPendingCheckpoints; i++) {
+                               final String part = "part-0-" + i;
+                               assertThat(afterRestorePaths, hasItem(part));
+                               afterRestorePaths.remove(part);
+                       }
+
+                       if (withInProgress) {
+                               // only the in-progress must be left
+                               assertThat(afterRestorePaths, 
iterableWithSize(1));
+
+                               // verify that the in-progress file is still 
there
+                               assertThat(afterRestorePaths, 
hasItem(startsWith(".part-0-" + noOfPendingCheckpoints + ".inprogress")));
+                       } else {
+                               assertThat(afterRestorePaths, empty());
+                       }
+               } finally {
+                       
FileUtils.deleteDirectory(pathResolver.getResourcePath(scenarioName).toFile());
+               }
+       }
+
+       private static FileWriterBucket<String> restoreBucket(final 
FileWriterBucketState bucketState) throws IOException {
+               return FileWriterBucket.restore(
+                               createBucketWriter(),
+                               
DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+                               bucketState,
+                               OutputFileConfig.builder().build());
+       }
+
+       private static RowWiseBucketWriter<String, String> createBucketWriter() 
throws IOException {
+               return new 
RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(),
 new SimpleStringEncoder<>());
+       }
+
+       private static SimpleVersionedSerializer<FileWriterBucketState> 
bucketStateSerializer() throws IOException {
+               final RowWiseBucketWriter<String, String> bucketWriter = 
createBucketWriter();
+               return new FileWriterBucketStateSerializer(
+                               
bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                               
bucketWriter.getProperties().getPendingFileRecoverableSerializer());
+       }
+
+       private static FileWriterBucketState readBucketState(final String 
scenarioName, final int version) throws IOException {
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, version);
+               byte[] bytes = 
Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName));
+               return 
SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), 
bytes);
+       }
+
+       private static FileWriterBucketState readBucketStateFromTemplate(final 
String scenarioName, final int version) throws IOException {
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, version);
+               final java.nio.file.Path scenarioPath = 
pathResolver.getResourcePath(scenarioName);
+
+               // clear the scenario files first
+               FileUtils.deleteDirectory(scenarioPath.toFile());
+
+               // prepare the scenario files
+               FileUtils.copy(new Path(scenarioPath.toString() + "-template"), 
new Path(scenarioPath.toString()), false);
+
+               return readBucketState(scenarioName, version);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
new file mode 100644
index 0000000..9700f3d
Binary files /dev/null and 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
 differ
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
new file mode 100644
index 0000000..9e84e8d
Binary files /dev/null and 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
 differ
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
new file mode 100644
index 0000000..5d6644c
Binary files /dev/null and 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
 differ
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot
new file mode 100644
index 0000000..4e0eaa4
Binary files /dev/null and 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot
 differ
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64
@@ -0,0 +1 @@
+writing
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
new file mode 100644
index 0000000..dd313db
Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot differ
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45
new file mode 100644
index 0000000..a6c0130
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45
@@ -0,0 +1,2 @@
+wrote
+wrote
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a
@@ -0,0 +1 @@
+writing
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot
new file mode 100644
index 0000000..9d9c3fc
Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot differ
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0
@@ -0,0 +1 @@
+writing
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot
new file mode 100644
index 0000000..6e98be9
Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot differ
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef
new file mode 100644
index 0000000..631ee76
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef
@@ -0,0 +1 @@
+writing
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot
new file mode 100644
index 0000000..d21f7c4
Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot differ
diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..6faab06
--- /dev/null
+++ b/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 11ef77e..1f91f6c 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -57,6 +57,7 @@ under the License.
                <module>flink-connector-base</module>
                <module>flink-file-sink-common</module>
                <module>flink-connector-files</module>
+               <module>flink-connector-migration-test</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end up

Reply via email to