Repository: storm Updated Branches: refs/heads/1.1.x-branch a0308efd6 -> d7b7096a5
STORM-2525: Fix flaky integration tests Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d7b7096a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d7b7096a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d7b7096a Branch: refs/heads/1.1.x-branch Commit: d7b7096a53edff7bedda56c85f6ece9c9d994591 Parents: a0308ef Author: Stig Rohde Døssing <[email protected]> Authored: Sat May 20 19:46:03 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Tue Nov 14 08:04:00 2017 +0100 ---------------------------------------------------------------------- integration-test/config/Vagrantfile | 8 +- integration-test/config/cluster.xml | 101 ------------------- integration-test/config/install-storm.sh | 1 - integration-test/config/install-zookeeper.sh | 1 + integration-test/pom.xml | 5 + integration-test/run-it.sh | 7 +- .../topology/window/SlidingTimeCorrectness.java | 4 +- .../window/SlidingWindowCorrectness.java | 2 - .../window/TumblingTimeCorrectness.java | 4 +- .../window/TumblingWindowCorrectness.java | 2 - .../st/tests/window/SlidingWindowTest.java | 44 ++++---- .../org/apache/storm/st/wrapper/TopoWrap.java | 81 +++++++-------- pom.xml | 19 ++++ storm-core/pom.xml | 7 +- .../storm/topology/WindowedBoltExecutor.java | 6 +- .../apache/storm/windowing/EvictionContext.java | 2 +- .../apache/storm/windowing/EvictionPolicy.java | 2 +- .../storm/windowing/TimeEvictionPolicy.java | 2 +- .../windowing/WatermarkCountEvictionPolicy.java | 11 +- .../windowing/WatermarkTimeEvictionPolicy.java | 10 +- .../apache/storm/windowing/WindowManager.java | 3 + .../storm/windowing/WindowManagerTest.java | 48 ++++++++- 22 files changed, 184 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/config/Vagrantfile ---------------------------------------------------------------------- diff --git a/integration-test/config/Vagrantfile b/integration-test/config/Vagrantfile index 740d0b0..def461d 100644 --- a/integration-test/config/Vagrantfile +++ b/integration-test/config/Vagrantfile @@ -19,7 +19,7 @@ require 'uri' # Vagrantfile API/syntax version. Don't touch unless you know what you're doing! VAGRANTFILE_API_VERSION = "2" -STORM_BOX_TYPE = "hashicorp/precise64" +STORM_BOX_TYPE = "ubuntu/xenial64" STORM_ZIP = Dir.glob("../../storm-dist/binary/**/*.zip") if(STORM_ZIP.length != 1) raise "Expected one storm-binary found: " + STORM_ZIP.join(",") + ". Did you run : cd ${STORM_SRC_DIR}/storm-dist/binary && mvn clean package -Dgpg.skip=true" @@ -53,8 +53,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| } end - config.vm.synced_folder "../../", "/home/vagrant/build/vagrant/storm" - config.vm.synced_folder "~/.m2", "/home/vagrant/.m2" + config.vm.synced_folder "../../", "/home/ubuntu/build/vagrant/storm" + config.vm.synced_folder "~/.m2", "/home/ubuntu/.m2" config.vm.define "node1" do |node1| node1.vm.provider "virtualbox" do |v| @@ -62,7 +62,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| end node1.vm.network "private_network", ip: "192.168.50.3" node1.vm.hostname = "node1" - node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/vagrant/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false + node1.vm.provision :shell, :inline => "echo run integration test; whoami; env; cd /home/ubuntu/build/vagrant/storm/; pwd; bash integration-test/run-it.sh", privileged: false #node1.vm.provision :shell, :inline => "sudo ln -fs /vagrant/etc-hosts /etc/hosts" end http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/config/cluster.xml ---------------------------------------------------------------------- diff --git a/integration-test/config/cluster.xml b/integration-test/config/cluster.xml deleted file mode 100644 index 97968e4..0000000 --- a/integration-test/config/cluster.xml +++ /dev/null @@ -1,101 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration scan="true" scanPeriod="60 seconds"> - <properties> - <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property> - <property name="patternMetrics">%d %-8r %m%n</property> - </properties> - - <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>/var/log/storm/${logfile.name}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>/var/log/storm/${logfile.name}.%i</fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>9</maxIndex> - </rollingPolicy> - - <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>100MB</maxFileSize> - </triggeringPolicy> - - <encoder> - <pattern>${pattern}</pattern> - </encoder> - </appender> - - <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>/var/log/storm/access.log</file> - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>/var/log/storm/access.log.%i</fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>9</maxIndex> - </rollingPolicy> - - <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>100MB</maxFileSize> - </triggeringPolicy> - - <encoder> - <pattern>${pattern}</pattern> - </encoder> - </appender> - - <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>/var/log/storm/metrics.log</file> - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>metrics.log.%i</fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>9</maxIndex> - </rollingPolicy> - - <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>2MB</maxFileSize> - </triggeringPolicy> - - <encoder> - <pattern>${patternMetrics}</pattern> - </encoder> - </appender> - - <root level="INFO"> - <appender-ref ref="A1"/> - </root> - - <logger name="org.apache.storm.messaging.netty"> - <level value="WARN" /> - <appender-ref ref="A1" /> - </logger> - - <logger name="org.apache.storm"> - <level value="DEBUG" /> - <appender-ref ref="A1" /> - </logger> - - <logger name="org.apache.storm.security.auth.authorizer" additivity="false"> - <level value="INFO" /> - <appender-ref ref="ACCESS" /> - </logger> - - <logger name="org.apache.storm.metric.LoggingMetricsConsumer" additivity="false"> - <level value="INFO"/> - <appender-ref ref="METRICS"/> - </logger> - -</configuration> http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/config/install-storm.sh ---------------------------------------------------------------------- diff --git a/integration-test/config/install-storm.sh b/integration-test/config/install-storm.sh index b08e2bc..8316c75 100644 --- a/integration-test/config/install-storm.sh +++ b/integration-test/config/install-storm.sh @@ -31,7 +31,6 @@ chown -R storm:storm /etc/storm rm /usr/share/storm/conf/storm.yaml cp "${SCRIPT_DIR}/storm.yaml" /usr/share/storm/conf/ -cp "${SCRIPT_DIR}/cluster.xml" /usr/share/storm/logback/ ln -s /usr/share/storm/conf/storm.yaml /etc/storm/conf/storm.yaml mkdir /var/log/storm http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/config/install-zookeeper.sh ---------------------------------------------------------------------- diff --git a/integration-test/config/install-zookeeper.sh b/integration-test/config/install-zookeeper.sh index 5f92f56..98253d7 100644 --- a/integration-test/config/install-zookeeper.sh +++ b/integration-test/config/install-zookeeper.sh @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# $1 is the Zookeeper version to install apt-get --yes install zookeeper=$1 zookeeperd=$1 service zookeeper stop echo maxClientCnxns=200 >> /etc/zookeeper/conf/zoo.cfg http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 0351235..bcf688d 100755 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -88,6 +88,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.0</version> + </dependency> + <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-starter</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/run-it.sh ---------------------------------------------------------------------- diff --git a/integration-test/run-it.sh b/integration-test/run-it.sh index eaed790..927c60d 100755 --- a/integration-test/run-it.sh +++ b/integration-test/run-it.sh @@ -32,11 +32,11 @@ function list_storm_processes() { list_storm_processes || true # increasing swap space so we can run lots of workers -sudo dd if=/dev/zero of=/swapfile.img bs=8192 count=1M +sudo dd if=/dev/zero of=/swapfile.img bs=4096 count=1M sudo mkswap /swapfile.img sudo swapon /swapfile.img -if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8 +if [[ "${USER}" == "ubuntu" ]]; then # install oracle jdk8 sudo apt-get update sudo apt-get -y install python-software-properties sudo apt-add-repository -y ppa:webupd8team/java @@ -44,12 +44,13 @@ if [[ "${USER}" == "vagrant" ]]; then # install oracle jdk8 echo "oracle-java8-installer shared/accepted-oracle-license-v1-1 select true" | sudo debconf-set-selections sudo apt-get install -y oracle-java8-installer sudo apt-get -y install maven + sudo apt-get install unzip java -version mvn --version export MAVEN_OPTS="-Xmx3000m" + zookeeper_version=3.4.8* else ( while true; do echo "heartbeat"; sleep 300; done ) & #heartbeat needed by travis ci - (cd "${STORM_SRC_DIR}" && mvn clean install -DskipTests=true) || die "maven install command failed" if [[ "${USER}" == "travis" ]]; then ( cd "${STORM_SRC_DIR}/storm-dist/binary" && mvn clean package -Dgpg.skip=true ) fi http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java index 430449b..8e9c6c9 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingTimeCorrectness.java @@ -114,7 +114,9 @@ public class SlidingTimeCorrectness implements TestableTopology { @Override public void nextTuple() { - TimeUtil.sleepMilliSec(rng.nextInt(800)); + //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it + //Sleep a bit between emits + TimeUtil.sleepMilliSec(rng.nextInt(100)); currentNum++; TimeData data = TimeData.newData(currentNum); final Values tuple = data.getValues(); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java index 33ee004..c059035 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/SlidingWindowCorrectness.java @@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * Computes sliding window sum @@ -74,7 +73,6 @@ public class SlidingWindowCorrectness implements TestableTopology { builder.setSpout(getSpoutName(), new IncrementingSpout(), 1); builder.setBolt(getBoltName(), new VerificationBolt() - .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) .withWindow(new BaseWindowedBolt.Count(windowSize), new BaseWindowedBolt.Count(slideSize)), 1) .shuffleGrouping(getSpoutName()); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java index 7cadd18..0c9e891 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingTimeCorrectness.java @@ -111,7 +111,9 @@ public class TumblingTimeCorrectness implements TestableTopology { @Override public void nextTuple() { - TimeUtil.sleepMilliSec(rng.nextInt(800)); + //Emitting too quickly can lead to spurious test failures because the worker log may roll right before we read it + //Sleep a bit between emits + TimeUtil.sleepMilliSec(rng.nextInt(100)); currentNum++; TimeData data = TimeData.newData(currentNum); final Values tuple = data.getValues(); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java ---------------------------------------------------------------------- diff --git a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java index adcb9dd..b7f5dfa 100644 --- a/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java +++ b/integration-test/src/main/java/org/apache/storm/st/topology/window/TumblingWindowCorrectness.java @@ -39,7 +39,6 @@ import org.apache.storm.st.utils.TimeUtil; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * Computes sliding window sum @@ -72,7 +71,6 @@ public class TumblingWindowCorrectness implements TestableTopology { builder.setSpout(getSpoutName(), new IncrementingSpout(), 1); builder.setBolt(getBoltName(), new VerificationBolt() - .withLag(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)) .withTumblingWindow(new BaseWindowedBolt.Count(tumbleSize)), 1) .shuffleGrouping(getSpoutName()); return builder.createTopology(); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java index f23c6fc..ebb844f 100644 --- a/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java +++ b/integration-test/src/test/java/org/apache/storm/st/tests/window/SlidingWindowTest.java @@ -17,6 +17,7 @@ package org.apache.storm.st.tests.window; +import java.io.IOException; import org.apache.storm.st.helper.AbstractTest; import org.apache.storm.st.wrapper.LogData; import org.apache.storm.st.wrapper.TopoWrap; @@ -79,21 +80,25 @@ public final class SlidingWindowTest extends AbstractTest { runAndVerifyCount(windowSize, slideSize, testable, topo); } - static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws TException, MalformedURLException { + static void runAndVerifyCount(int windowSize, int slideSize, TestableTopology testable, TopoWrap topo) throws IOException, TException, MalformedURLException { topo.submitSuccessfully(); - final int minSpoutEmits = 1000 + windowSize; final int minBoltEmits = 5; + //Sliding windows should produce one window every slideSize tuples + //Wait for the spout to emit at least enough tuples to get minBoltEmit windows and at least one full window + final int minSpoutEmits = Math.max(windowSize, minBoltEmits * slideSize); + String boltName = testable.getBoltName(); String spoutName = testable.getSpoutName(); - topo.waitForProgress(minSpoutEmits, spoutName, 180); - topo.waitForProgress(minBoltEmits, boltName, 180); + //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway + topo.assertProgress(minSpoutEmits, spoutName, 180); + topo.assertProgress(minBoltEmits, boltName, 180); List<TopoWrap.ExecutorURL> boltUrls = topo.getLogUrls(boltName); log.info(boltUrls.toString()); final List<LogData> allBoltData = topo.getLogData(boltName); final List<LogData> allSpoutData = topo.getLogData(spoutName); Assert.assertTrue(allBoltData.size() >= minBoltEmits, "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData); - final int numberOfWindows = allBoltData.size() - windowSize / slideSize; + final int numberOfWindows = allBoltData.size(); for(int i = 0; i < numberOfWindows; ++i ) { log.info("Comparing window: " + (i + 1) + " of " + numberOfWindows); final int toIndex = (i + 1) * slideSize; @@ -143,28 +148,29 @@ public final class SlidingWindowTest extends AbstractTest { runAndVerifyTime(windowSec, slideSec, testable, topo); } - static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws TException, java.net.MalformedURLException { + static void runAndVerifyTime(int windowSec, int slideSec, TestableTopology testable, TopoWrap topo) throws IOException, TException, java.net.MalformedURLException { topo.submitSuccessfully(); - final int minSpoutEmits = 1000 + windowSec; + final int minSpoutEmits = 100; final int minBoltEmits = 5; String boltName = testable.getBoltName(); String spoutName = testable.getSpoutName(); - topo.waitForProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec)); - topo.waitForProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec)); - final List<TimeData> allSpoutData = topo.getLogData(spoutName, TimeData.CLS); - final List<LogData> allBoltLog = topo.getLogData(boltName); - final List<TimeDataWindow> allBoltData = topo.getLogData(boltName, TimeDataWindow.CLS); - Assert.assertTrue(allBoltLog.size() >= minBoltEmits, - "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltLog.size() + " \n\t" + allBoltLog); - final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutData.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec); - final int numberOfWindows = allBoltLog.size() - windowSec / slideSec; + //Waiting for spout tuples isn't strictly necessary since we also wait for bolt emits, but do it anyway + topo.assertProgress(minSpoutEmits, spoutName, 60 + 10 * (windowSec + slideSec)); + topo.assertProgress(minBoltEmits, boltName, 60 + 10 * (windowSec + slideSec)); + final List<TimeData> allSpoutDataDeserialized = topo.getLogData(spoutName, TimeData.CLS); + final List<LogData> allBoltData = topo.getLogData(boltName); + final List<TimeDataWindow> allBoltDataDeserialized = topo.deserializeLogData(allBoltData, TimeDataWindow.CLS); + Assert.assertTrue(allBoltData.size() >= minBoltEmits, + "Expecting min " + minBoltEmits + " bolt emits, found: " + allBoltData.size() + " \n\t" + allBoltData); + final DateTime firstEndTime = TimeUtil.ceil(new DateTime(allSpoutDataDeserialized.get(0).getDate()).withZone(DateTimeZone.UTC), slideSec); + final int numberOfWindows = allBoltData.size(); for(int i = 0; i < numberOfWindows; ++i ) { final DateTime toDate = firstEndTime.plusSeconds(i * slideSec); final DateTime fromDate = toDate.minusSeconds(windowSec); log.info("Comparing window: " + fromDate + " to " + toDate + " iter " + (i+1) + "/" + numberOfWindows); - final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutData,fromDate, toDate); - final LogData oneBoltLog = allBoltLog.get(i); - final TimeDataWindow actualWindow = allBoltData.get(i); + final TimeDataWindow computedWindow = TimeDataWindow.newInstance(allSpoutDataDeserialized,fromDate, toDate); + final LogData oneBoltLog = allBoltData.get(i); + final TimeDataWindow actualWindow = allBoltDataDeserialized.get(i); log.info("Actual window: " + actualWindow.getDescription()); log.info("Computed window: " + computedWindow.getDescription()); for (TimeData oneLog : computedWindow) { http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java ---------------------------------------------------------------------- diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java index 5e58637..4d5c3a8 100644 --- a/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java +++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/TopoWrap.java @@ -53,6 +53,7 @@ import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +66,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; public class TopoWrap { private static Logger log = LoggerFactory.getLogger(TopoWrap.class); @@ -97,6 +100,9 @@ public class TopoWrap { submitConf.put("storm.zookeeper.topology.auth.scheme", "digest"); submitConf.put("topology.workers", 3); submitConf.put("topology.debug", true); + //Set the metrics sample rate to 1 to force update the executor stats every time something happens + //This is necessary because getAllTimeEmittedCount relies on the executor emit stats to be accurate + submitConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1); return submitConf; } @@ -180,7 +186,7 @@ public class TopoWrap { Map<String, Long> allTime = emitted.get(since); if (allTime == null) return 0L; - return allTime.get("default"); + return allTime.get(Utils.DEFAULT_STREAM_ID); } }); return sum(ackCounts).longValue(); @@ -213,7 +219,7 @@ public class TopoWrap { log.info(getInfo().toString()); long emitCount = getAllTimeEmittedCount(componentName); log.info("Count for component " + componentName + " is " + emitCount); - if (emitCount > minEmits) { + if (emitCount >= minEmits) { break; } TimeUtil.sleepSec(10); @@ -280,8 +286,13 @@ public class TopoWrap { } } - public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) throws TException, MalformedURLException { + public <T extends FromJson<T>> List<T> getLogData(final String componentId, final FromJson<T> cls) + throws IOException, TException, MalformedURLException { final List<LogData> logData = getLogData(componentId); + return deserializeLogData(logData, cls); + } + + public <T extends FromJson<T>> List<T> deserializeLogData(final List<LogData> logData, final FromJson<T> cls) { final List<T> data = new ArrayList<>( Collections2.transform(logData, new Function<LogData, T>() { @Nullable @@ -294,7 +305,7 @@ public class TopoWrap { return data; } - public List<LogData> getLogData(final String componentId) throws TException, MalformedURLException { + public List<LogData> getLogData(final String componentId) throws IOException, TException, MalformedURLException { final String logs = getLogs(componentId); final String dateRegex = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}"; Pattern pattern = Pattern.compile("(?=\\n" + dateRegex + ")"); @@ -318,48 +329,38 @@ public class TopoWrap { return sortedLogs; } - public String getLogs(final String componentId) throws TException, MalformedURLException { + public String getLogs(final String componentId) throws IOException, TException, MalformedURLException { log.info("Fetching logs for componentId = " + componentId); List<ExecutorURL> exclaim2Urls = getLogUrls(componentId); log.info("Found " + exclaim2Urls.size() + " urls: " + exclaim2Urls.toString()); - Collection<String> urlOuputs = Collections2.transform(exclaim2Urls, new Function<ExecutorURL, String>() { - @Nullable - @Override - public String apply(@Nullable ExecutorURL executorURL) { - if (executorURL == null || executorURL.getDownloadUrl() == null) { - return ""; - } - String warnMessage = "Couldn't fetch executorURL: " + executorURL; + List<String> urlContents = new ArrayList<>(); + for(ExecutorURL executorUrl : exclaim2Urls) { + if(executorUrl == null || executorUrl.getDownloadUrl() == null) { + continue; + } + log.info("Fetching: " + executorUrl); + URL downloadUrl = executorUrl.downloadUrl; + String urlContent = IOUtils.toString(downloadUrl, StandardCharsets.UTF_8); + urlContents.add(urlContent); + if (urlContent.length() < 500) { + log.info("Fetched: " + urlContent); + } else { + log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes."); + } + if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) { + final String userDir = System.getProperty("user.dir"); + final File target = new File(userDir, "target"); + final File logDir = new File(target, "logs"); + final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]); try { - log.info("Fetching: " + executorURL); - final URL downloadUrl = executorURL.downloadUrl; - final String urlContent = IOUtils.toString(downloadUrl); - if (urlContent.length() < 500) { - log.info("Fetched: " + urlContent); - } else { - log.info("Fetched: " + NumberFormat.getNumberInstance(Locale.US).format(urlContent.length()) + " bytes."); - } - if (System.getProperty("regression.downloadWorkerLogs").equalsIgnoreCase("true")) { - final String userDir = System.getProperty("user.dir"); - final File target = new File(userDir, "target"); - final File logDir = new File(target, "logs"); - final File logFile = new File(logDir, downloadUrl.getHost() + "-" + downloadUrl.getFile().split("/")[2]); - try { - FileUtils.forceMkdir(logDir); - FileUtils.write(logFile, urlContent); - } catch (Throwable throwable) { - log.info("Caught exteption: " + ExceptionUtils.getFullStackTrace(throwable)); - } - } - return urlContent; - } catch (IOException e) { - log.warn(warnMessage); + FileUtils.forceMkdir(logDir); + FileUtils.write(logFile, urlContent, StandardCharsets.UTF_8); + } catch (Throwable throwable) { + log.info("Caught exception: " + ExceptionUtils.getFullStackTrace(throwable)); } - String stars = StringUtils.repeat("*", 30); - return stars + " " + warnMessage + " " + stars; } - }); - return StringUtils.join(urlOuputs, '\n'); + } + return StringUtils.join(urlContents, '\n'); } private Number sum(Collection<? extends Number> nums) { http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index fd61e85..0fefe06 100644 --- a/pom.xml +++ b/pom.xml @@ -267,6 +267,7 @@ <junit.version>4.11</junit.version> <metrics-clojure.version>2.5.1</metrics-clojure.version> <hdrhistogram.version>2.1.7</hdrhistogram.version> + <hamcrest.version>1.3</hamcrest.version> <calcite.version>1.11.0</calcite.version> @@ -915,6 +916,24 @@ </dependency> <dependency> <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>${mockito.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index f20b829..40c4836 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -256,7 +256,12 @@ </dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index c9afc67..48bcb69 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -199,8 +199,7 @@ public class WindowedBoltExecutor implements IRichBolt { // validate validate(stormConf, windowLengthCount, windowLengthDuration, slidingIntervalCount, slidingIntervalDuration); - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration, - manager); + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration); triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager, evictionPolicy); manager.setEvictionPolicy(evictionPolicy); @@ -251,8 +250,7 @@ public class WindowedBoltExecutor implements IRichBolt { } } - private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration, - WindowManager<Tuple> manager) { + private EvictionPolicy<Tuple> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) { if (windowLengthCount != null) { if (isTupleTs()) { return new WatermarkCountEvictionPolicy<>(windowLengthCount.value); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java index c5a578a..7582e26 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java @@ -46,7 +46,7 @@ public interface EvictionContext { Long getSlidingInterval(); /** - * Returns the current count of events in the queue up to the reference tim + * Returns the current count of events in the queue up to the reference time * based on which count based evictions can be performed. * * @return the current count http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java index 05e4d93..e7d1e40 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java @@ -27,7 +27,7 @@ public interface EvictionPolicy<T> { /** * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked. */ - enum Action { + public enum Action { /** * expire the event and remove it from the queue */ http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java index 0ab28a1..95329ca 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java @@ -49,7 +49,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T> { * {@inheritDoc} */ @Override - public Action evict(Event<T> event) { + public Action evict(Event<T> event) { long now = referenceTime == null ? System.currentTimeMillis() : referenceTime; long diff = now - event.getTimestamp(); if (diff >= (windowLength + delta)) { http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java index 74240bb..676ccdb 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java @@ -19,7 +19,7 @@ package org.apache.storm.windowing; /** * An eviction policy that tracks count based on watermark ts and - * evicts events upto the watermark based on a threshold count. + * evicts events up to the watermark based on a threshold count. * * @param <T> the type of event tracked by this policy. */ @@ -30,6 +30,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { */ private long referenceTime; private long processed = 0L; + private EvictionContext context; public WatermarkCountEvictionPolicy(int count) { super(count); @@ -37,6 +38,13 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { @Override public Action evict(Event<T> event) { + if(context == null) { + //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow. + //In this case we should hold on to all the events. When the first watermark is received, the context will be set, + //and the events will be reevaluated for eviction + return Action.STOP; + } + Action action; if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) { action = super.evict(event); @@ -56,6 +64,7 @@ public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> { @Override public void setContext(EvictionContext context) { + this.context = context; referenceTime = context.getReferenceTime(); if (context.getCurrentCount() != null) { currentCount.set(context.getCurrentCount()); http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java index 53361d2..981f514 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeEvictionPolicy.java @@ -58,7 +58,15 @@ public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> { */ @Override public Action evict(Event<T> event) { - long diff = referenceTime - event.getTimestamp(); + if(evictionContext == null) { + //It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow. + //In this case we should hold on to all the events. When the first watermark is received, the context will be set, + //and the events will be reevaluated for eviction + return Action.STOP; + } + + long referenceTime = evictionContext.getReferenceTime(); + long diff = referenceTime - event.getTimestamp(); if (diff < -lag) { return Action.STOP; } else if (diff < 0) { http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java index 792509e..a734497 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WindowManager.java @@ -46,6 +46,9 @@ public class WindowManager<T> implements TriggerHandler { /** * Expire old events every EXPIRE_EVENTS_THRESHOLD to * keep the window size in check. + * + * Note that if the eviction policy is based on watermarks, events will not be evicted until a new + * watermark would cause them to be considered expired anyway, regardless of this limit */ public static final int EXPIRE_EVENTS_THRESHOLD = 100; http://git-wip-us.apache.org/repos/asf/storm/blob/d7b7096a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java index 6645566..a9874cd 100644 --- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java +++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java @@ -29,9 +29,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.storm.topology.base.BaseWindowedBolt.Count; import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -155,7 +158,48 @@ public class WindowManagerTest { // window should be compacted and events should be expired. assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents); } - + + private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy watermarkEvictionPolicy, int windowLength) throws Exception { + /** + * The watermark eviction policy must not evict tuples until the first watermark has been received. + * The policies can't make a meaningful decision prior to the first watermark, so the safe decision + * is to postpone eviction. + */ + int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD; + windowManager.setEvictionPolicy(watermarkEvictionPolicy); + WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager, + watermarkEvictionPolicy, windowManager); + triggerPolicy.start(); + windowManager.setTriggerPolicy(triggerPolicy); + for (int i : seq(1, threshold)) { + windowManager.add(i, i); + } + assertThat("The watermark eviction policies should never evict events before the first watermark is received", listener.onExpiryEvents, is(empty())); + windowManager.add(new WaterMarkEvent<Integer>(threshold)); + // The events should be put in a window when the first watermark is received + assertEquals(seq(1, threshold), listener.onActivationEvents); + //Now add some more events and a new watermark, and check that the previous events are expired + for(int i : seq(threshold+1, threshold*2)) { + windowManager.add(i, i); + } + windowManager.add(new WaterMarkEvent<Integer>(threshold + windowLength+1)); + //All the events should be expired when the next watermark is received + assertThat("All the events should be expired after the second watermark", listener.onExpiryEvents, equalTo(seq(1, threshold))); + } + + @Test + public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception { + int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD; + EvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(windowLength); + testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkCountEvictionPolicy, windowLength); + } + + @Test + public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception { + int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD; + EvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(windowLength); + testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkTimeEvictionPolicy, windowLength); + } @Test public void testTimeBasedWindow() throws Exception {
