Repository: apex-core Updated Branches: refs/heads/master 1b1813f96 -> d32ea3c04
APEXCORE-222 purging of the buffer server is done from the streaming container, instead of StreamingContainerManager Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d32ea3c0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d32ea3c0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d32ea3c0 Branch: refs/heads/master Commit: d32ea3c04498ed473364140ce444b6fe6e732149 Parents: 1b1813f Author: sandeshh <[email protected]> Authored: Mon Jun 27 20:58:07 2016 -0700 Committer: sandeshh <[email protected]> Committed: Fri Jul 15 10:26:35 2016 -0700 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 11 ++++----- .../datatorrent/bufferserver/server/Server.java | 12 ++++++--- .../stram/StreamingContainerManager.java | 26 -------------------- .../stram/engine/StreamingContainer.java | 8 ++++++ 4 files changed, 22 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 2a01102..3f596d9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -176,21 +176,20 @@ public class DataList numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1); } - public void purge(final int baseSeconds, final int windowId) + public void purge(final long windowId) { - final long longWindowId = (long)baseSeconds << 32 | windowId; logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), - Codec.getStringWindowId(longWindowId)); + Codec.getStringWindowId(windowId)); int numberOfInMemBlockPurged = 0; synchronized (this) { - for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; + for (Block prev = null, temp = first; temp != null && temp.starting_window <= windowId; prev = temp, temp = temp.next) { - if (temp.ending_window > longWindowId || temp == last) { + if (temp.ending_window > windowId || temp == last) { if (prev != null) { first = temp; } - first.purge(longWindowId); + first.purge(windowId); break; } temp.discard(false); http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 83b50d2..12eed5f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -26,7 +26,6 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; @@ -169,7 +168,7 @@ public class Server implements ServerListener return identity; } - private final HashMap<String, DataList> publisherBuffers = new HashMap<String, DataList>(); + private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>(); private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>(); @@ -185,7 +184,7 @@ public class Server implements ServerListener if (dl == null) { message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); } else { - dl.purge(request.getBaseSeconds(), request.getWindowId()); + dl.purge((long)request.getBaseSeconds() << 32 | request.getWindowId()); message = ("Request sent for processing: " + request).getBytes(); } @@ -199,6 +198,13 @@ public class Server implements ServerListener } } + public void purge(long windowId) + { + for (DataList dataList: publisherBuffers.values()) { + dataList.purge(windowId); + } + } + private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException { DataList dl; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 092f5a2..d5e5475 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2171,32 +2171,6 @@ public class StreamingContainerManager implements PlanContext }; poolExecutor.submit(r); } - // delete stream state when using buffer server - for (PTOperator.PTOutput out : operator.getOutputs()) { - if (!out.isDownStreamInline()) { - if (operator.getContainer().bufferServerAddress == null) { - // address should be null only for a new container, in which case there should not be a purge request - // TODO: logging added to find out how we got here - LOG.warn("purge request w/o buffer server address source {} container {} checkpoints {}", - out, operator.getContainer(), operator.checkpoints); - continue; - } - - for (InputPortMeta ipm : out.logicalStream.getSinks()) { - StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm); - Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo); - // following needs to match the concat logic in StreamingContainer - String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString()); - // delete everything from buffer server prior to new checkpoint - BufferServerController bsc = getBufferServerClient(operator); - try { - bsc.purge(null, sourceIdentifier, operator.checkpoints.getFirst().windowId - 1); - } catch (RuntimeException re) { - LOG.warn("Failed to purge {} {}", bsc.addr, sourceIdentifier, re); - } - } - } - } } purgeCheckpoints.clear(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 1953d7a..54b8a6e 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -769,7 +769,15 @@ public class StreamingContainer extends YarnContainerMain } if (rsp.committedWindowId != lastCommittedWindowId) { + lastCommittedWindowId = rsp.committedWindowId; + + if (bufferServer != null) { + // One Window before the committed Window is kept in the Buffer Server, for historical reasons. + // Jira for that issue is APEXCORE-479 + bufferServer.purge(lastCommittedWindowId - 1); + } + OperatorRequest nr = null; for (Entry<Integer, Node<?>> e : nodes.entrySet()) { final Thread thread = e.getValue().context.getThread();
