Github user priyank5485 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1586#discussion_r73055551
--- Diff:
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
---
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisRecordsManager.class);
+ // object handling zk interaction
+ private transient ZKConnection zkConnection;
+ // object handling interaction with kinesis
+ private transient KinesisConnection kinesisConnection;
+ // Kinesis Spout KinesisConfig object
+ private transient final KinesisConfig kinesisConfig;
+ // Queue of records per shard fetched from kinesis and are waiting to
be emitted
+ private transient Map<String, LinkedList<Record>> toEmitPerShard = new
HashMap<>();
+ // Map of records that were fetched from kinesis as a result of
failure and are waiting to be emitted
+ private transient Map<KinesisMessageId, Record>
failedandFetchedRecords = new HashMap<>();
+ // Sequence numbers per shard that have been emitted. LinkedHashSet as
we need to remove on ack or fail. At the same time order is needed to figure
out the
+ // sequence number to commit. Logic explained in commit
+ private transient Map<String, TreeSet<BigInteger>> emittedPerShard =
new HashMap<>();
+ // sorted acked sequence numbers - needed to figure out what sequence
number can be committed
+ private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new
HashMap<>();
+ // sorted failed sequence numbers - needed to figure out what sequence
number can be committed
+ private transient Map<String, TreeSet<BigInteger>> failedPerShard =
new HashMap<>();
+ // shard iterator corresponding to position in shard for new messages
+ private transient Map<String, String> shardIteratorPerShard = new
HashMap<>();
+ // last fetched sequence number corresponding to position in shard
+ private transient Map<String, String> fetchedSequenceNumberPerShard =
new HashMap<>();
+ // shard iterator corresponding to position in shard for failed
messages
+ private transient Map<KinesisMessageId, String>
shardIteratorPerFailedMessage = new HashMap<>();
+ // timestamp to decide when to commit to zk again
+ private transient long lastCommitTime;
+ // boolean to track deactivated state
+ private transient boolean deactivated;
+
+ KinesisRecordsManager (KinesisConfig kinesisConfig) {
+ this.kinesisConfig = kinesisConfig;
+ this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+ this.kinesisConnection = new
KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
+ }
+
+ void initialize (int myTaskIndex, int totalTasks) {
+ deactivated = false;
+ lastCommitTime = System.currentTimeMillis();
+ kinesisConnection.initialize();
+ zkConnection.initialize();
+ List<Shard> shards =
kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
+ LOG.info("myTaskIndex is " + myTaskIndex);
+ LOG.info("totalTasks is " + totalTasks);
+ int i = myTaskIndex;
+ while (i < shards.size()) {
+ LOG.info("Shard id " + shards.get(i).getShardId() + " assigned
to task " + myTaskIndex);
+ toEmitPerShard.put(shards.get(i).getShardId(), new
LinkedList<Record>());
+ i += totalTasks;
+ }
+ initializeFetchedSequenceNumbers();
+ refreshShardIteratorsForNewRecords();
+ }
+
+ void next (SpoutOutputCollector collector) {
+ if (shouldCommit()) {
+ commit();
+ }
+ KinesisMessageId failedMessageId =
kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+ if (failedMessageId != null) {
+ // if the retry service returns a message that is not in
failed set then ignore it. should never happen
+ BigInteger failedSequenceNumber = new
BigInteger(failedMessageId.getSequenceNumber());
+ if (failedPerShard.containsKey(failedMessageId.getShardId())
&&
failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber))
{
+ if (!failedandFetchedRecords.containsKey(failedMessageId))
{
+ fetchFailedRecords(failedMessageId);
+ }
+ if (emitFailedRecord(collector, failedMessageId)) {
+
failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
+
kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+ return;
+ } else {
+ LOG.warn("failedMessageEmitted not called on retrier
for " + failedMessageId + ". This can happen a few times but should not happen
" +
+ "infinitely");
+ }
+ } else {
+ LOG.warn("failedPerShard does not contain " +
failedMessageId + ". This should never happen.");
+ }
+ }
+ LOG.debug("No failed record to emit for now. Hence will try to
emit new records");
+ // if maximum uncommitted records count has reached, so dont emit
any new records and return
+ if (!(getUncommittedRecordsCount() <
kinesisConfig.getMaxUncommittedRecords())) {
+ LOG.warn("maximum uncommitted records count has reached. so
not emitting any new records and returning");
+ return;
+ }
+ // early return as no shard is assigned - probably because number
of executors > number of shards
+ if (toEmitPerShard.isEmpty()) {
+ LOG.warn("No shard is assigned to this task. Hence not
emitting any tuple.");
+ return;
+ }
+
+ if (shouldFetchNewRecords()) {
+ fetchNewRecords();
+ }
+ emitNewRecord(collector);
+ }
+
+ void ack (KinesisMessageId kinesisMessageId) {
+ // for an acked message add it to acked set and remove it from
emitted and failed
+ String shardId = kinesisMessageId.getShardId();
+ BigInteger sequenceNumber = new
BigInteger(kinesisMessageId.getSequenceNumber());
+ LOG.debug("Ack received for shardId: " + shardId + "
sequenceNumber: " + sequenceNumber);
+ // if an ack is received for a message then add it to the
ackedPerShard TreeSet. TreeSet because while committing we need to figure out
what is the
+ // highest sequence number that can be committed for this shard
+ if (!ackedPerShard.containsKey(shardId)) {
+ ackedPerShard.put(shardId, new TreeSet<BigInteger>());
+ }
+ ackedPerShard.get(shardId).add(sequenceNumber);
+ // if the acked message was in emittedPerShard that means we need
to remove it from the emittedPerShard(which keeps track of in flight tuples)
+ if (emittedPerShard.containsKey(shardId)) {
+ TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+ emitted.remove(sequenceNumber);
+ }
+ // an acked message should not be in failed since if it fails and
gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive
coding.
+ // Remove it from failedPerShard anyway
+ if (failedPerShard.containsKey(shardId)) {
+ failedPerShard.get(shardId).remove(sequenceNumber);
+ }
+ // if an ack is for a message that failed once at least and was
re-emitted then the record itself will be in failedAndFetchedRecords. We use
that to
+ // determine if the FailedMessageRetryHandler needs to be told
about it and then remove the record itself to clean up memory
+ if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+
kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
+ failedandFetchedRecords.remove(kinesisMessageId);
+ }
+ // keep committing when topology is deactivated since ack and fail
keep getting called on deactivated topology
+ if (deactivated) {
+ commit();
+ }
+ }
+
+ void fail (KinesisMessageId kinesisMessageId) {
+ String shardId = kinesisMessageId.getShardId();
+ BigInteger sequenceNumber = new
BigInteger(kinesisMessageId.getSequenceNumber());
+ LOG.debug("Fail received for shardId: " + shardId + "
sequenceNumber: " + sequenceNumber);
+ // for a failed message add it to failed set if it will be
retried, otherwise ack it; remove from emitted either way
+ if
(kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+ if (!failedPerShard.containsKey(shardId)) {
+ failedPerShard.put(shardId, new TreeSet<BigInteger>());
+ }
+ failedPerShard.get(shardId).add(sequenceNumber);
+ TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+ emitted.remove(sequenceNumber);
+ } else {
+ ack(kinesisMessageId);
+ }
+ // keep committing when topology is deactivated since ack and fail
keep getting called on deactivated topology
+ if (deactivated) {
+ commit();
+ }
+ }
+
+ void commit () {
+ // We have three mutually disjoint treesets per shard at any given
time to keep track of what sequence number can be committed to zookeeper.
+ // emittedPerShard, ackedPerShard and failedPerShard. Any record
starts by entering emittedPerShard. On ack it moves from emittedPerShard to
+ // ackedPerShard and on fail if retry service tells us to retry
then it moves from emittedPerShard to failedPerShard. The failed records will
move from
+ // failedPerShard to emittedPerShard when the failed record is
emitted again as a retry.
+ // Logic for deciding what sequence number to commit is find the
highest sequence number from ackedPerShard called X such that there is no
sequence
+ // number Y in emittedPerShard or failedPerShard that satisfies X
> Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+ // failedPerShard is 3,7 then we can only commit 1 and not 4
because 2 is still pending and 3 has failed
+ for (String shardId: toEmitPerShard.keySet()) {
+ if (ackedPerShard.containsKey(shardId)) {
+ BigInteger commitSequenceNumberBound = null;
+ if (failedPerShard.containsKey(shardId) &&
!failedPerShard.get(shardId).isEmpty()) {
+ commitSequenceNumberBound =
failedPerShard.get(shardId).first();
+ }
+ if (emittedPerShard.containsKey(shardId) &&
!emittedPerShard.get(shardId).isEmpty()) {
+ BigInteger smallestEmittedSequenceNumber =
emittedPerShard.get(shardId).first();
+ if (commitSequenceNumberBound == null ||
(commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
+ commitSequenceNumberBound =
smallestEmittedSequenceNumber;
+ }
+ }
+ Iterator<BigInteger> ackedSequenceNumbers =
ackedPerShard.get(shardId).iterator();
+ BigInteger ackedSequenceNumberToCommit = null;
+ while (ackedSequenceNumbers.hasNext()) {
+ BigInteger ackedSequenceNumber =
ackedSequenceNumbers.next();
+ if (commitSequenceNumberBound == null ||
(commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
+ ackedSequenceNumberToCommit = ackedSequenceNumber;
+ ackedSequenceNumbers.remove();
+ } else {
+ break;
+ }
+ }
+ if (ackedSequenceNumberToCommit != null) {
+ Map<Object, Object> state = new HashMap<>();
+ state.put("committedSequenceNumber",
ackedSequenceNumberToCommit.toString());
+ LOG.debug("Committing sequence number " +
ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
+
zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
+ }
+ }
+ }
+ lastCommitTime = System.currentTimeMillis();
+ }
+
+ void activate () {
+ LOG.info("Activate called");
+ deactivated = false;
+ kinesisConnection.initialize();
+ }
+
+ void deactivate () {
+ LOG.info("Deactivate called");
+ deactivated = true;
+ commit();
+ kinesisConnection.shutdown();
+ }
+
+ void close () {
+ commit();
+ kinesisConnection.shutdown();
+ zkConnection.shutdown();
+ }
+
+ // fetch records from kinesis starting at sequence number for message
passed as argument. Any other messages fetched and are in the failed queue will
also
+ // be kept in memory to avoid going to kinesis again for retry
+ private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+ // if shard iterator not present for this message, get it
+ if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
+ refreshShardIteratorForFailedRecord(kinesisMessageId);
+ }
+ String shardIterator =
shardIteratorPerFailedMessage.get(kinesisMessageId);
+ LOG.debug("Fetching failed records for shard id :" +
kinesisMessageId.getShardId() + " at sequence number " +
kinesisMessageId.getSequenceNumber() +
+ " using shardIterator " + shardIterator);
+ try {
+ GetRecordsResult getRecordsResult =
kinesisConnection.fetchRecords(shardIterator);
+ if (getRecordsResult != null) {
+ List<Record> records = getRecordsResult.getRecords();
+ LOG.debug("Records size from fetchFailedRecords is " +
records.size());
+ // update the shard iterator to next one in case this
fetch does not give the message.
+ shardIteratorPerFailedMessage.put(kinesisMessageId,
getRecordsResult.getNextShardIterator());
+ if (records.size() == 0) {
+ LOG.warn("No records returned from kinesis. Hence
sleeping for 1 second");
+ Thread.sleep(1000);
--- End diff --
I would prefer this to be not configurable. Reason is kinesis api somewhere
mentioned in their documentation that if you dont receive any records it is
good to sleep for a second. Reason is that they some limits for GetRecords
request per second and it will throw an exception if that limit is reached or
violated. Although I am catching that exception I wanted to avoid that. Making
it configurable and go beyond 1 second can slow down the spout in some cases as
well. For example, sometimes it takes multiple requests to get to the portion
of the shard where the message at a sequence number you want to read is
present. In that case it does not make sense to backoff in an exponential
fashion. You just want to make sure you get there sooner without violating the
limit.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---