Repository: apex-core Updated Branches: refs/heads/master 91effc979 -> 33812f657
APEXCORE-745 Buffer server may stop processing tuples when backpressure is enabled Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/33812f65 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/33812f65 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/33812f65 Branch: refs/heads/master Commit: 33812f65712f9965d7e6140d5da638fafb24cc8e Parents: 91effc9 Author: Vlad Rozov <[email protected]> Authored: Thu Jun 8 14:08:48 2017 -0700 Committer: Vlad Rozov <[email protected]> Committed: Fri Jun 30 14:37:04 2017 -0700 ---------------------------------------------------------------------- .../bufferserver/internal/DataList.java | 38 +++++++++++++++----- .../bufferserver/internal/LogicalNode.java | 2 +- .../datatorrent/bufferserver/server/Server.java | 1 - 3 files changed, 30 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 5813b56..69efc04 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -32,6 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.common.util.ToStringStyle; +import org.apache.commons.lang.builder.ToStringBuilder; + import com.datatorrent.bufferserver.packet.BeginWindowTuple; import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.packet.ResetWindowTuple; @@ -55,6 +58,8 @@ import static com.google.common.collect.Sets.newHashSet; */ public class DataList { + private static final Logger logger = LoggerFactory.getLogger(DataList.class); + private final int MAX_COUNT_OF_INMEM_BLOCKS; protected final String identifier; private final int blockSize; @@ -291,7 +296,12 @@ public class DataList public void notifyListeners() { - listenersNotifier.moreDataAvailable(); + try { + listenersNotifier.moreDataAvailable(); + } catch (RuntimeException e) { + logger.warn("{}", listenersNotifier, e); + } + logger.debug("{} notified", listenersNotifier); } public void setAutoFlushExecutor(final ExecutorService es) @@ -359,6 +369,7 @@ public class DataList set.add(dl); } + listenersNotifier.run(); } public void removeDataListener(DataListener dl) @@ -536,7 +547,7 @@ public class DataList @Override public String toString() { - return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}'; + return new ToStringBuilder(this, ToStringStyle.DEFAULT).append("identifier", identifier).toString(); } /** @@ -1123,7 +1134,7 @@ public class DataList final Future<?> future = this.future; if (future == null || future.isDone() || future.isCancelled()) { // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue - this.future = autoFlushExecutor.submit(listenersNotifier); + this.future = autoFlushExecutor.submit(this); } else { synchronized (this) { if (this.future == null) { @@ -1143,7 +1154,7 @@ public class DataList try { doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false); } catch (RuntimeException e) { - logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + logger.warn("{} removing {} due to exception", this, dl, e); removeDataListener(dl); break; } @@ -1159,7 +1170,7 @@ public class DataList return true; } } catch (RuntimeException e) { - logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + logger.warn("{} removing {} due to exception", this, dl, e); removeDataListener(dl); return checkIfListenersHaveDataToSendOnly(); } @@ -1170,6 +1181,7 @@ public class DataList @Override public void run() { + logger.debug("{} entered run", this); try { if (addedData() || checkIfListenersHaveDataToSendOnly()) { future = autoFlushExecutor.submit(this); @@ -1183,11 +1195,19 @@ public class DataList } } } - } catch (Exception e) { - logger.error("{}", DataList.this, e); + } catch (RuntimeException e) { + logger.warn("{}", this, e); + } finally { + logger.debug("{} exiting run", this); } } - } - private static final Logger logger = LoggerFactory.getLogger(DataList.class); + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.DEFAULT).append(DataList.this) + .append("future", future == null ? null : future.getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(future))) + .append("isMoreDataAvailable", isMoreDataAvailable).toString(); + } + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index b06e60a..3e8846d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -151,7 +151,7 @@ public class LogicalNode implements DataListener /** * */ - public void catchUp() + private void catchUp() { caughtup = false; if (isReady()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 857e51e..c5700f2 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -648,7 +648,6 @@ public class Server extends AbstractServer { final DataList dl = publisherBuffers.get(ln.getUpstream()); if (dl != null) { - ln.catchUp(); dl.addDataListener(ln); } else { logger.error("Disconnecting {} with no matching data list.", this);
