Repository: apex-core
Updated Branches:
  refs/heads/master ce74fe78e -> 2c024cd84
APEXCORE-562: RecordingsAgent returns records for offset beyond number of tuples

Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fb91a589
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fb91a589
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fb91a589

Branch: refs/heads/master
Commit: fb91a589af1d89b73d4317f19523e78e731830b8
Parents: 81b8c92
Author: Priyanka Gugale <[email protected]>
Authored: Wed Oct 19 18:02:55 2016 +0530
Committer: Priyanka Gugale <[email protected]>
Committed: Wed Oct 19 18:02:55 2016 +0530

----------------------------------------------------------------------
 .../main/java/com/datatorrent/stram/client/RecordingsAgent.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/fb91a589/engine/src/main/java/com/datatorrent/stram/client/RecordingsAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/RecordingsAgent.java b/engine/src/main/java/com/datatorrent/stram/client/RecordingsAgent.java
index aee5c59..22261ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/RecordingsAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/RecordingsAgent.java
@@ -522,9 +522,10 @@ public final class RecordingsAgent extends FSPartFileAgent
             lastProcessPartFile = indexLine.partFile;
             try (BufferedReader partBr = new BufferedReader(new InputStreamReader(stramAgent.getFileSystem().open(new Path(dir, indexLine.partFile))))) {
               processPartFile(partBr, queryType, low, high, limit, ports, numRemainingTuples, currentTimestamp, currentWindowLow, currentOffset, info);
+              currentOffset += numTuples;
             }
           }
-          currentOffset += numTuples;
+
           if (numRemainingTuples.longValue() <= 0 || (queryType == QueryType.TIME && currentTimestamp.longValue() > high)) {
             return info;
           }
