Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 98495ab62 -> fd5daef2f
APEXMALHAR-1977: Add metrics to jdbc input operator

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6ac59b6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6ac59b6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6ac59b6d

Branch: refs/heads/master
Commit: 6ac59b6dc4d5aa6fec7d0d47d519e9bbb04f14e0
Parents: 9ba99b0
Author: Priyanka Gugale <[email protected]>
Authored: Thu Jan 14 15:34:34 2016 +0530
Committer: Priyanka Gugale <[email protected]>
Committed: Thu Jan 21 12:03:53 2016 +0530

----------------------------------------------------------------------
 .../com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ac59b6d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index 8aea9b6..a6183c2 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
@@ -97,6 +97,9 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
 
   protected int pageNumber;
 
+  @AutoMetric
+  protected long tuplesRead;
+
   @OutputPortFieldAnnotation(schemaRequired = true)
   public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
   {
@@ -185,6 +188,7 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
   public void beginWindow(long l)
   {
     windowDone = false;
+    tuplesRead = 0;
   }
 
   @Override
@@ -198,6 +202,7 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
         do {
           Object tuple = getTuple(resultSet);
           outputPort.emit(tuple);
+          tuplesRead++;
         } while (resultSet.next());
       } else {
