Repository: flink
Updated Branches:
  refs/heads/master 65ee28c34 -> 297d75c2e


[FLINK-2832] [tests] Hardens RandomSamplerTest

Increase the level of significance from p=0.01 to p=0.001 and add retry
annotations to the random sampler tests. This should decrease the probability
of spuriously failing random sampler tests.

This closes #2076


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/297d75c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/297d75c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/297d75c2

Branch: refs/heads/master
Commit: 297d75c2e043026ccc3744d587c9ebbbd81e7d4b
Parents: e1b55f0
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Jun 6 16:19:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../api/java/sampling/RandomSamplerTest.java    | 77 ++++++++++++--------
 .../src/test/resources/log4j-test.properties    | 28 +++++++
 flink-java/src/test/resources/logback-test.xml  | 35 +++++++++
 3 files changed, 111 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
index 68f9154..228dd3a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
@@ -20,14 +20,16 @@ package org.apache.flink.api.java.sampling;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Preconditions;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -50,29 +52,37 @@ import static org.junit.Assert.assertTrue;
  * @see <a 
href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test";>Kolmogorov 
Smirnov test</a>
  */
 public class RandomSamplerTest {
-       private final static int SOURCE_SIZE = 10000;
-       private static KolmogorovSmirnovTest ksTest;
-       private static List<Double> source;
-       private final static int DEFFAULT_PARTITION_NUMBER=10;
-       private List<Double>[] sourcePartitions = new 
List[DEFFAULT_PARTITION_NUMBER];
+
+       private static final int SOURCE_SIZE = 10000;
+
+       private static final int DEFAULT_PARTITION_NUMBER = 10;
+
+       private static final KolmogorovSmirnovTest ksTest = new 
KolmogorovSmirnovTest();
+
+       private static final List<Double> source = new 
ArrayList<Double>(SOURCE_SIZE);
+
+
+       @Rule
+       public final RetryRule retryRule = new RetryRule();
+
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       private final List<Double>[] sourcePartitions = new 
List[DEFAULT_PARTITION_NUMBER];
+
 
        @BeforeClass
        public static void init() {
                // initiate source data set.
-               source = new ArrayList<Double>(SOURCE_SIZE);
                for (int i = 0; i < SOURCE_SIZE; i++) {
                        source.add((double) i);
                }
-               
-               ksTest = new KolmogorovSmirnovTest();
        }
 
        private void initSourcePartition() {
-               for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) {
-                       sourcePartitions[i] = new LinkedList<Double>();
+               for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) {
+                       sourcePartitions[i] = new 
ArrayList<Double>((int)Math.ceil((double)SOURCE_SIZE / 
DEFAULT_PARTITION_NUMBER));
                }
                for (int i = 0; i< SOURCE_SIZE; i++) {
-                       int index = i % DEFFAULT_PARTITION_NUMBER;
+                       int index = i % DEFAULT_PARTITION_NUMBER;
                        sourcePartitions[index].add((double)i);
                }
        }
@@ -88,6 +98,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testBernoulliSamplerFraction() {
                verifySamplerFraction(0.01, false);
                verifySamplerFraction(0.05, false);
@@ -99,6 +110,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testBernoulliSamplerDuplicateElements() {
                verifyRandomSamplerDuplicateElements(new 
BernoulliSampler<Double>(0.01));
                verifyRandomSamplerDuplicateElements(new 
BernoulliSampler<Double>(0.1));
@@ -111,6 +123,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testPoissonSamplerFraction() {
                verifySamplerFraction(0.01, true);
                verifySamplerFraction(0.05, true);
@@ -132,6 +145,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testBernoulliSamplerDistribution() {
                verifyBernoulliSampler(0.01d);
                verifyBernoulliSampler(0.05d);
@@ -140,6 +154,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testPoissonSamplerDistribution() {
                verifyPoissonSampler(0.01d);
                verifyPoissonSampler(0.05d);
@@ -148,6 +163,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerSampledSize() {
                verifySamplerFixedSampleSize(1, true);
                verifySamplerFixedSampleSize(10, true);
@@ -164,6 +180,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerSampledSize2() {
                RandomSampler<Double> sampler = new 
ReservoirSamplerWithoutReplacement<Double>(20000);
                Iterator<Double> sampled = sampler.sample(source.iterator());
@@ -171,6 +188,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerDuplicateElements() {
                verifyRandomSamplerDuplicateElements(new 
ReservoirSamplerWithoutReplacement<Double>(100));
                verifyRandomSamplerDuplicateElements(new 
ReservoirSamplerWithoutReplacement<Double>(1000));
@@ -178,6 +196,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerWithoutReplacement() {
                verifyReservoirSamplerWithoutReplacement(100, false);
                verifyReservoirSamplerWithoutReplacement(500, false);
@@ -186,6 +205,7 @@ public class RandomSamplerTest {
        }
        
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerWithReplacement() {
                verifyReservoirSamplerWithReplacement(100, false);
                verifyReservoirSamplerWithReplacement(500, false);
@@ -194,6 +214,7 @@ public class RandomSamplerTest {
        }
 
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerWithMultiSourcePartitions1() {
                initSourcePartition();
 
@@ -204,6 +225,7 @@ public class RandomSamplerTest {
        }
 
        @Test
+       @RetryOnFailure(times=3)
        public void testReservoirSamplerWithMultiSourcePartitions2() {
                initSourcePartition();
 
@@ -262,7 +284,7 @@ public class RandomSamplerTest {
                assertTrue("There should not have duplicate element for sampler 
without replacement.", list.size() == set.size());
        }
        
-       private int getSize(Iterator iterator) {
+       private int getSize(Iterator<?> iterator) {
                int size = 0;
                while (iterator.hasNext()) {
                        iterator.next();
@@ -301,7 +323,7 @@ public class RandomSamplerTest {
         * If random sampler select elements randomly from source, it would 
distributed well-proportioned on source data as well,
         * so the K-S Test result would accept the first one, while reject the 
second one.
         */
-       private void verifyRandomSamplerWithFraction(double fraction, 
RandomSampler sampler, boolean withDefaultSampler) {
+       private void verifyRandomSamplerWithFraction(double fraction, 
RandomSampler<Double> sampler, boolean withDefaultSampler) {
                double[] baseSample;
                if (withDefaultSampler) {
                        baseSample = getDefaultSampler(fraction);
@@ -318,7 +340,7 @@ public class RandomSamplerTest {
         * If random sampler select elements randomly from source, it would 
distributed well-proportioned on source data as well,
         * so the K-S Test result would accept the first one, while reject the 
second one.
         */
-       private void verifyRandomSamplerWithSampleSize(int sampleSize, 
RandomSampler sampler, boolean withDefaultSampler, boolean 
sampleWithPartitions) {
+       private void verifyRandomSamplerWithSampleSize(int sampleSize, 
RandomSampler<Double> sampler, boolean withDefaultSampler, boolean 
sampleWithPartitions) {
                double[] baseSample;
                if (withDefaultSampler) {
                        baseSample = getDefaultSampler(sampleSize);
@@ -329,11 +351,11 @@ public class RandomSamplerTest {
                verifyKSTest(sampler, baseSample, withDefaultSampler, 
sampleWithPartitions);
        }
 
-       private void verifyKSTest(RandomSampler sampler, double[] 
defaultSampler, boolean expectSuccess) {
+       private void verifyKSTest(RandomSampler<Double> sampler, double[] 
defaultSampler, boolean expectSuccess) {
                verifyKSTest(sampler, defaultSampler, expectSuccess, false);
        }
 
-       private void verifyKSTest(RandomSampler sampler, double[] 
defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) {
+       private void verifyKSTest(RandomSampler<Double> sampler, double[] 
defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) {
                double[] sampled = getSampledOutput(sampler, 
sampleOnPartitions);
                double pValue = ksTest.kolmogorovSmirnovStatistic(sampled, 
defaultSampler);
                double dValue = getDValue(sampled.length, 
defaultSampler.length);
@@ -345,11 +367,11 @@ public class RandomSamplerTest {
        }
        
        private double[] getSampledOutput(RandomSampler<Double> sampler, 
boolean sampleOnPartitions) {
-               Iterator<Double> sampled = null;
+               Iterator<Double> sampled;
                if (sampleOnPartitions) {
                        DistributedRandomSampler<Double> reservoirRandomSampler 
= (DistributedRandomSampler<Double>)sampler;
                        List<IntermediateSampleData<Double>> intermediateResult 
= Lists.newLinkedList();
-                       for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) {
+                       for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) {
                                Iterator<IntermediateSampleData<Double>> 
partialIntermediateResult = 
reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator());
                                while (partialIntermediateResult.hasNext()) {
                                        
intermediateResult.add(partialIntermediateResult.next());
@@ -363,8 +385,7 @@ public class RandomSamplerTest {
                while (sampled.hasNext()) {
                        list.add(sampled.next());
                }
-               double[] result = transferFromListToArrayWithOrder(list);
-               return result;
+               return transferFromListToArrayWithOrder(list);
        }
 
        /*
@@ -393,10 +414,9 @@ public class RandomSamplerTest {
        
        private double[] getDefaultSampler(int fixSize) {
                Preconditions.checkArgument(fixSize > 0, "Sample fraction 
should be positive.");
-               int size = fixSize;
                double step = SOURCE_SIZE / (double) fixSize;
-               double[] defaultSampler = new double[size];
-               for (int i = 0; i < size; i++) {
+               double[] defaultSampler = new double[fixSize];
+               for (int i = 0; i < fixSize; i++) {
                        defaultSampler[i] = Math.round(step * i);
                }
                
@@ -424,9 +444,8 @@ public class RandomSamplerTest {
        private double[] getWrongSampler(int fixSize) {
                Preconditions.checkArgument(fixSize > 0, "Sample size be 
positive.");
                int halfSourceSize = SOURCE_SIZE / 2;
-               int size = fixSize;
-               double[] wrongSampler = new double[size];
-               for (int i = 0; i < size; i++) {
+               double[] wrongSampler = new double[fixSize];
+               for (int i = 0; i < fixSize; i++) {
                        wrongSampler[i] = (double) i % halfSourceSize;
                }
                
@@ -434,13 +453,13 @@ public class RandomSamplerTest {
        }
        
        /*
-        * Calculate the D value of K-S test for p-value 0.01, m and n are the 
sample size
+        * Calculate the D value of K-S test for p-value 0.001, m and n are the 
sample size
         */
        private double getDValue(int m, int n) {
                Preconditions.checkArgument(m > 0, "input sample size should be 
positive.");
                Preconditions.checkArgument(n > 0, "input sample size should be 
positive.");
                double first = (double) m;
                double second = (double) n;
-               return 1.63 * Math.sqrt((first + second) / (first * second));
+               return 1.95 * Math.sqrt((first + second) / (first * second));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-java/src/test/resources/log4j-test.properties b/flink-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..a0fa730
--- /dev/null
+++ b/flink-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+

http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-java/src/test/resources/logback-test.xml b/flink-java/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..159f76c
--- /dev/null
+++ b/flink-java/src/test/resources/logback-test.xml
@@ -0,0 +1,35 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" 
level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" 
level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
+

Reply via email to