This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0528a21f7768c5c653bcbe03437c7ab9681dcf86
Author: Yun Gao <[email protected]>
AuthorDate: Thu Nov 26 17:46:59 2020 +0800

    [refactor] Factor common testing code out of FileSinkITBase

    We want to reuse this code for the new FileSink migration test as well.
---
 flink-connectors/flink-connector-files/pom.xml     |  15 +++
 .../flink/connector/file/sink/FileSinkITBase.java  |  93 +-------------
 .../sink/utils/IntegerFileSinkTestDataUtils.java   | 140 +++++++++++++++++++++
 3 files changed, 159 insertions(+), 89 deletions(-)

diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
index 117e2c5..65a9027 100644
--- a/flink-connectors/flink-connector-files/pom.xml
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -92,4 +92,19 @@ under the License.
 
 	</dependencies>
 
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6cd09d6..4912f2c 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -18,17 +18,14 @@
 
 package org.apache.flink.connector.file.sink;
 
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.util.TestLogger;
 
@@ -37,22 +34,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.Parameterized;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * The base class for the File Sink IT Case in different execution mode.
@@ -101,88 +85,19 @@ public abstract class FileSinkITBase extends TestLogger {
 			miniCluster.executeJobBlocking(jobGraph);
 		}
 
-		checkResult(path);
+		IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(path, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
 	}
 
 	protected abstract JobGraph createJobGraph(String path);
 
 	protected FileSink<Integer> createFileSink(String path) {
 		return FileSink
-				.forRowFormat(new Path(path), new IntEncoder())
-				.withBucketAssigner(new ModuloBucketAssigner())
+				.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+				.withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
 				.withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
 				.build();
 	}
 
-	private void checkResult(String path) throws Exception {
-		File dir = new File(path);
-		String[] subDirNames = dir.list();
-		assertNotNull(subDirNames);
-
-		Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
-		assertEquals(NUM_BUCKETS, subDirNames.length);
-		for (int i = 0; i < NUM_BUCKETS; ++i) {
-			assertEquals(Integer.toString(i), subDirNames[i]);
-
-			// now check its content
-			File bucketDir = new File(path, subDirNames[i]);
-			assertTrue(
-					bucketDir.getAbsolutePath() + " Should be a existing directory",
-					bucketDir.isDirectory());
-
-			Map<Integer, Integer> counts = new HashMap<>();
-			File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
-			assertNotNull(files);
-
-			for (File file : files) {
-				assertTrue(file.isFile());
-
-				try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(file))) {
-					while (true) {
-						int value = dataInputStream.readInt();
-						counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
-					}
-				} catch (EOFException e) {
-					// End the reading
-				}
-			}
-
-			int expectedCount = NUM_RECORDS / NUM_BUCKETS +
-					(i < NUM_RECORDS % NUM_BUCKETS ? 1 : 0);
-			assertEquals(expectedCount, counts.size());
-
-			for (int j = i; j < NUM_RECORDS; j += NUM_BUCKETS) {
-				assertEquals(
-						"The record " + j + " should occur " + NUM_SOURCES + " times, " +
-								" but only occurs " + counts.getOrDefault(j, 0) + "time",
-						NUM_SOURCES,
-						counts.getOrDefault(j, 0).intValue());
-			}
-		}
-	}
-
-	private static class IntEncoder implements Encoder<Integer> {
-
-		@Override
-		public void encode(Integer element, OutputStream stream) throws IOException {
-			stream.write(ByteBuffer.allocate(4).putInt(element).array());
-			stream.flush();
-		}
-	}
-
-	private static class ModuloBucketAssigner implements BucketAssigner<Integer, String> {
-
-		@Override
-		public String getBucketId(Integer element, Context context) {
-			return Integer.toString(element % NUM_BUCKETS);
-		}
-
-		@Override
-		public SimpleVersionedSerializer<String> getSerializer() {
-			return SimpleVersionedStringSerializer.INSTANCE;
-		}
-	}
-
 	private static class PartSizeAndCheckpointRollingPolicy
 			extends CheckpointRollingPolicy<Integer, String> {
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
new file mode 100644
index 0000000..a0dab46
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.utils;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utilities for file sink tests that write a sequence of consecutive integers, starting from 0,
+ * into files. The tests expect multiple sources to write the same sequence to disk, with each
+ * integer assigned to a bucket according to modulo.
+ */
+public class IntegerFileSinkTestDataUtils {
+
+	/**
+	 * Testing sink {@link Encoder} that writes an integer as its 4-byte binary representation.
+	 */
+	public static class IntEncoder implements Encoder<Integer> {
+
+		@Override
+		public void encode(Integer element, OutputStream stream) throws IOException {
+			stream.write(ByteBuffer.allocate(4).putInt(element).array());
+			stream.flush();
+		}
+	}
+
+	/**
+	 * Testing {@link BucketAssigner} that assigns each integer to a bucket according to modulo.
+	 */
+	public static class ModuloBucketAssigner implements BucketAssigner<Integer, String> {
+
+		private final int numBuckets;
+
+		public ModuloBucketAssigner(int numBuckets) {
+			this.numBuckets = numBuckets;
+		}
+
+		@Override
+		public String getBucketId(Integer element, Context context) {
+			return Integer.toString(element % numBuckets);
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
+	/**
+	 * Verifies that the files written by the sink contain the expected integer sequence.
+	 * The integers are partitioned into buckets according to modulo, and each integer
+	 * is repeated <tt>numSources</tt> times.
+	 *
+	 * @param path The directory to check.
+	 * @param numRecords The total number of records.
+	 * @param numBuckets The number of buckets to assign.
+	 * @param numSources The parallelism of the sources generating the sequence. Each integer
+	 *                   is repeated <tt>numSources</tt> times.
+	 */
+	public static void checkIntegerSequenceSinkOutput(String path, int numRecords, int numBuckets, int numSources) throws Exception {
+		File dir = new File(path);
+		String[] subDirNames = dir.list();
+		assertNotNull(subDirNames);
+
+		Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
+		assertEquals(numBuckets, subDirNames.length);
+		for (int i = 0; i < numBuckets; ++i) {
+			assertEquals(Integer.toString(i), subDirNames[i]);
+
+			// Now check the content of this bucket.
+			File bucketDir = new File(path, subDirNames[i]);
+			assertTrue(
+					bucketDir.getAbsolutePath() + " should be an existing directory",
+					bucketDir.isDirectory());
+
+			Map<Integer, Integer> counts = new HashMap<>();
+			File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
+			assertNotNull(files);
+
+			for (File file : files) {
+				assertTrue(file.isFile());
+
+				try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(file))) {
+					while (true) {
+						int value = dataInputStream.readInt();
+						counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
+					}
+				} catch (EOFException e) {
+					// Reached the end of the file.
+				}
+			}
+
+			int expectedCount = numRecords / numBuckets +
+					(i < numRecords % numBuckets ? 1 : 0);
+			assertEquals(expectedCount, counts.size());
+
+			for (int j = i; j < numRecords; j += numBuckets) {
+				assertEquals(
+						"The record " + j + " should occur " + numSources + " times, " +
+								"but only occurs " + counts.getOrDefault(j, 0) + " times",
+						numSources,
+						counts.getOrDefault(j, 0).intValue());
+			}
+		}
+	}
+}
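
For context, the commit message says these utilities are meant to be reused by the
upcoming FileSink migration test. The following sketch is hypothetical and not part
of this commit; the class name, the bounded job shape, and the use of BATCH execution
mode are illustrative assumptions. It shows how another test could drive the shared
encoder, bucket assigner, and output checker:

    package org.apache.flink.connector.file.sink;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileSinkReuseSketch {

        public static void writeAndCheck(String path) throws Exception {
            final int numRecords = 100;
            final int numBuckets = 4;

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // BATCH mode lets the FileSink commit all pending files once the bounded job finishes.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            FileSink<Integer> sink = FileSink
                    .forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
                    .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(numBuckets))
                    .build();

            // A single bounded source emits the sequence exactly once, hence numSources == 1 below.
            env.fromSequence(0, numRecords - 1)
                    .map(v -> v.intValue())
                    .returns(Types.INT)
                    .sinkTo(sink);
            env.execute();

            IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(path, numRecords, numBuckets, 1);
        }
    }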

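The pom.xml change above publishes the module's test classes as a test-jar so that
other modules can depend on them. For reference, a downstream module (such as one
hosting the migration test) would typically consume it with a test-scoped dependency
along these lines; this is a sketch, and the exact coordinates are assumptions based
on this module's path:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${project.version}</version>
        <type>test-jar</type>
        <scope>test</scope>
    </dependency>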