Removed the explicit immediateFlush="false" setting from the log4j2 worker
appenders (so they fall back to the Log4j2 default) and added synchronization
for potential race conditions between supervisor download and blobstore calls


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a710bb66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a710bb66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a710bb66

Branch: refs/heads/master
Commit: a710bb6615c16cb2787ac460b00af33267e888bd
Parents: c511a7e
Author: Sanket <schintap@untilservice-lm>
Authored: Sun Dec 13 17:41:40 2015 -0600
Committer: Sanket <schintap@untilservice-lm>
Committed: Sun Dec 13 17:41:40 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                                           | 2 +-
 log4j2/worker.xml                                            | 8 ++++----
 .../src/jvm/backtype/storm/blobstore/BlobSynchronizer.java   | 2 +-
 .../src/jvm/backtype/storm/blobstore/KeySequenceNumber.java  | 2 +-
 .../src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java   | 4 ++--
 5 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b7c4677..8d9e3a3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
 
 # now should be null by default
-topology.backpressure.enable: true
+topology.backpressure.enable: false
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index df368c6..967585b 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -22,7 +22,7 @@
     <property name="patternNoTime">%msg%n</property>
 </properties>
 <appenders>
-    <RollingFile name="A1" immediateFlush="false"
+    <RollingFile name="A1"
                
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
                
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
         <PatternLayout>
@@ -33,7 +33,7 @@
         </Policies>
         <DefaultRolloverStrategy max="9"/>
     </RollingFile>
-    <RollingFile name="STDOUT" immediateFlush="false"
+    <RollingFile name="STDOUT"
                
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
                
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
         <PatternLayout>
@@ -44,7 +44,7 @@
         </Policies>
         <DefaultRolloverStrategy max="4"/>
     </RollingFile>
-    <RollingFile name="STDERR" immediateFlush="false"
+    <RollingFile name="STDERR"
                
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
                
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
         <PatternLayout>
@@ -58,7 +58,7 @@
     <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" 
port="514"
         protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" 
mdcId="mdc" includeMDC="true"
         facility="LOCAL5" enterpriseNumber="18060" newLine="true" 
exceptionPattern="%rEx{full}"
-        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" 
immediateFlush="false" immediateFail="true"/>
+        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" 
immediateFail="true"/>
 </appenders>
 <loggers>
     <root level="info"> <!-- We log everything -->

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
index abd7c86..1f20d7c 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
 
 /**
  * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
index 1cddac0..9307993 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -130,7 +130,7 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public int getKeySequenceNumber(Map conf) {
+    public synchronized int getKeySequenceNumber(Map conf) {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
         CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 7f075a1..b8daad2 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
     }
 
     //This additional check and download is for nimbus high availability in case you have more than one nimbus
-    public boolean checkForBlobOrDownload(String key) {
+    public synchronized boolean checkForBlobOrDownload(String key) {
         boolean checkBlobDownload = false;
         try {
             List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -301,7 +301,7 @@ public class LocalFsBlobStore extends BlobStore {
         return checkBlobDownload;
     }
 
-    public void checkForBlobUpdate(String key) {
+    public synchronized void checkForBlobUpdate(String key) {
         BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
     }
 

Reply via email to