Repository: apex-malhar Updated Branches: refs/heads/master 6c42103f8 -> c5af27b1e
APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5af27b1 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5af27b1 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5af27b1 Branch: refs/heads/master Commit: c5af27b1ee5dc430a42c01c112e0f6bd84a5ac07 Parents: 6c42103 Author: deepak-narkhede <[email protected]> Authored: Mon Feb 20 16:52:54 2017 +0530 Committer: deepak-narkhede <[email protected]> Committed: Mon Feb 20 16:53:36 2017 +0530 ---------------------------------------------------------------------- .../contrib/kinesis/AbstractKinesisInputOperator.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5af27b1/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index 18a6399..30ceadb 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -158,6 +158,14 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, */ public abstract T getTuple(Record rc); + /** + * Any concrete class derived from AbstractKinesisInputOperator may implement this method to emit tuples to an output port. + */ + public void emitTuple(Pair<String, Record> data) + { + outputPort.emit(getTuple(data.getSecond())); + } + @Override public void partitioned(Map<Integer, Partition<AbstractKinesisInputOperator>> partitions) { @@ -465,7 +473,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(), rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst()); for (Record record : records) { - outputPort.emit(getTuple(record)); + emitTuple(new Pair<String, Record>(rc.getKey(), record)); shardPosition.put(rc.getKey(), record.getSequenceNumber()); } } catch(Exception e) @@ -569,8 +577,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, Pair<String, Record> data = consumer.pollRecord(); String shardId = data.getFirst(); String recordId = data.getSecond().getSequenceNumber(); - T tuple = getTuple(data.getSecond()); - outputPort.emit(tuple); + emitTuple(data); if(!currentWindowRecoveryState.containsKey(shardId)) { currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));
