This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 684bdce974853175d329405c91f31bcaa8d7b733
Author: Yun Gao <[email protected]>
AuthorDate: Thu Nov 26 12:21:09 2020 +0800

    [refactor] Factor test data generator/resolver out of 
BucketStateSerializerTest
    
    We want to reuse this code for the new FileSink migration test as well.
---
 .../sink/filesystem/BucketStateGenerator.java      | 173 +++++++++++++++++++++
 .../sink/filesystem/BucketStatePathResolver.java   |  49 ++++++
 .../sink/filesystem/BucketStateSerializerTest.java | 153 ++++--------------
 3 files changed, 250 insertions(+), 125 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
new file mode 100644
index 0000000..a35bf55
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.util.FileUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Utilities to generate sample bucket states so that the migration of bucket states can be tested.
+ *
+ * <p>Every {@code prepareDeserialization*} method writes one scenario into the directories resolved
+ * by {@link BucketStatePathResolver} for the version passed to the constructor. The files written by
+ * the bucket go to the scenario's output directory, and the serialized {@link BucketState} goes to
+ * the scenario's snapshot file. The "full" scenarios are finally moved to a sibling
+ * {@code <scenario directory>-template} directory.
+ */
+public class BucketStateGenerator {
+       private final String bucketId;
+       private final String inProgressContent;
+       private final String pendingContent;
+       private final BucketStatePathResolver pathResolver;
+
+       /**
+        * Creates a generator for the scenarios of the given version.
+        *
+        * @param bucketId id of the generated bucket, also the name of its directory
+        * @param inProgressContent record written to produce an in-progress file
+        * @param pendingContent record written (twice per checkpoint) to produce pending files
+        * @param basePath root directory of all scenario directories
+        * @param currentVersion version used in the names of the generated scenario directories
+        */
+       public BucketStateGenerator(
+                       String bucketId,
+                       String inProgressContent,
+                       String pendingContent,
+                       java.nio.file.Path basePath,
+                       int currentVersion) {
+               this.bucketId = bucketId;
+               this.inProgressContent = inProgressContent;
+               this.pendingContent = pendingContent;
+               this.pathResolver = new BucketStatePathResolver(basePath, currentVersion);
+       }
+
+       /** Generates the "empty" scenario: a bucket checkpointed before any record was written. */
+       public void prepareDeserializationEmpty() throws IOException {
+               final String scenarioName = "empty";
+               resetScenarioDirectory(scenarioName);
+
+               final Bucket<String, String> bucket = createNewBucketForScenario(scenarioName);
+
+               writeSnapshot(scenarioName, bucket.onReceptionOfCheckpoint(0));
+       }
+
+       /** Generates the "only-in-progress" scenario: one in-progress record, then a checkpoint. */
+       public void prepareDeserializationOnlyInProgress() throws IOException {
+               final String scenarioName = "only-in-progress";
+               resetScenarioDirectory(scenarioName);
+
+               final Bucket<String, String> bucket = createNewBucketForScenario(scenarioName);
+               bucket.write(inProgressContent, System.currentTimeMillis());
+
+               writeSnapshot(scenarioName, bucket.onReceptionOfCheckpoint(0));
+       }
+
+       /** Generates the "full" scenario: pending files plus one in-progress file. */
+       public void prepareDeserializationFull() throws IOException {
+               prepareDeserializationFull(true, "full");
+       }
+
+       /** Generates the "full-no-in-progress" scenario: pending files but no in-progress file. */
+       public void prepareDeserializationNullInProgress() throws IOException {
+               prepareDeserializationFull(false, "full-no-in-progress");
+       }
+
+       /**
+        * Generates a scenario with five checkpoints that each produce a pending file.
+        *
+        * <p>If requested, a sixth checkpoint is taken while an in-progress file is open. The result
+        * is moved to the template directory of the scenario.
+        */
+       private void prepareDeserializationFull(final boolean withInProgress, final String scenarioName) throws IOException {
+               final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName);
+               // Clear both the previous template and any leftovers of an aborted run. Otherwise stale
+               // files in the scenario directory would be copied into the new template.
+               FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + "-template").toFile());
+               resetScenarioDirectory(scenarioName);
+
+               final int noOfPendingCheckpoints = 5;
+
+               final Bucket<String, String> bucket = createNewBucketForScenario(scenarioName);
+
+               BucketState<String> bucketState = null;
+               for (int i = 0; i < noOfPendingCheckpoints; i++) {
+                       // write the pending content twice into the in-progress file
+                       bucket.write(pendingContent, System.currentTimeMillis());
+                       bucket.write(pendingContent, System.currentTimeMillis());
+                       // every checkpoint produces a pending file
+                       bucketState = bucket.onReceptionOfCheckpoint(i);
+               }
+
+               if (withInProgress) {
+                       // create an in-progress file
+                       bucket.write(inProgressContent, System.currentTimeMillis());
+
+                       // 5 pending files and 1 in-progress file
+                       bucketState = bucket.onReceptionOfCheckpoint(noOfPendingCheckpoints);
+               }
+
+               writeSnapshot(scenarioName, bucketState);
+
+               // Move the scenario to a template directory. The deserialization test turns the restored
+               // in-progress file into pending files, so it works on a copy of the template instead.
+               moveToTemplateDirectory(scenarioPath);
+       }
+
+       /** Deletes the resource directory of the scenario and creates it anew. */
+       private void resetScenarioDirectory(String scenarioName) throws IOException {
+               final java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName);
+               FileUtils.deleteDirectory(scenarioPath.toFile());
+               Files.createDirectories(scenarioPath);
+       }
+
+       /** Creates a new bucket writing into {@code <output directory of the scenario>/<bucketId>}. */
+       private Bucket<String, String> createNewBucketForScenario(String scenarioName) throws IOException {
+               final java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName);
+               return createNewBucket(new Path(outputPath.resolve(bucketId).toString()));
+       }
+
+       /** Serializes the bucket state, together with the serializer version, into the snapshot file of the scenario. */
+       private void writeSnapshot(String scenarioName, BucketState<String> bucketState) throws IOException {
+               final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer(), bucketState);
+               Files.write(pathResolver.getSnapshotPath(scenarioName), bytes);
+       }
+
+       private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
+               return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>());
+       }
+
+       private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
+               // parameterized rather than raw, matching BucketStateSerializerTest, so the compiler checks the serializer types
+               final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter();
+               return new BucketStateSerializer<>(
+                               bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                               bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+                               SimpleVersionedStringSerializer.INSTANCE);
+       }
+
+       /** Creates a new bucket at the given path whose rolling policy uses a max part size of 10. */
+       private Bucket<String, String> createNewBucket(Path bucketPath) throws IOException {
+               return Bucket.getNew(
+                               0,
+                               bucketId,
+                               bucketPath,
+                               0,
+                               createBucketWriter(),
+                               DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+                               null,
+                               OutputFileConfig.builder().build());
+       }
+
+       /** Copies the scenario directory to {@code <scenario directory>-template} and deletes the original. */
+       private void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException {
+               FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false);
+               FileUtils.deleteDirectory(scenarioPath.toFile());
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java
new file mode 100644
index 0000000..1d0f335
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStatePathResolver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import java.nio.file.Path;
+
+/**
+ * Utilities to resolve the directory structure for the bucket states
+ * generated for the migration tests.
+ *
+ * <p>All paths of a scenario live in {@code <basePath>/<scenarioName>-v<version>}. The serialized
+ * bucket state is its {@code snapshot} file, and the bucket output is its {@code bucket} directory.
+ */
+public class BucketStatePathResolver {
+       private final Path basePath;
+       private final int version;
+
+       /**
+        * Creates a resolver for the scenarios of the given version.
+        *
+        * @param basePath root directory containing the scenario directories
+        * @param version version appended to the scenario names
+        */
+       public BucketStatePathResolver(Path basePath, int version) {
+               this.basePath = basePath;
+               this.version = version;
+       }
+
+       /** Returns {@code <resource path of the scenario>/snapshot}, the serialized bucket state. */
+       public Path getSnapshotPath(String scenarioName) {
+               return getResourcePath(scenarioName).resolve("snapshot");
+       }
+
+       /** Returns {@code <resource path of the scenario>/bucket}, the output directory of the bucket. */
+       public Path getOutputPath(String scenarioName) {
+               return getResourcePath(scenarioName).resolve("bucket");
+       }
+
+       /** Returns {@code <basePath>/<scenarioName>-v<version>}, the root directory of the scenario. */
+       public Path getResourcePath(String scenarioName) {
+               return basePath.resolve(scenarioName + "-v" + version);
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index aac6612..3a80257 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -76,58 +76,32 @@ public class BucketStateSerializerTest {
 
        private static final String BUCKET_ID = "test-bucket";
 
+       private static final java.nio.file.Path BASE_PATH = 
Paths.get("src/test/resources/")
+                       .resolve("bucket-state-migration-test");
+
        @ClassRule
        public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-       private static java.nio.file.Path getResourcePath(
-               String scenarioName,
-               int version) {
-               return Paths.get("src/test/resources/")
-                       .resolve("bucket-state-migration-test")
-                       .resolve(scenarioName + "-v" + version);
-       }
-
-       private static java.nio.file.Path getSnapshotPath(
-               String scenarioName,
-               int version) {
-               java.nio.file.Path basePath = getResourcePath(scenarioName, 
version);
-               return basePath.resolve("snapshot");
-       }
-
-       private static java.nio.file.Path getOutputPath(String scenarioName, 
int version) {
-               java.nio.file.Path basePath = getResourcePath(scenarioName, 
version);
-               return basePath.resolve("bucket");
-       }
+       private final BucketStateGenerator generator = new BucketStateGenerator(
+                       BUCKET_ID,
+                       IN_PROGRESS_CONTENT,
+                       PENDING_CONTENT,
+                       BASE_PATH,
+                       CURRENT_VERSION);
 
        @Test
        @Ignore
        public void prepareDeserializationEmpty() throws IOException {
-
-               final String scenarioName = "empty";
-               final java.nio.file.Path scenarioPath = 
getResourcePath(scenarioName, CURRENT_VERSION);
-
-               FileUtils.deleteDirectory(scenarioPath.toFile());
-               Files.createDirectories(scenarioPath);
-
-               final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, CURRENT_VERSION);
-               final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
-
-               final Bucket<String, String> bucket =
-                       createNewBucket(testBucketPath);
-
-               final BucketState<String> bucketState = 
bucket.onReceptionOfCheckpoint(0);
-
-               byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(
-                       bucketStateSerializer(),
-                       bucketState);
-               Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), 
bytes);
+               generator.prepareDeserializationEmpty();
        }
 
        @Test
        public void testSerializationEmpty() throws IOException {
 
                final String scenarioName = "empty";
-               final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, previousVersion);
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
+               final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
                final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
                final BucketState<String> recoveredState = 
readBucketState(scenarioName, previousVersion);
 
@@ -141,33 +115,16 @@ public class BucketStateSerializerTest {
        @Test
        @Ignore
        public void prepareDeserializationOnlyInProgress() throws IOException {
-
-               final String scenarioName = "only-in-progress";
-               final java.nio.file.Path scenarioPath = 
getResourcePath(scenarioName, CURRENT_VERSION);
-               FileUtils.deleteDirectory(scenarioPath.toFile());
-               Files.createDirectories(scenarioPath);
-
-               final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, CURRENT_VERSION);
-               final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
-
-               final Bucket<String, String> bucket =
-                       createNewBucket(testBucketPath);
-
-               bucket.write(IN_PROGRESS_CONTENT, System.currentTimeMillis());
-
-               final BucketState<String> bucketState = 
bucket.onReceptionOfCheckpoint(0);
-
-               final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(
-                       bucketStateSerializer(), bucketState);
-
-               Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), 
bytes);
+               generator.prepareDeserializationOnlyInProgress();
        }
 
        @Test
        public void testSerializationOnlyInProgress() throws IOException {
 
                final String scenarioName = "only-in-progress";
-               final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, previousVersion);
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
+               final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
 
                final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
 
@@ -195,7 +152,7 @@ public class BucketStateSerializerTest {
        @Test
        @Ignore
        public void prepareDeserializationFull() throws IOException {
-               prepareDeserializationFull(true, "full");
+               generator.prepareDeserializationFull();
        }
 
        @Test
@@ -206,7 +163,7 @@ public class BucketStateSerializerTest {
        @Test
        @Ignore
        public void prepareDeserializationNullInProgress() throws IOException {
-               prepareDeserializationFull(false, "full-no-in-progress");
+               generator.prepareDeserializationNullInProgress();
        }
 
        @Test
@@ -214,51 +171,12 @@ public class BucketStateSerializerTest {
                testDeserializationFull(false, "full-no-in-progress");
        }
 
-       private static void prepareDeserializationFull(final boolean 
withInProgress, final String scenarioName) throws IOException {
-
-               final java.nio.file.Path scenarioPath = 
getResourcePath(scenarioName, CURRENT_VERSION);
-               FileUtils.deleteDirectory(Paths.get(scenarioPath.toString() + 
"-template").toFile());
-               Files.createDirectories(scenarioPath);
-
-               final int noOfPendingCheckpoints = 5;
-
-               final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, CURRENT_VERSION);
-
-               final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
-
-               final Bucket<String, String> bucket = 
createNewBucket(testBucketPath);
-
-               BucketState<String> bucketState = null;
-               // pending for checkpoints
-               for (int i = 0; i < noOfPendingCheckpoints; i++) {
-                       // write 10 bytes to the in progress file
-                       bucket.write(PENDING_CONTENT, 
System.currentTimeMillis());
-                       bucket.write(PENDING_CONTENT, 
System.currentTimeMillis());
-                       // every checkpoint would produce a pending file
-                       bucketState = bucket.onReceptionOfCheckpoint(i);
-               }
-
-               if (withInProgress) {
-                       // create a in progress file
-                       bucket.write(IN_PROGRESS_CONTENT, 
System.currentTimeMillis());
-
-                       // 5 pending files and 1 in progress file
-                       bucketState = 
bucket.onReceptionOfCheckpoint(noOfPendingCheckpoints);
-               }
-
-               final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer(), 
bucketState);
-
-               Files.write(getSnapshotPath(scenarioName, CURRENT_VERSION), 
bytes);
-
-               // copy the scenario file to a template directory.
-               // it is because that the test `testSerializationFull` would 
change the in progress file to pending files.
-               moveToTemplateDirectory(scenarioPath);
-       }
-
        private void testDeserializationFull(final boolean withInProgress, 
final String scenarioName) throws IOException {
 
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, previousVersion);
+
                try {
-                       final java.nio.file.Path outputPath = 
getOutputPath(scenarioName, previousVersion);
+                       final java.nio.file.Path outputPath = 
pathResolver.getOutputPath(scenarioName);
                        final Path testBucketPath = new 
Path(outputPath.resolve(BUCKET_ID).toString());
                        // restore the state
                        final BucketState<String> recoveredState = 
readBucketStateFromTemplate(scenarioName, previousVersion);
@@ -306,22 +224,10 @@ public class BucketStateSerializerTest {
                                assertThat(afterRestorePaths, empty());
                        }
                } finally {
-                       FileUtils.deleteDirectory(getResourcePath(scenarioName, 
previousVersion).toFile());
+                       
FileUtils.deleteDirectory(pathResolver.getResourcePath(scenarioName).toFile());
                }
        }
 
-       private static Bucket<String, String> createNewBucket(final Path 
bucketPath) throws IOException {
-               return Bucket.getNew(
-                       0,
-                       BUCKET_ID,
-                       bucketPath,
-                       0,
-                       createBucketWriter(),
-                       
DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
-                       null,
-                       OutputFileConfig.builder().build());
-       }
-
        private static Bucket<String, String> restoreBucket(final int 
initialPartCounter, final BucketState<String> bucketState) throws IOException {
                return Bucket.restore(
                        0,
@@ -338,7 +244,7 @@ public class BucketStateSerializerTest {
        }
 
        private static SimpleVersionedSerializer<BucketState<String>> 
bucketStateSerializer() throws IOException {
-               final RowWiseBucketWriter bucketWriter = createBucketWriter();
+               final RowWiseBucketWriter<String, String> bucketWriter = 
createBucketWriter();
                return new BucketStateSerializer<>(
                        
bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
                        
bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
@@ -346,12 +252,14 @@ public class BucketStateSerializerTest {
        }
 
        private static BucketState<String> readBucketState(final String 
scenarioName, final int version) throws IOException {
-               byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, 
version));
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, version);
+               byte[] bytes = 
Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName));
                return 
SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), 
bytes);
        }
 
        private static BucketState<String> readBucketStateFromTemplate(final 
String scenarioName, final int version) throws IOException {
-               final java.nio.file.Path scenarioPath =  
getResourcePath(scenarioName, version);
+               final BucketStatePathResolver pathResolver = new 
BucketStatePathResolver(BASE_PATH, version);
+               final java.nio.file.Path scenarioPath = 
pathResolver.getResourcePath(scenarioName);
 
                // clear the scenario files first
                FileUtils.deleteDirectory(scenarioPath.toFile());
@@ -361,9 +269,4 @@ public class BucketStateSerializerTest {
 
                return readBucketState(scenarioName, version);
        }
-
-       private static void moveToTemplateDirectory(java.nio.file.Path 
scenarioPath) throws IOException {
-               FileUtils.copy(new Path(scenarioPath.toString()), new 
Path(scenarioPath.toString() + "-template"), false);
-               FileUtils.deleteDirectory(scenarioPath.toFile());
-       }
 }

Reply via email to