Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 687581111 -> 781daa75b


APEXCORE-374 - Block with positive reference count is found during buffer 
server purge. When a LogicalNode is torn down, its iterator must be closed to 
decrement the reference count of the block it points to.

Conflicts:
        
bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/781daa75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/781daa75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/781daa75

Branch: refs/heads/release-3.3
Commit: 781daa75b4de823ad24181a756eef2edc9d457d0
Parents: 6875811
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 7 18:30:12 2016 -0800
Committer: Thomas Weise <[email protected]>
Committed: Tue Mar 8 11:06:39 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 57 ++++++--------------
 .../bufferserver/internal/LogicalNode.java      | 35 +++++++-----
 .../datatorrent/bufferserver/server/Server.java |  7 +--
 3 files changed, 39 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/781daa75/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 95c32b0..2a01102 100644
--- 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -61,7 +60,6 @@ public class DataList
   private final int blockSize;
   private final HashMap<BitVector, HashSet<DataListener>> listeners = 
newHashMap();
   protected final HashSet<DataListener> all_listeners = newHashSet();
-  protected final HashMap<String, DataListIterator> iterators = newHashMap();
   protected Block first;
   protected Block last;
   protected Storage storage;
@@ -332,43 +330,18 @@ public class DataList
     return block.next;
   }
 
-  public Iterator<SerializedData> newIterator(String identifier, long windowId)
+  public DataListIterator newIterator(long windowId)
   {
     //logger.debug("request for a new iterator {} and {}", identifier, 
windowId);
-    for (Block temp = first; temp != null; temp = temp.next) {
+    Block temp = first;
+    while (temp != last) {
       if (temp.starting_window >= windowId || temp.ending_window > windowId) {
-        DataListIterator dli = getIterator(temp);
-        iterators.put(identifier, dli);
-        //logger.debug("returning new iterator on temp = {}", temp);
-        return dli;
-      }
-    }
-
-    DataListIterator dli = getIterator(last);
-    iterators.put(identifier, dli);
-    //logger.debug("returning new iterator on last = {}", last);
-    return dli;
-  }
-
-  /**
-   * Release previous acquired iterator from this DataList
-   *
-   * @param iterator
-   * @return true if successfully released, false otherwise.
-   */
-  public boolean delIterator(Iterator<SerializedData> iterator)
-  {
-    if (iterator instanceof DataListIterator) {
-      DataListIterator dli = (DataListIterator)iterator;
-      for (Entry<String, DataListIterator> e : iterators.entrySet()) {
-        if (e.getValue() == dli) {
-          dli.close();
-          iterators.remove(e.getKey());
-          return true;
-        }
+        break;
       }
+      temp = temp.next;
     }
-    return false;
+    //logger.debug("returning new iterator on temp = {}", temp);
+    return getIterator(temp);
   }
 
   public void addDataListener(DataListener dl)
@@ -506,19 +479,21 @@ public class DataList
     int oldestBlockIndex = Integer.MAX_VALUE;
     int oldestReadOffset = Integer.MAX_VALUE;
 
-    for (Map.Entry<String, DataListIterator> entry : iterators.entrySet()) {
-      Integer index = indices.get(entry.getValue().da);
+    for (DataListener dl : all_listeners) {
+      LogicalNode logicalNode = (LogicalNode)dl;
+      DataListIterator dli = logicalNode.getIterator();
+      Integer index = indices.get(dli.da);
       if (index == null) {
         // error
         throw new RuntimeException("problemo!");
       }
       if (index < oldestBlockIndex) {
         oldestBlockIndex = index;
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
-      } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() 
< oldestReadOffset) {
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
+      } else if (index == oldestBlockIndex && dli.getReadOffset() < 
oldestReadOffset) {
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/781daa75/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index d37f4ae..c08cfb9 100644
--- 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -20,7 +20,6 @@ package com.datatorrent.bufferserver.internal;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +46,7 @@ import com.datatorrent.netlet.EventLoop;
  */
 public class LogicalNode implements DataListener
 {
+  private final String identifier;
   private final String upstream;
   private final String group;
   private final HashSet<PhysicalNode> physicalNodes;
@@ -59,25 +59,21 @@ public class LogicalNode implements DataListener
 
   /**
    *
+   * @param identifier
    * @param upstream
    * @param group
    * @param iterator
-   * @param skipUptoWindowId
+   * @param skipWindowId
    */
-  public LogicalNode(String upstream, String group, Iterator<SerializedData> 
iterator, long skipUptoWindowId)
+  public LogicalNode(String identifier, String upstream, String group, 
DataListIterator iterator, long skipWindowId)
   {
+    this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
     this.physicalNodes = new HashSet<PhysicalNode>();
     this.partitions = new HashSet<BitVector>();
-
-    if (iterator instanceof DataListIterator) {
-      this.iterator = (DataListIterator)iterator;
-    } else {
-      throw new IllegalArgumentException("iterator does not belong to 
DataListIterator class");
-    }
-
-    skipWindowId = skipUptoWindowId;
+    this.iterator = iterator;
+    this.skipWindowId = skipWindowId;
   }
 
   /**
@@ -91,13 +87,14 @@ public class LogicalNode implements DataListener
 
   /**
    *
-   * @return Iterator<SerializedData>
+   * @return DataListIterator
    */
-  public Iterator<SerializedData> getIterator()
+  public DataListIterator getIterator()
   {
     return iterator;
   }
 
+
   /**
    *
    * @param connection
@@ -335,6 +332,15 @@ public class LogicalNode implements DataListener
     return upstream;
   }
 
+  /**
+   *
+   * @return the identifier
+   */
+  public String getIdentifier()
+  {
+    return identifier;
+  }
+
   public void boot(EventLoop eventloop)
   {
     for (PhysicalNode pn : physicalNodes) {
@@ -346,7 +352,8 @@ public class LogicalNode implements DataListener
   @Override
   public String toString()
   {
-    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", 
partitions=" + partitions +
+    return "LogicalNode@" + Integer.toHexString(hashCode()) +
+        "identifier=" + identifier + ", upstream=" + upstream + ", group=" + 
group + ", partitions=" + partitions +
         ", iterator=" + iterator + '}';
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/781daa75/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 353eb2b..8a1fac7 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -271,10 +271,7 @@ public class Server implements ServerListener
       }
 
       long skipWindowId = (long)request.getBaseSeconds() << 32 | 
request.getWindowId();
-      ln = new LogicalNode(upstream_identifier,
-                           type,
-                           dl.newIterator(identifier, skipWindowId),
-                           skipWindowId);
+      ln = new LogicalNode(identifier, upstream_identifier, type, 
dl.newIterator(skipWindowId), skipWindowId);
 
       int mask = request.getMask();
       if (mask != 0) {
@@ -584,10 +581,10 @@ public class Server implements ServerListener
           DataList dl = publisherBuffers.get(ln.getUpstream());
           if (dl != null) {
             dl.removeDataListener(ln);
-            dl.delIterator(ln.getIterator());
           }
           subscriberGroups.remove(ln.getGroup());
         }
+        ln.getIterator().close();
       }
     }
 

Reply via email to