[
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394128#comment-15394128
]
ASF GitHub Bot commented on STORM-1839:
---------------------------------------
Github user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/1586#discussion_r72296325
--- Diff:
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
---
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisRecordsManager.class);
+ // zk interaction object
+ private transient CuratorFramework curatorFramework;
+ // Kinesis Spout Config object
+ private transient final Config config;
+ // Queue of records per shard fetched from kinesis and are waiting to
be emitted
+ private transient Map<String, LinkedList<Record>> toEmitPerShard = new
HashMap<>();
+ // Map of records that were fetched from kinesis as a result of
failure and are waiting to be emitted
+ private transient Map<KinesisMessageId, Record>
failedandFetchedRecords = new HashMap<>();
+ // Sequence numbers per shard that have been emitted. LinkedHashSet as
we need to remove on ack or fail. At the same time order is needed to figure
out the
+ // sequence number to commit. Logic explained in commit
+ private transient Map<String, TreeSet<BigInteger>> emittedPerShard =
new HashMap<>();
+ // sorted acked sequence numbers - needed to figure out what sequence
number can be committed
+ private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new
HashMap<>();
+ // sorted failed sequence numbers - needed to figure out what sequence
number can be committed
+ private transient Map<String, TreeSet<BigInteger>> failedPerShard =
new HashMap<>();
+ // shard iterator corresponding to position in shard for new messages
+ private transient Map<String, String> shardIteratorPerShard = new
HashMap<>();
+ // last fetched sequence number corresponding to position in shard
+ private transient Map<String, String> fetchedSequenceNumberPerShard =
new HashMap<>();
+ // shard iterator corresponding to position in shard for failed
messages
+ private transient Map<KinesisMessageId, String>
shardIteratorPerFailedMessage = new HashMap<>();
+ // timestamp to decide when to commit to zk again
+ private transient long lastCommitTime;
+ // boolean to track deactivated state
+ private transient boolean deactivated;
+ private transient AmazonKinesisClient kinesisClient;
+
+ KinesisRecordsManager (Config config) {
+ this.config = config;
+ }
+
+ void initialize (int myTaskIndex, int totalTasks) {
+ deactivated = false;
+ lastCommitTime = System.currentTimeMillis();
+ initializeKinesisClient();
+ initializeCurator();
+ List<Shard> shards = this.getShards();
+ LOG.info("myTaskIndex is " + myTaskIndex);
+ LOG.info("totalTasks is " + totalTasks);
+ int i = myTaskIndex;
+ while (i < shards.size()) {
+ LOG.info("Shard id " + shards.get(i).getShardId() + " assigned
to task " + myTaskIndex);
+ toEmitPerShard.put(shards.get(i).getShardId(), new
LinkedList<Record>());
+ i += totalTasks;
+ }
+ initializeFetchedSequenceNumbers();
+ refreshShardIteratorsForNewRecords();
+ }
+
+ void next (SpoutOutputCollector collector) {
+ if (shouldCommit()) {
+ commit();
+ }
+ KinesisMessageId failedMessageId =
config.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+ if (failedMessageId != null) {
+ // if the retry service returns a message that is not in
failed set then ignore it. should never happen
+ BigInteger failedSequenceNumber = new
BigInteger(failedMessageId.getSequenceNumber());
+ if (failedPerShard.containsKey(failedMessageId.getShardId())
&&
failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber))
{
+ if (!failedandFetchedRecords.containsKey(failedMessageId))
{
+ fetchFailedRecords(failedMessageId);
+ }
+ if (emitFailedRecord(collector, failedMessageId)) {
+
failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
+
config.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+ return;
+ } else {
+ LOG.debug("failedMessageEmitted not called on retrier
for " + failedMessageId + ". This can happen a few times but should not happen
" +
+ "infinitely");
+ }
+ } else {
+ LOG.debug("failedPerShard does not contain " +
failedMessageId + ". This should never happen.");
+ }
+ }
+ LOG.debug("No failed record to emit for now. Hence will try to
emit new records");
+ // if maximum uncommitted records count has reached, so dont emit
any new records and return
+ if (!(getUncommittedRecordsCount() <
config.getMaxUncommittedRecords())) {
+ LOG.debug("maximum uncommitted records count has reached. so
not emitting any new records and returning");
--- End diff --
This should be logged at WARN level rather than DEBUG.
> Kinesis Spout
> -------------
>
> Key: STORM-1839
> URL: https://issues.apache.org/jira/browse/STORM-1839
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: Sriharsha Chintalapani
> Assignee: Priyank Shah
>
> As Storm is increasingly used in cloud environments, it would be great to have a
> Kinesis Spout integration in Apache Storm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)