This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63f7f3b311a5fe8496014c54243c576e37559b68 Author: Yun Gao <[email protected]> AuthorDate: Fri Nov 27 19:40:27 2020 +0800 [FLINK-20337] Add migration test for deserializing existing StreamingFileSink state with FileSink serializer This re-uses the generated test data from BucketStateSerializerTest. --- .../flink-connector-migration-test/pom.xml | 58 +++++ ...leWriterBucketStateSerializerMigrationTest.java | 285 +++++++++++++++++++++ .../bucket-state-migration-test/empty-v1/snapshot | Bin 0 -> 128 bytes .../bucket-state-migration-test/empty-v2/snapshot | Bin 0 -> 128 bytes ...inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 | 2 + ...inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 | 2 + ...inprogress.6729a640-0585-4785-a652-89802950c663 | 2 + ...inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d | 2 + ...inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac | 2 + .../full-no-in-progress-v1-template/snapshot | Bin 0 -> 1537 bytes ...inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec | 2 + ...inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec | 2 + ...inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b | 2 + ...inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 | 2 + ...inprogress.666acf3e-935c-4621-8171-f7c897496524 | 2 + .../full-no-in-progress-v2-template/snapshot | Bin 0 -> 1597 bytes ...inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 | 2 + ...inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c | 2 + ...inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 | 2 + ...inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 | 2 + ...inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 | 2 + ...inprogress.32f5a28f-20e1-48da-9951-10e795133d64 | 1 + .../full-v1-template/snapshot | Bin 0 -> 1613 bytes ...inprogress.9731063e-2b28-4701-8cc1-e706480b8022 | 2 + ...inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 | 2 + ...inprogress.6a837aa3-4736-4098-a878-fdeffe227628 | 2 + ...inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c | 2 + ...inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 | 2 + 
...inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a | 1 + .../full-v2-template/snapshot | Bin 0 -> 1685 bytes ...inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 | 1 + .../only-in-progress-v1/snapshot | Bin 0 -> 404 bytes ...inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef | 1 + .../only-in-progress-v2/snapshot | Bin 0 -> 416 bytes .../src/test/resources/log4j2-test.properties | 29 +++ flink-connectors/pom.xml | 1 + 36 files changed, 417 insertions(+) diff --git a/flink-connectors/flink-connector-migration-test/pom.xml b/flink-connectors/flink-connector-migration-test/pom.xml new file mode 100644 index 0000000..9a16ada --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/pom.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.12-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-file-migration-test</artifactId> + <name>Flink : Connectors : Migration Test</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-files</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java new file mode 100644 index 0000000..4cce402 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.committer.FileCommitter; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.util.FileUtils; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import 
java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.iterableWithSize; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileWriterBucketStateSerializer} that verify we can still read snapshots + * taken from old {@link StreamingFileSink}. We keep snapshots for all previous versions in version + * control (including the current version). The tests verify that the current version of the + * serializer can still read data from all previous versions. + * + * <p>This is a mirror of {@link org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateSerializerTest} + * that verifies we can restore the same snapshots. We therefore have the same "previous versions" + * as that other test. The generated test data from {@code BucketStateSerializerTest} has been + * copied to be reused in this test to ensure we can restore the same bytes. 
+ */ +@RunWith(Parameterized.class) +public class FileWriterBucketStateSerializerMigrationTest { + + private static final int CURRENT_VERSION = 2; + + @Parameterized.Parameters(name = "Previous Version = {0}") + public static Collection<Integer> previousVersions() { + return Arrays.asList(1, 2); + } + + @Parameterized.Parameter + public Integer previousVersion; + + private static final String IN_PROGRESS_CONTENT = "writing"; + private static final String PENDING_CONTENT = "wrote"; + + private static final String BUCKET_ID = "test-bucket"; + + private static final java.nio.file.Path BASE_PATH = Paths.get("src/test/resources/") + .resolve("bucket-state-migration-test"); + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + private final BucketStateGenerator generator = new BucketStateGenerator( + BUCKET_ID, + IN_PROGRESS_CONTENT, + PENDING_CONTENT, + BASE_PATH, + CURRENT_VERSION); + + @Test + @Ignore + public void prepareDeserializationEmpty() throws IOException { + generator.prepareDeserializationEmpty(); + } + + @Test + public void testSerializationEmpty() throws IOException { + + final String scenarioName = "empty"; + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); + final FileWriterBucketState recoveredState = readBucketState(scenarioName, previousVersion); + + final FileWriterBucket<String> bucket = restoreBucket(recoveredState); + + Assert.assertEquals(testBucketPath, bucket.getBucketPath()); + Assert.assertNull(bucket.getInProgressPart()); + Assert.assertTrue(bucket.getPendingFiles().isEmpty()); + } + + @Test + @Ignore + public void prepareDeserializationOnlyInProgress() throws IOException { + generator.prepareDeserializationOnlyInProgress(); + } + + @Test + public void testSerializationOnlyInProgress() throws 
IOException { + + final String scenarioName = "only-in-progress"; + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + + final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); + + final FileWriterBucketState recoveredState = readBucketState(scenarioName, previousVersion); + + final FileWriterBucket<String> bucket = restoreBucket(recoveredState); + + Assert.assertEquals(testBucketPath, bucket.getBucketPath()); + + // check that the correct in-progress file writer is restored + Assert.assertEquals(8, bucket.getInProgressPart().getSize()); + + long numFiles = Files.list(Paths.get(testBucketPath.toString())) + .map(file -> { + assertThat( + file.getFileName().toString(), + startsWith(".part-0-0.inprogress")); + return 1; + }) + .count(); + + assertThat(numFiles, is(1L)); + } + + @Test + @Ignore + public void prepareDeserializationFull() throws IOException { + generator.prepareDeserializationFull(); + } + + @Test + public void testSerializationFull() throws IOException { + testDeserializationFull(true, "full"); + } + + @Test + @Ignore + public void prepareDeserializationNullInProgress() throws IOException { + generator.prepareDeserializationNullInProgress(); + } + + @Test + public void testSerializationNullInProgress() throws IOException { + testDeserializationFull(false, "full-no-in-progress"); + } + + private void testDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException { + + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + + try { + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); + // restore the state + final FileWriterBucketState recoveredState = readBucketStateFromTemplate(scenarioName, previousVersion); + 
final int noOfPendingCheckpoints = 5; + + // there are 5 checkpoints that have not completed yet. + final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> + pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint(); + Assert.assertEquals(5L, pendingFileRecoverables.size()); + + final Set<String> beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)) + .map(file -> file.getFileName().toString()) + .collect(Collectors.toSet()); + + // before restoring, all files have "inprogress" in their names + for (int i = 0; i < noOfPendingCheckpoints; i++) { + final String part = ".part-0-" + i + ".inprogress"; + assertThat(beforeRestorePaths, hasItem(startsWith(part))); + } + + final FileWriterBucket<String> bucket = restoreBucket(recoveredState); + Assert.assertEquals(testBucketPath, bucket.getBucketPath()); + Assert.assertEquals(noOfPendingCheckpoints, bucket.getPendingFiles().size()); + + // simulates we commit the recovered pending files on the first checkpoint + bucket.snapshotState(); + List<FileSinkCommittable> committables = bucket.prepareCommit(false); + FileCommitter committer = new FileCommitter(createBucketWriter()); + committer.commit(committables); + + final Set<String> afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)) + .map(file -> file.getFileName().toString()) + .collect(Collectors.toSet()); + + // after restoring, all pending files are committed. + // there is no "inprogress" in the file names of the committed files. 
+ for (int i = 0; i < noOfPendingCheckpoints; i++) { + final String part = "part-0-" + i; + assertThat(afterRestorePaths, hasItem(part)); + afterRestorePaths.remove(part); + } + + if (withInProgress) { + // only the in-progress must be left + assertThat(afterRestorePaths, iterableWithSize(1)); + + // verify that the in-progress file is still there + assertThat(afterRestorePaths, hasItem(startsWith(".part-0-" + noOfPendingCheckpoints + ".inprogress"))); + } else { + assertThat(afterRestorePaths, empty()); + } + } finally { + FileUtils.deleteDirectory(pathResolver.getResourcePath(scenarioName).toFile()); + } + } + + private static FileWriterBucket<String> restoreBucket(final FileWriterBucketState bucketState) throws IOException { + return FileWriterBucket.restore( + createBucketWriter(), + DefaultRollingPolicy.builder().withMaxPartSize(10).build(), + bucketState, + OutputFileConfig.builder().build()); + } + + private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException { + return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>()); + } + + private static SimpleVersionedSerializer<FileWriterBucketState> bucketStateSerializer() throws IOException { + final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter(); + return new FileWriterBucketStateSerializer( + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketWriter.getProperties().getPendingFileRecoverableSerializer()); + } + + private static FileWriterBucketState readBucketState(final String scenarioName, final int version) throws IOException { + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version); + byte[] bytes = Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName)); + return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes); + } + + private static FileWriterBucketState 
readBucketStateFromTemplate(final String scenarioName, final int version) throws IOException { + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version); + final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName); + + // clear the scenario files first + FileUtils.deleteDirectory(scenarioPath.toFile()); + + // prepare the scenario files + FileUtils.copy(new Path(scenarioPath.toString() + "-template"), new Path(scenarioPath.toString()), false); + + return readBucketState(scenarioName, version); + } +} diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot new file mode 100644 index 0000000..9700f3d Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v1/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot new file mode 100644 index 0000000..9e84e8d Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/empty-v2/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-0.inprogress.a88d5993-77bc-44ce-880b-9f2a43b59ab4 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-1.inprogress.7c0f2bd7-3078-48e8-9af2-d8773fb949c5 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-2.inprogress.6729a640-0585-4785-a652-89802950c663 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-3.inprogress.b4bcb0e9-5c9e-45dd-8963-1b163343544d @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/bucket/test-bucket/.part-0-4.inprogress.e1e9e48d-0db6-4dd7-8a4d-fb4ebe7ed8ac @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot new file mode 100644 index 0000000..5d6644c Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-0.inprogress.1e22e72d-0ab2-493b-8b00-9edac4252cec @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-1.inprogress.3821f491-9fa1-48b2-b66b-655352a3c8ec @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-2.inprogress.0af18f41-d8f8-4a4e-a92e-de12851be20b @@ -0,0 +1,2 @@ +wrote +wrote diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-3.inprogress.a3d0f4d2-d6ad-4f83-ba62-ed4b1fa86db2 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/bucket/test-bucket/.part-0-4.inprogress.666acf3e-935c-4621-8171-f7c897496524 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot new file mode 100644 index 0000000..4e0eaa4 Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-no-in-progress-v2-template/snapshot differ diff --git 
a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-0.inprogress.8fec17e9-5d54-4fa9-aebb-70736fe03c82 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-1.inprogress.0035b171-2759-403a-8d6c-4612b28a7a6c @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-2.inprogress.49da8048-af6b-4665-b4f6-b659cb38dc97 @@ -0,0 +1,2 
@@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-3.inprogress.d13ec4e0-07b5-4f4e-9be8-9fb457cbcde9 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-4.inprogress.123ac2c7-f92a-476a-a848-1369b93d82a7 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64 new file mode 100644 index 0000000..631ee76 --- /dev/null +++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/bucket/test-bucket/.part-0-5.inprogress.32f5a28f-20e1-48da-9951-10e795133d64 @@ -0,0 +1 @@ +writing diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot new file mode 100644 index 0000000..dd313db Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-0.inprogress.9731063e-2b28-4701-8cc1-e706480b8022 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-1.inprogress.1d423406-097a-4deb-bfde-d023d3477cd5 @@ 
-0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-2.inprogress.6a837aa3-4736-4098-a878-fdeffe227628 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-3.inprogress.f121b73d-ac74-4fbd-b70d-f13e51c9132c @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 new file mode 100644 index 0000000..a6c0130 --- /dev/null +++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-4.inprogress.a156884a-f090-4c3f-a271-0b63ab539c45 @@ -0,0 +1,2 @@ +wrote +wrote diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a new file mode 100644 index 0000000..631ee76 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/bucket/test-bucket/.part-0-5.inprogress.83c527c5-14dc-4d49-9f99-c915f2224f6a @@ -0,0 +1 @@ +writing diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot new file mode 100644 index 0000000..9d9c3fc Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/full-v2-template/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 new file mode 100644 index 0000000..631ee76 --- /dev/null +++ 
b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/bucket/test-bucket/.part-0-0.inprogress.a70190d6-d080-43a8-b414-746b09d3a8a0 @@ -0,0 +1 @@ +writing diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot new file mode 100644 index 0000000..6e98be9 Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v1/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef new file mode 100644 index 0000000..631ee76 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/bucket/test-bucket/.part-0-0.inprogress.10833090-dd8c-4e36-884d-bb9758a3a8ef @@ -0,0 +1 @@ +writing diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot new file mode 100644 index 0000000..d21f7c4 Binary files /dev/null and b/flink-connectors/flink-connector-migration-test/src/test/resources/bucket-state-migration-test/only-in-progress-v2/snapshot differ diff --git a/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties 
b/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..6faab06 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/resources/log4j2-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n + diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 11ef77e..1f91f6c 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -57,6 +57,7 @@ under the License. 
<module>flink-connector-base</module> <module>flink-file-sink-common</module> <module>flink-connector-files</module> + <module>flink-connector-migration-test</module> </modules> <!-- override these root dependencies as 'provided', so they don't end up
