This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 684bdce974853175d329405c91f31bcaa8d7b733 Author: Yun Gao <[email protected]> AuthorDate: Thu Nov 26 12:21:09 2020 +0800 [refactor] Factor test data generator/resolver out of BucketStateSerializerTest We want to reuse this code for the new FileSink migration test as well. --- .../sink/filesystem/BucketStateGenerator.java | 173 +++++++++++++++++++++ .../sink/filesystem/BucketStatePathResolver.java | 49 ++++++ .../sink/filesystem/BucketStateSerializerTest.java | 153 ++++-------------- 3 files changed, 250 insertions(+), 125 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java new file mode 100644 index 0000000..a35bf55 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.util.FileUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * Utilities to generate sample bucket states so that we could test migration of bucket states. + */ +public class BucketStateGenerator { + private final String bucketId; + private final String inProgressContent; + private final String pendingContent; + private final BucketStatePathResolver pathResolver; + + public BucketStateGenerator( + String bucketId, + String inProgressContent, + String pendingContent, + java.nio.file.Path basePath, + int currentVersion) { + this.bucketId = bucketId; + this.inProgressContent = inProgressContent; + this.pendingContent = pendingContent; + this.pathResolver = new BucketStatePathResolver(basePath, currentVersion); + } + + public void prepareDeserializationEmpty() throws IOException { + final String scenarioName = "empty"; + final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName); + + FileUtils.deleteDirectory(scenarioPath.toFile()); + Files.createDirectories(scenarioPath); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + final Path testBucketPath = new Path(outputPath.resolve(bucketId).toString()); + + final Bucket<String, String> bucket = createNewBucket(testBucketPath); + + final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0); + + byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize( + bucketStateSerializer(), + bucketState); + Files.write(pathResolver.getSnapshotPath(scenarioName), bytes); + } + + public void prepareDeserializationOnlyInProgress() throws IOException { + final String scenarioName = "only-in-progress"; + final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName); + FileUtils.deleteDirectory(scenarioPath.toFile()); + Files.createDirectories(scenarioPath); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + final Path testBucketPath = new Path(outputPath.resolve(bucketId).toString()); + + final Bucket<String, String> bucket = + createNewBucket(testBucketPath); + + bucket.write(inProgressContent, System.currentTimeMillis()); + + final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0); + + final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize( + bucketStateSerializer(), bucketState); + + Files.write(pathResolver.getSnapshotPath(scenarioName), bytes); + } + + public void prepareDeserializationFull() throws IOException { + prepareDeserializationFull(true, "full"); + } + + public void prepareDeserializationNullInProgress() throws IOException { + prepareDeserializationFull(false, "full-no-in-progress"); + } + + private void prepareDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException { + final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName); + FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + "-template").toFile()); + Files.createDirectories(scenarioPath); + + final int noOfPendingCheckpoints = 5; + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); + + final Path testBucketPath = new Path(outputPath.resolve(bucketId).toString()); + + final Bucket<String, String> bucket = createNewBucket(testBucketPath); + + BucketState<String> bucketState = null; + // pending for checkpoints + for (int i = 0; i < noOfPendingCheckpoints; i++) { + // write 10 bytes to the in progress file + bucket.write(pendingContent, System.currentTimeMillis()); + bucket.write(pendingContent, System.currentTimeMillis()); + // every checkpoint would produce a pending file + bucketState = bucket.onReceptionOfCheckpoint(i); + } + + if (withInProgress) { + // create a in progress file + bucket.write(inProgressContent, System.currentTimeMillis()); + + // 5 pending files and 1 in progress file + bucketState = bucket.onReceptionOfCheckpoint(noOfPendingCheckpoints); + } + + final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer(), bucketState); + + Files.write(pathResolver.getSnapshotPath(scenarioName), bytes); + + // copy the scenario file to a template directory. + // it is because that the test `testSerializationFull` would change the in progress file to pending files. + moveToTemplateDirectory(scenarioPath); + } + + private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException { + return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>()); + } + + private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException { + final RowWiseBucketWriter bucketWriter = createBucketWriter(); + return new BucketStateSerializer<>( + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketWriter.getProperties().getPendingFileRecoverableSerializer(), + SimpleVersionedStringSerializer.INSTANCE); + } + + private Bucket<String, String> createNewBucket(Path bucketPath) throws IOException { + return Bucket.getNew( + 0, + bucketId, + bucketPath, + 0, + createBucketWriter(), + DefaultRollingPolicy.builder().withMaxPartSize(10).build(), + null, + OutputFileConfig.builder().build()); + } + + private void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException { + FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false); + FileUtils.deleteDirectory(scenarioPath.toFile()); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java new file mode 100644 index 0000000..1d0f335 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import java.nio.file.Path; + +/** + * Utilities to resolve the directory structure for the bucket state + * generated for migration test. + */ +public class BucketStatePathResolver { + private final java.nio.file.Path basePath; + private final int version; + + public BucketStatePathResolver(Path basePath, int version) { + this.basePath = basePath; + this.version = version; + } + + public java.nio.file.Path getSnapshotPath(String scenarioName) { + java.nio.file.Path basePath = getResourcePath(scenarioName); + return basePath.resolve("snapshot"); + } + + public java.nio.file.Path getOutputPath(String scenarioName) { + java.nio.file.Path basePath = getResourcePath(scenarioName); + return basePath.resolve("bucket"); + } + + public java.nio.file.Path getResourcePath(String scenarioName) { + return basePath.resolve(scenarioName + "-v" + version); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java index aac6612..3a80257 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java @@ -76,58 +76,32 @@ public class BucketStateSerializerTest { private static final String BUCKET_ID = "test-bucket"; + private static final java.nio.file.Path BASE_PATH = Paths.get("src/test/resources/") + .resolve("bucket-state-migration-test"); + @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); - private static java.nio.file.Path getResourcePath( - String scenarioName, - int version) { - return Paths.get("src/test/resources/") - .resolve("bucket-state-migration-test") - .resolve(scenarioName + "-v" + version); - } - - private static java.nio.file.Path getSnapshotPath( - String scenarioName, - int version) { - java.nio.file.Path basePath = getResourcePath(scenarioName, version); - return basePath.resolve("snapshot"); - } - - private static java.nio.file.Path getOutputPath(String scenarioName, int version) { - java.nio.file.Path basePath = getResourcePath(scenarioName, version); - return basePath.resolve("bucket"); - } + private final BucketStateGenerator generator = new BucketStateGenerator( + BUCKET_ID, + IN_PROGRESS_CONTENT, + PENDING_CONTENT, + BASE_PATH, + CURRENT_VERSION); @Test @Ignore public void prepareDeserializationEmpty() throws IOException { - - final String scenarioName = "empty"; - final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION); - - FileUtils.deleteDirectory(scenarioPath.toFile()); - Files.createDirectories(scenarioPath); - - final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION); - final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); - - final Bucket<String, String> bucket = - createNewBucket(testBucketPath); - - final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0); - - byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize( - bucketStateSerializer(), - bucketState); - Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes); + generator.prepareDeserializationEmpty(); } @Test public void testSerializationEmpty() throws IOException { final String scenarioName = "empty"; - final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion); + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); final BucketState<String> recoveredState = readBucketState(scenarioName, previousVersion); @@ -141,33 +115,16 @@ public class BucketStateSerializerTest { @Test @Ignore public void prepareDeserializationOnlyInProgress() throws IOException { - - final String scenarioName = "only-in-progress"; - final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION); - FileUtils.deleteDirectory(scenarioPath.toFile()); - Files.createDirectories(scenarioPath); - - final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION); - final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); - - final Bucket<String, String> bucket = - createNewBucket(testBucketPath); - - bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis()); - - final BucketState<String> bucketState = bucket.onReceptionOfCheckpoint(0); - - final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize( - bucketStateSerializer(), bucketState); - - Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes); + generator.prepareDeserializationOnlyInProgress(); } @Test public void testSerializationOnlyInProgress() throws IOException { final String scenarioName = "only-in-progress"; - final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion); + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); @@ -195,7 +152,7 @@ public class BucketStateSerializerTest { @Test @Ignore public void prepareDeserializationFull() throws IOException { - prepareDeserializationFull(true, "full"); + generator.prepareDeserializationFull(); } @Test @@ -206,7 +163,7 @@ public class BucketStateSerializerTest { @Test @Ignore public void prepareDeserializationNullInProgress() throws IOException { - prepareDeserializationFull(false, "full-no-in-progress"); + generator.prepareDeserializationNullInProgress(); } @Test @@ -214,51 +171,12 @@ public class BucketStateSerializerTest { testDeserializationFull(false, "full-no-in-progress"); } - private static void prepareDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException { - - final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, CURRENT_VERSION); - FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + "-template").toFile()); - Files.createDirectories(scenarioPath); - - final int noOfPendingCheckpoints = 5; - - final java.nio.file.Path outputPath = getOutputPath(scenarioName, CURRENT_VERSION); - - final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); - - final Bucket<String, String> bucket = createNewBucket(testBucketPath); - - BucketState<String> bucketState = null; - // pending for checkpoints - for (int i = 0; i < noOfPendingCheckpoints; i++) { - // write 10 bytes to the in progress file - bucket.write(PENDING_CONTENT, System.currentTimeMillis()); - bucket.write(PENDING_CONTENT, System.currentTimeMillis()); - // every checkpoint would produce a pending file - bucketState = bucket.onReceptionOfCheckpoint(i); - } - - if (withInProgress) { - // create a in progress file - bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis()); - - // 5 pending files and 1 in progress file - bucketState = bucket.onReceptionOfCheckpoint(noOfPendingCheckpoints); - } - - final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer(), bucketState); - - Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), bytes); - - // copy the scenario file to a template directory. - // it is because that the test `testSerializationFull` would change the in progress file to pending files. - moveToTemplateDirectory(scenarioPath); - } - private void testDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException { + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); + try { - final java.nio.file.Path outputPath = getOutputPath(scenarioName, previousVersion); + final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName); final Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString()); // restore the state final BucketState<String> recoveredState = readBucketStateFromTemplate(scenarioName, previousVersion); @@ -306,22 +224,10 @@ public class BucketStateSerializerTest { assertThat(afterRestorePaths, empty()); } } finally { - FileUtils.deleteDirectory(getResourcePath(scenarioName, previousVersion).toFile()); + FileUtils.deleteDirectory(pathResolver.getResourcePath(scenarioName).toFile()); } } - private static Bucket<String, String> createNewBucket(final Path bucketPath) throws IOException { - return Bucket.getNew( - 0, - BUCKET_ID, - bucketPath, - 0, - createBucketWriter(), - DefaultRollingPolicy.builder().withMaxPartSize(10).build(), - null, - OutputFileConfig.builder().build()); - } - private static Bucket<String, String> restoreBucket(final int initialPartCounter, final BucketState<String> bucketState) throws IOException { return Bucket.restore( 0, @@ -338,7 +244,7 @@ public class BucketStateSerializerTest { } private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException { - final RowWiseBucketWriter bucketWriter = createBucketWriter(); + final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter(); return new BucketStateSerializer<>( bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), bucketWriter.getProperties().getPendingFileRecoverableSerializer(), @@ -346,12 +252,14 @@ public class BucketStateSerializerTest { } private static BucketState<String> readBucketState(final String scenarioName, final int version) throws IOException { - byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version)); + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version); + byte[] bytes = Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName)); return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes); } private static BucketState<String> readBucketStateFromTemplate(final String scenarioName, final int version) throws IOException { - final java.nio.file.Path scenarioPath = getResourcePath(scenarioName, version); + final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version); + final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName); // clear the scenario files first FileUtils.deleteDirectory(scenarioPath.toFile()); @@ -361,9 +269,4 @@ public class BucketStateSerializerTest { return readBucketState(scenarioName, version); } - - private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException { - FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false); - FileUtils.deleteDirectory(scenarioPath.toFile()); - } }
