Removed immediateFlush="false" from the log4j2 worker appenders (restoring the default immediate flush) and added synchronization to guard against potential race conditions between supervisor downloads and blobstore calls
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a710bb66 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a710bb66 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a710bb66 Branch: refs/heads/master Commit: a710bb6615c16cb2787ac460b00af33267e888bd Parents: c511a7e Author: Sanket <schintap@untilservice-lm> Authored: Sun Dec 13 17:41:40 2015 -0600 Committer: Sanket <schintap@untilservice-lm> Committed: Sun Dec 13 17:41:40 2015 -0600 ---------------------------------------------------------------------- conf/defaults.yaml | 2 +- log4j2/worker.xml | 8 ++++---- .../src/jvm/backtype/storm/blobstore/BlobSynchronizer.java | 2 +- .../src/jvm/backtype/storm/blobstore/KeySequenceNumber.java | 2 +- .../src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b7c4677..8d9e3a3 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -179,7 +179,7 @@ task.refresh.poll.secs: 10 task.credentials.poll.secs: 30 # now should be null by default -topology.backpressure.enable: true +topology.backpressure.enable: false backpressure.disruptor.high.watermark: 0.9 backpressure.disruptor.low.watermark: 0.4 http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/log4j2/worker.xml ---------------------------------------------------------------------- diff --git a/log4j2/worker.xml b/log4j2/worker.xml index df368c6..967585b 100644 --- a/log4j2/worker.xml +++ b/log4j2/worker.xml @@ -22,7 +22,7 @@ <property name="patternNoTime">%msg%n</property> </properties> <appenders> - <RollingFile name="A1" immediateFlush="false" + <RollingFile name="A1" 
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}" filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz"> <PatternLayout> @@ -33,7 +33,7 @@ </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> - <RollingFile name="STDOUT" immediateFlush="false" + <RollingFile name="STDOUT" fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out" filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz"> <PatternLayout> @@ -44,7 +44,7 @@ </Policies> <DefaultRolloverStrategy max="4"/> </RollingFile> - <RollingFile name="STDERR" immediateFlush="false" + <RollingFile name="STDERR" fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err" filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz"> <PatternLayout> @@ -58,7 +58,7 @@ <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514" protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true" facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}" - messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFlush="false" immediateFail="true"/> + messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true"/> </appenders> <loggers> <root level="info"> <!-- We log everything --> http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java index abd7c86..1f20d7c 100644 --- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java +++ 
b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Map; -import java.util.Set;; +import java.util.Set; /** * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java index 1cddac0..9307993 100644 --- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java +++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java @@ -130,7 +130,7 @@ public class KeySequenceNumber { this.nimbusInfo = nimbusInfo; } - public int getKeySequenceNumber(Map conf) { + public synchronized int getKeySequenceNumber(Map conf) { TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); try { http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java index 7f075a1..b8daad2 100644 --- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java +++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java @@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore { } //This additional check and download is for nimbus high availability in case you have more than one nimbus - public boolean checkForBlobOrDownload(String key) { + public synchronized boolean checkForBlobOrDownload(String key) { boolean 
checkBlobDownload = false; try { List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this); @@ -301,7 +301,7 @@ public class LocalFsBlobStore extends BlobStore { return checkBlobDownload; } - public void checkForBlobUpdate(String key) { + public synchronized void checkForBlobUpdate(String key) { BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo); }