[ 
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394128#comment-15394128
 ] 

ASF GitHub Bot commented on STORM-1839:
---------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1586#discussion_r72296325
  
    --- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
    @@ -0,0 +1,566 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kinesis.spout;
    +
    +import com.amazonaws.regions.Region;
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
    +import com.amazonaws.services.kinesis.model.DescribeStreamResult;
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    +import com.amazonaws.services.kinesis.model.GetRecordsResult;
    +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
    +import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
    +import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Record;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import com.amazonaws.services.kinesis.model.ShardIteratorType;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryNTimes;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.zookeeper.CreateMode;
    +import org.json.simple.JSONValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.math.BigInteger;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +class KinesisRecordsManager {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisRecordsManager.class);
    +    // zk interaction object
    +    private transient CuratorFramework curatorFramework;
    +    // Kinesis Spout Config object
    +    private transient final Config config;
    +    // Queue of records per shard fetched from kinesis and are waiting to 
be emitted
    +    private transient Map<String, LinkedList<Record>> toEmitPerShard = new 
HashMap<>();
    +    // Map of records  that were fetched from kinesis as a result of 
failure and are waiting to be emitted
    +    private transient Map<KinesisMessageId, Record> 
failedandFetchedRecords = new HashMap<>();
    +    // Sequence numbers per shard that have been emitted. LinkedHashSet as 
we need to remove on ack or fail. At the same time order is needed to figure 
out the
    +    // sequence number to commit. Logic explained in commit
    +    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = 
new HashMap<>();
    +    // sorted acked sequence numbers - needed to figure out what sequence 
number can be committed
    +    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new 
HashMap<>();
    +    // sorted failed sequence numbers - needed to figure out what sequence 
number can be committed
    +    private transient Map<String, TreeSet<BigInteger>> failedPerShard = 
new HashMap<>();
    +    // shard iterator corresponding to position in shard for new messages
    +    private transient Map<String, String> shardIteratorPerShard = new 
HashMap<>();
    +    // last fetched sequence number corresponding to position in shard
    +    private transient Map<String, String> fetchedSequenceNumberPerShard = 
new HashMap<>();
    +    // shard iterator corresponding to position in shard for failed 
messages
    +    private transient Map<KinesisMessageId, String> 
shardIteratorPerFailedMessage = new HashMap<>();
    +    // timestamp to decide when to commit to zk again
    +    private transient long lastCommitTime;
    +    // boolean to track deactivated state
    +    private transient boolean deactivated;
    +    private transient AmazonKinesisClient kinesisClient;
    +
    +    KinesisRecordsManager (Config config) {
    +        this.config = config;
    +    }
    +
    +    void initialize (int myTaskIndex, int totalTasks) {
    +        deactivated = false;
    +        lastCommitTime = System.currentTimeMillis();
    +        initializeKinesisClient();
    +        initializeCurator();
    +        List<Shard> shards = this.getShards();
    +        LOG.info("myTaskIndex is " + myTaskIndex);
    +        LOG.info("totalTasks is " + totalTasks);
    +        int i = myTaskIndex;
    +        while (i < shards.size()) {
    +            LOG.info("Shard id " + shards.get(i).getShardId() + " assigned 
to task " + myTaskIndex);
    +            toEmitPerShard.put(shards.get(i).getShardId(), new 
LinkedList<Record>());
    +            i += totalTasks;
    +        }
    +        initializeFetchedSequenceNumbers();
    +        refreshShardIteratorsForNewRecords();
    +    }
    +
    +    void next (SpoutOutputCollector collector) {
    +        if (shouldCommit()) {
    +            commit();
    +        }
    +        KinesisMessageId failedMessageId = 
config.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
    +        if (failedMessageId  != null) {
    +            // if the retry service returns a message that is not in 
failed set then ignore it. should never happen
    +            BigInteger failedSequenceNumber = new 
BigInteger(failedMessageId.getSequenceNumber());
    +            if (failedPerShard.containsKey(failedMessageId.getShardId()) 
&& 
failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber))
 {
    +                if (!failedandFetchedRecords.containsKey(failedMessageId)) 
{
    +                    fetchFailedRecords(failedMessageId);
    +                }
    +                if (emitFailedRecord(collector, failedMessageId)) {
    +                    
failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
    +                    
config.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
    +                    return;
    +                } else {
    +                    LOG.debug("failedMessageEmitted not called on retrier 
for " + failedMessageId + ". This can happen a few times but should not happen 
" +
    +                            "infinitely");
    +                }
    +            } else {
    +                LOG.debug("failedPerShard does not contain " + 
failedMessageId + ". This should never happen.");
    +            }
    +        }
    +        LOG.debug("No failed record to emit for now. Hence will try to 
emit new records");
    +        // if maximum uncommitted records count has reached, so dont emit 
any new records and return
    +        if (!(getUncommittedRecordsCount() < 
config.getMaxUncommittedRecords())) {
    +            LOG.debug("maximum uncommitted records count has reached. so 
not emitting any new records and returning");
    --- End diff --
    
    This should be logged at WARN level instead of DEBUG.


> Kinesis Spout
> -------------
>
>                 Key: STORM-1839
>                 URL: https://issues.apache.org/jira/browse/STORM-1839
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Sriharsha Chintalapani
>            Assignee: Priyank Shah
>
> As Storm is increasingly used in cloud environments, it would be great to have a
> Kinesis Spout integration in Apache Storm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to