Repository: apex-core
Updated Branches:
  refs/heads/master 1b1813f96 -> d32ea3c04


APEXCORE-222 purging of the buffer server is done from the streaming container, 
instead of StreamingContainerManager


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d32ea3c0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d32ea3c0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d32ea3c0

Branch: refs/heads/master
Commit: d32ea3c04498ed473364140ce444b6fe6e732149
Parents: 1b1813f
Author: sandeshh <[email protected]>
Authored: Mon Jun 27 20:58:07 2016 -0700
Committer: sandeshh <[email protected]>
Committed: Fri Jul 15 10:26:35 2016 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 11 ++++-----
 .../datatorrent/bufferserver/server/Server.java | 12 ++++++---
 .../stram/StreamingContainerManager.java        | 26 --------------------
 .../stram/engine/StreamingContainer.java        |  8 ++++++
 4 files changed, 22 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 2a01102..3f596d9 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -176,21 +176,20 @@ public class DataList
     numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1);
   }
 
-  public void purge(final int baseSeconds, final int windowId)
+  public void purge(final long windowId)
   {
-    final long longWindowId = (long)baseSeconds << 32 | windowId;
     logger.debug("Purging {} from window ID {} to window ID {}", this, 
Codec.getStringWindowId(first.starting_window),
-        Codec.getStringWindowId(longWindowId));
+        Codec.getStringWindowId(windowId));
 
     int numberOfInMemBlockPurged = 0;
     synchronized (this) {
-      for (Block prev = null, temp = first; temp != null && 
temp.starting_window <= longWindowId;
+      for (Block prev = null, temp = first; temp != null && 
temp.starting_window <= windowId;
           prev = temp, temp = temp.next) {
-        if (temp.ending_window > longWindowId || temp == last) {
+        if (temp.ending_window > windowId || temp == last) {
           if (prev != null) {
             first = temp;
           }
-          first.purge(longWindowId);
+          first.purge(windowId);
           break;
         }
         temp.discard(false);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 83b50d2..12eed5f 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -26,7 +26,6 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -169,7 +168,7 @@ public class Server implements ServerListener
     return identity;
   }
 
-  private final HashMap<String, DataList> publisherBuffers = new 
HashMap<String, DataList>();
+  private final ConcurrentHashMap<String, DataList> publisherBuffers = new 
ConcurrentHashMap<>(1, 0.75f, 1);
   private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new 
ConcurrentHashMap<String, LogicalNode>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> 
publisherChannels = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> 
subscriberChannels = new ConcurrentHashMap<>();
@@ -185,7 +184,7 @@ public class Server implements ServerListener
     if (dl == null) {
       message = ("Invalid identifier '" + request.getIdentifier() + 
"'").getBytes();
     } else {
-      dl.purge(request.getBaseSeconds(), request.getWindowId());
+      dl.purge((long)request.getBaseSeconds() << 32 | request.getWindowId());
       message = ("Request sent for processing: " + request).getBytes();
     }
 
@@ -199,6 +198,13 @@ public class Server implements ServerListener
     }
   }
 
+  public void purge(long windowId)
+  {
+    for (DataList dataList: publisherBuffers.values()) {
+      dataList.purge(windowId);
+    }
+  }
+
   private void handleResetRequest(ResetRequestTuple request, final 
AbstractLengthPrependerClient ctx) throws IOException
   {
     DataList dl;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 092f5a2..d5e5475 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2171,32 +2171,6 @@ public class StreamingContainerManager implements PlanContext
         };
         poolExecutor.submit(r);
       }
-      // delete stream state when using buffer server
-      for (PTOperator.PTOutput out : operator.getOutputs()) {
-        if (!out.isDownStreamInline()) {
-          if (operator.getContainer().bufferServerAddress == null) {
-            // address should be null only for a new container, in which case 
there should not be a purge request
-            // TODO: logging added to find out how we got here
-            LOG.warn("purge request w/o buffer server address source {} 
container {} checkpoints {}",
-                out, operator.getContainer(), operator.checkpoints);
-            continue;
-          }
-
-          for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-            StreamCodec<?> streamCodecInfo = 
StreamingContainerAgent.getStreamCodec(ipm);
-            Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
-            // following needs to match the concat logic in StreamingContainer
-            String sourceIdentifier = 
Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
-            // delete everything from buffer server prior to new checkpoint
-            BufferServerController bsc = getBufferServerClient(operator);
-            try {
-              bsc.purge(null, sourceIdentifier, 
operator.checkpoints.getFirst().windowId - 1);
-            } catch (RuntimeException re) {
-              LOG.warn("Failed to purge {} {}", bsc.addr, sourceIdentifier, 
re);
-            }
-          }
-        }
-      }
     }
     purgeCheckpoints.clear();
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 1953d7a..54b8a6e 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -769,7 +769,15 @@ public class StreamingContainer extends YarnContainerMain
     }
 
     if (rsp.committedWindowId != lastCommittedWindowId) {
+
       lastCommittedWindowId = rsp.committedWindowId;
+
+      if (bufferServer != null) {
+        //  One Window before the committed Window is kept in the Buffer 
Server, for historical reasons.
+        // Jira for that issue is APEXCORE-479
+        bufferServer.purge(lastCommittedWindowId - 1);
+      }
+
       OperatorRequest nr = null;
       for (Entry<Integer, Node<?>> e : nodes.entrySet()) {
         final Thread thread = e.getValue().context.getThread();

Reply via email to