This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0528a21f7768c5c653bcbe03437c7ab9681dcf86
Author: Yun Gao <[email protected]>
AuthorDate: Thu Nov 26 17:46:59 2020 +0800

    [refactor] Factor common testing code out of FileSinkITBase
    
    We want to reuse this code for the new FileSink migration test as well.
---
 flink-connectors/flink-connector-files/pom.xml     |  15 +++
 .../flink/connector/file/sink/FileSinkITBase.java  |  93 +-------------
 .../sink/utils/IntegerFileSinkTestDataUtils.java   | 140 +++++++++++++++++++++
 3 files changed, 159 insertions(+), 89 deletions(-)

diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
index 117e2c5..65a9027 100644
--- a/flink-connectors/flink-connector-files/pom.xml
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -92,4 +92,19 @@ under the License.
 
        </dependencies>
 
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
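
The test-jar execution added above makes Maven package the module's test classes as a separate artifact, so the planned migration test can live elsewhere and still use these utilities. As a rough sketch (coordinates assumed, not taken from this commit), a downstream pom would declare:

       <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-files</artifactId>
               <version>${project.version}</version>
               <type>test-jar</type>
               <scope>test</scope>
       </dependency>
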
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6cd09d6..4912f2c 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -18,17 +18,14 @@
 
 package org.apache.flink.connector.file.sink;
 
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.util.TestLogger;
 
@@ -37,22 +34,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.Parameterized;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * The base class for the File Sink IT Case in different execution mode.
@@ -101,88 +85,19 @@ public abstract class FileSinkITBase extends TestLogger {
                        miniCluster.executeJobBlocking(jobGraph);
                }
 
-               checkResult(path);
+               IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(path, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
        }
 
        protected abstract JobGraph createJobGraph(String path);
 
        protected FileSink<Integer> createFileSink(String path) {
                return FileSink
-                               .forRowFormat(new Path(path), new IntEncoder())
-                               .withBucketAssigner(new ModuloBucketAssigner())
+                               .forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                               .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
                                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
                                .build();
        }
 
-       private void checkResult(String path) throws Exception {
-               File dir = new File(path);
-               String[] subDirNames = dir.list();
-               assertNotNull(subDirNames);
-
-               Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
-               assertEquals(NUM_BUCKETS, subDirNames.length);
-               for (int i = 0; i < NUM_BUCKETS; ++i) {
-                       assertEquals(Integer.toString(i), subDirNames[i]);
-
-                       // now check its content
-                       File bucketDir = new File(path, subDirNames[i]);
-                       assertTrue(
-                                       bucketDir.getAbsolutePath() + " Should be a existing directory",
-                                       bucketDir.isDirectory());
-
-                       Map<Integer, Integer> counts = new HashMap<>();
-                       File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
-                       assertNotNull(files);
-
-                       for (File file : files) {
-                               assertTrue(file.isFile());
-
-                               try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(file))) {
-                                       while (true) {
-                                               int value = dataInputStream.readInt();
-                                               counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
-                                       }
-                               } catch (EOFException e) {
-                                       // End the reading
-                               }
-                       }
-
-                       int expectedCount = NUM_RECORDS / NUM_BUCKETS +
-                                       (i < NUM_RECORDS % NUM_BUCKETS ? 1 : 0);
-                       assertEquals(expectedCount, counts.size());
-
-                       for (int j = i; j < NUM_RECORDS; j += NUM_BUCKETS) {
-                               assertEquals(
-                                               "The record " + j + " should occur " + NUM_SOURCES + " times, " +
-                                                               " but only occurs " + counts.getOrDefault(j, 0) + "time",
-                                               NUM_SOURCES,
-                                               counts.getOrDefault(j, 0).intValue());
-                       }
-               }
-       }
-
-       private static class IntEncoder implements Encoder<Integer> {
-
-               @Override
-               public void encode(Integer element, OutputStream stream) throws IOException {
-                       stream.write(ByteBuffer.allocate(4).putInt(element).array());
-                       stream.flush();
-               }
-       }
-
-       private static class ModuloBucketAssigner implements BucketAssigner<Integer, String> {
-
-               @Override
-               public String getBucketId(Integer element, Context context) {
-                       return Integer.toString(element % NUM_BUCKETS);
-               }
-
-               @Override
-               public SimpleVersionedSerializer<String> getSerializer() {
-                       return SimpleVersionedStringSerializer.INSTANCE;
-               }
-       }
-
        private static class PartSizeAndCheckpointRollingPolicy
                        extends CheckpointRollingPolicy<Integer, String> {
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
new file mode 100644
index 0000000..a0dab46
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.utils;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utilities for testing file sinks that write a sequence of consecutive integers starting
+ * from 0. Multiple sources write the same sequence to disk, and the integers are assigned
+ * to different buckets according to their value modulo the number of buckets.
+ */
+public class IntegerFileSinkTestDataUtils {
+
+       /**
+        * Testing sink {@link Encoder} that writes each integer as its 4-byte binary representation.
+        */
+       public static class IntEncoder implements Encoder<Integer> {
+
+               @Override
+               public void encode(Integer element, OutputStream stream) throws IOException {
+                       stream.write(ByteBuffer.allocate(4).putInt(element).array());
+                       stream.flush();
+               }
+       }
+
+       /**
+        * Testing {@link BucketAssigner} that assigns integers to buckets by modulo.
+        */
+       public static class ModuloBucketAssigner implements BucketAssigner<Integer, String> {
+
+               private final int numBuckets;
+
+               public ModuloBucketAssigner(int numBuckets) {
+                       this.numBuckets = numBuckets;
+               }
+
+               @Override
+               public String getBucketId(Integer element, Context context) {
+                       return Integer.toString(element % numBuckets);
+               }
+
+               @Override
+               public SimpleVersionedSerializer<String> getSerializer() {
+                       return SimpleVersionedStringSerializer.INSTANCE;
+               }
+       }
+
+       /**
+        * Verifies that the files written by the sink contain the expected integer sequence.
+        * The integers are partitioned into different buckets according to modulo, and each
+        * integer is expected to occur <tt>numSources</tt> times.
+        *
+        * @param path The directory to check.
+        * @param numRecords The total number of distinct records.
+        * @param numBuckets The number of buckets to assign.
+        * @param numSources The parallelism of the sources generating the sequence. Each integer
+        *                   is repeated <tt>numSources</tt> times.
+        */
+       public static void checkIntegerSequenceSinkOutput(String path, int numRecords, int numBuckets, int numSources) throws Exception {
+               File dir = new File(path);
+               String[] subDirNames = dir.list();
+               assertNotNull(subDirNames);
+
+               Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
+               assertEquals(numBuckets, subDirNames.length);
+               for (int i = 0; i < numBuckets; ++i) {
+                       assertEquals(Integer.toString(i), subDirNames[i]);
+
+                       // now check its content
+                       File bucketDir = new File(path, subDirNames[i]);
+                       assertTrue(
+                                       bucketDir.getAbsolutePath() + " should be an existing directory",
+                                       bucketDir.isDirectory());
+
+                       Map<Integer, Integer> counts = new HashMap<>();
+                       File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
+                       assertNotNull(files);
+
+                       for (File file : files) {
+                               assertTrue(file.isFile());
+
+                               try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(file))) {
+                                       while (true) {
+                                               int value = dataInputStream.readInt();
+                                               counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
+                                       }
+                               } catch (EOFException e) {
+                                       // Reached the end of the file; stop reading.
+                               }
+                       }
+
+                       int expectedCount = numRecords / numBuckets +
+                                       (i < numRecords % numBuckets ? 1 : 0);
+                       assertEquals(expectedCount, counts.size());
+
+                       for (int j = i; j < numRecords; j += numBuckets) {
+                               assertEquals(
+                                               "The record " + j + " should occur " + numSources +
+                                                               " times, but occurs " + counts.getOrDefault(j, 0) + " times",
+                                               numSources,
+                                               counts.getOrDefault(j, 0).intValue());
+                       }
+               }
+       }
+}
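
For reference, a test in another module could now reuse the extracted helpers instead of re-implementing them. A minimal sketch (the class name and constant values below are hypothetical, not part of this commit):

    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
    import org.apache.flink.core.fs.Path;

    public class FileSinkMigrationTestSketch {

        // Assumed test parameters, mirroring the NUM_* constants in FileSinkITBase.
        private static final int NUM_RECORDS = 10000;
        private static final int NUM_BUCKETS = 4;
        private static final int NUM_SOURCES = 4;

        FileSink<Integer> createFileSink(String path) {
            // Same row-format sink as FileSinkITBase#createFileSink, built from the
            // shared encoder and bucket assigner (default rolling policy).
            return FileSink
                    .forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
                    .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
                    .build();
        }

        void verifyOutput(String path) throws Exception {
            // The shared assertion that replaced FileSinkITBase#checkResult.
            IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(
                    path, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
        }
    }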
