Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1586#discussion_r73043179
  
    --- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
    @@ -0,0 +1,449 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kinesis.spout;
    +
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.GetRecordsResult;
    +import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Record;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import com.amazonaws.services.kinesis.model.ShardIteratorType;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.math.BigInteger;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +class KinesisRecordsManager {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisRecordsManager.class);
    +    // object handling zk interaction
    +    private transient ZKConnection zkConnection;
    +    // object handling interaction with kinesis
    +    private transient KinesisConnection kinesisConnection;
    +    // Kinesis Spout KinesisConfig object
    +    private transient final KinesisConfig kinesisConfig;
    +    // Queue of records per shard fetched from kinesis and are waiting to 
be emitted
    +    private transient Map<String, LinkedList<Record>> toEmitPerShard = new 
HashMap<>();
    +    // Map of records  that were fetched from kinesis as a result of 
failure and are waiting to be emitted
    +    private transient Map<KinesisMessageId, Record> 
failedandFetchedRecords = new HashMap<>();
    +    // Sequence numbers per shard that have been emitted. LinkedHashSet as 
we need to remove on ack or fail. At the same time order is needed to figure 
out the
    +    // sequence number to commit. Logic explained in commit
    +    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = 
new HashMap<>();
    +    // sorted acked sequence numbers - needed to figure out what sequence 
number can be committed
    +    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new 
HashMap<>();
    +    // sorted failed sequence numbers - needed to figure out what sequence 
number can be committed
    +    private transient Map<String, TreeSet<BigInteger>> failedPerShard = 
new HashMap<>();
    +    // shard iterator corresponding to position in shard for new messages
    +    private transient Map<String, String> shardIteratorPerShard = new 
HashMap<>();
    +    // last fetched sequence number corresponding to position in shard
    +    private transient Map<String, String> fetchedSequenceNumberPerShard = 
new HashMap<>();
    +    // shard iterator corresponding to position in shard for failed 
messages
    +    private transient Map<KinesisMessageId, String> 
shardIteratorPerFailedMessage = new HashMap<>();
    +    // timestamp to decide when to commit to zk again
    +    private transient long lastCommitTime;
    +    // boolean to track deactivated state
    +    private transient boolean deactivated;
    +
    +    KinesisRecordsManager (KinesisConfig kinesisConfig) {
    +        this.kinesisConfig = kinesisConfig;
    +        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
    +        this.kinesisConnection = new 
KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
    +    }
    +
    +    void initialize (int myTaskIndex, int totalTasks) {
    +        deactivated = false;
    +        lastCommitTime = System.currentTimeMillis();
    +        kinesisConnection.initialize();
    +        zkConnection.initialize();
    +        List<Shard> shards = 
kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
    +        LOG.info("myTaskIndex is " + myTaskIndex);
    +        LOG.info("totalTasks is " + totalTasks);
    +        int i = myTaskIndex;
    +        while (i < shards.size()) {
    +            LOG.info("Shard id " + shards.get(i).getShardId() + " assigned 
to task " + myTaskIndex);
    +            toEmitPerShard.put(shards.get(i).getShardId(), new 
LinkedList<Record>());
    +            i += totalTasks;
    +        }
    +        initializeFetchedSequenceNumbers();
    +        refreshShardIteratorsForNewRecords();
    +    }
    +
    +    void next (SpoutOutputCollector collector) {
    +        if (shouldCommit()) {
    +            commit();
    +        }
    +        KinesisMessageId failedMessageId = 
kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
    +        if (failedMessageId  != null) {
    +            // if the retry service returns a message that is not in 
failed set then ignore it. should never happen
    +            BigInteger failedSequenceNumber = new 
BigInteger(failedMessageId.getSequenceNumber());
    +            if (failedPerShard.containsKey(failedMessageId.getShardId()) 
&& 
failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber))
 {
    +                if (!failedandFetchedRecords.containsKey(failedMessageId)) 
{
    +                    fetchFailedRecords(failedMessageId);
    +                }
    +                if (emitFailedRecord(collector, failedMessageId)) {
    +                    
failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
    +                    
kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
    +                    return;
    +                } else {
    +                    LOG.warn("failedMessageEmitted not called on retrier 
for " + failedMessageId + ". This can happen a few times but should not happen 
" +
    +                            "infinitely");
    +                }
    +            } else {
    +                LOG.warn("failedPerShard does not contain " + 
failedMessageId + ". This should never happen.");
    +            }
    +        }
    +        LOG.debug("No failed record to emit for now. Hence will try to 
emit new records");
    +        // if maximum uncommitted records count has reached, so dont emit 
any new records and return
    +        if (!(getUncommittedRecordsCount() < 
kinesisConfig.getMaxUncommittedRecords())) {
    +            LOG.warn("maximum uncommitted records count has reached. so 
not emitting any new records and returning");
    +            return;
    +        }
    +        // early return as no shard is assigned - probably because number 
of executors > number of shards
    +        if (toEmitPerShard.isEmpty()) {
    +            LOG.warn("No shard is assigned to this task. Hence not 
emitting any tuple.");
    +            return;
    +        }
    +
    +        if (shouldFetchNewRecords()) {
    +            fetchNewRecords();
    +        }
    +        emitNewRecord(collector);
    +    }
    +
    +    void ack (KinesisMessageId kinesisMessageId) {
    +        // for an acked message add it to acked set and remove it from 
emitted and failed
    +        String shardId = kinesisMessageId.getShardId();
    +        BigInteger sequenceNumber = new 
BigInteger(kinesisMessageId.getSequenceNumber());
    +        LOG.debug("Ack received for shardId: " + shardId + " 
sequenceNumber: " + sequenceNumber);
    +        // if an ack is received for a message then add it to the 
ackedPerShard TreeSet. TreeSet because while committing we need to figure out 
what is the
    +        // highest sequence number that can be committed for this shard
    +        if (!ackedPerShard.containsKey(shardId)) {
    +            ackedPerShard.put(shardId, new TreeSet<BigInteger>());
    +        }
    +        ackedPerShard.get(shardId).add(sequenceNumber);
    +        // if the acked message was in emittedPerShard that means we need 
to remove it from the emittedPerShard(which keeps track of in flight tuples)
    +        if (emittedPerShard.containsKey(shardId)) {
    +            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
    +            emitted.remove(sequenceNumber);
    +        }
    +        // an acked message should not be in failed since if it fails and 
gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive 
coding.
    +        // Remove it from failedPerShard anyway
    +        if (failedPerShard.containsKey(shardId)) {
    +            failedPerShard.get(shardId).remove(sequenceNumber);
    +        }
    +        // if an ack is for a message that failed once at least and was 
re-emitted then the record itself will be in failedAndFetchedRecords. We use 
that to
    +        // determine if the FailedMessageRetryHandler needs to be told 
about it and then remove the record itself to clean up memory
    +        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
    +            
kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
    +            failedandFetchedRecords.remove(kinesisMessageId);
    +        }
    +        // keep committing when topology is deactivated since ack and fail 
keep getting called on deactivated topology
    +        if (deactivated) {
    +            commit();
    +        }
    +    }
    +
    +    void fail (KinesisMessageId kinesisMessageId) {
    +        String shardId = kinesisMessageId.getShardId();
    +        BigInteger sequenceNumber = new 
BigInteger(kinesisMessageId.getSequenceNumber());
    +        LOG.debug("Fail received for shardId: " + shardId + " 
sequenceNumber: " + sequenceNumber);
    +        // for a failed message add it to failed set if it will be 
retried, otherwise ack it; remove from emitted either way
    +        if 
(kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
    +            if (!failedPerShard.containsKey(shardId)) {
    +                failedPerShard.put(shardId, new TreeSet<BigInteger>());
    +            }
    +            failedPerShard.get(shardId).add(sequenceNumber);
    +            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
    +            emitted.remove(sequenceNumber);
    +        } else {
    +            ack(kinesisMessageId);
    +        }
    +        // keep committing when topology is deactivated since ack and fail 
keep getting called on deactivated topology
    +        if (deactivated) {
    +            commit();
    +        }
    +    }
    +
    +    void commit () {
    +        // We have three mutually disjoint treesets per shard at any given 
time to keep track of what sequence number can be committed to zookeeper.
    +        // emittedPerShard, ackedPerShard and failedPerShard. Any record 
starts by entering emittedPerShard. On ack it moves from emittedPerShard to
    +        // ackedPerShard and on fail if retry service tells us to retry 
then it moves from emittedPerShard to failedPerShard. The failed records will 
move from
    +        // failedPerShard to emittedPerShard when the failed record is 
emitted again as a retry.
    +        // Logic for deciding what sequence number to commit is find the 
highest sequence number from ackedPerShard called X such that there is no 
sequence
    +        // number Y in emittedPerShard or failedPerShard that satisfies X 
> Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
    +        // failedPerShard is 3,7 then we can only commit 1 and not 4 
because 2 is still pending and 3 has failed
    +        for (String shardId: toEmitPerShard.keySet()) {
    +            if (ackedPerShard.containsKey(shardId)) {
    +                BigInteger commitSequenceNumberBound = null;
    +                if (failedPerShard.containsKey(shardId) && 
!failedPerShard.get(shardId).isEmpty()) {
    +                    commitSequenceNumberBound = 
failedPerShard.get(shardId).first();
    +                }
    +                if (emittedPerShard.containsKey(shardId) && 
!emittedPerShard.get(shardId).isEmpty()) {
    +                    BigInteger smallestEmittedSequenceNumber = 
emittedPerShard.get(shardId).first();
    +                    if (commitSequenceNumberBound == null || 
(commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
    +                        commitSequenceNumberBound = 
smallestEmittedSequenceNumber;
    +                    }
    +                }
    +                Iterator<BigInteger> ackedSequenceNumbers = 
ackedPerShard.get(shardId).iterator();
    +                BigInteger ackedSequenceNumberToCommit = null;
    +                while (ackedSequenceNumbers.hasNext()) {
    +                    BigInteger ackedSequenceNumber = 
ackedSequenceNumbers.next();
    +                    if (commitSequenceNumberBound == null || 
(commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
    +                        ackedSequenceNumberToCommit = ackedSequenceNumber;
    +                        ackedSequenceNumbers.remove();
    +                    } else {
    +                        break;
    +                    }
    +                }
    +                if (ackedSequenceNumberToCommit != null) {
    +                    Map<Object, Object> state = new HashMap<>();
    +                    state.put("committedSequenceNumber", 
ackedSequenceNumberToCommit.toString());
    +                    LOG.debug("Committing sequence number " + 
ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
    +                    
zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
    +                }
    +            }
    +        }
    +        lastCommitTime = System.currentTimeMillis();
    +    }
    +
    +    void activate () {
    +        LOG.info("Activate called");
    +        deactivated = false;
    +        kinesisConnection.initialize();
    +    }
    +
    +    void deactivate () {
    +        LOG.info("Deactivate called");
    +        deactivated = true;
    +        commit();
    +        kinesisConnection.shutdown();
    +    }
    +
    +    void close () {
    +        commit();
    +        kinesisConnection.shutdown();
    +        zkConnection.shutdown();
    +    }
    +
    +    // fetch records from kinesis starting at sequence number for message 
passed as argument. Any other messages fetched and are in the failed queue will 
also
    +    // be kept in memory to avoid going to kinesis again for retry
    +    private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
    +        // if shard iterator not present for this message, get it
    +        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
    +            refreshShardIteratorForFailedRecord(kinesisMessageId);
    +        }
    +        String shardIterator = 
shardIteratorPerFailedMessage.get(kinesisMessageId);
    +        LOG.debug("Fetching failed records for shard id :" + 
kinesisMessageId.getShardId() + " at sequence number " + 
kinesisMessageId.getSequenceNumber() +
    +                " using shardIterator " + shardIterator);
    +        try {
    +            GetRecordsResult getRecordsResult = 
kinesisConnection.fetchRecords(shardIterator);
    +            if (getRecordsResult != null) {
    +                List<Record> records = getRecordsResult.getRecords();
    +                LOG.debug("Records size from fetchFailedRecords is " + 
records.size());
    +                // update the shard iterator to next one in case this 
fetch does not give the message.
    +                shardIteratorPerFailedMessage.put(kinesisMessageId, 
getRecordsResult.getNextShardIterator());
    +                if (records.size() == 0) {
    +                    LOG.warn("No records returned from kinesis. Hence 
sleeping for 1 second");
    +                    Thread.sleep(1000);
    --- End diff --
    
    Sorry missed these earlier. Any reason for it to sleep exactly one sec or 
should it be configurable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to