This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit 78b799cb28fd1e1ac23eec0c45264aabdb93eb21 Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:58:09 2023 +0200 STORM-3988 - Remove "storm-kinesis" --- external/storm-kinesis/README.md | 140 ------ external/storm-kinesis/pom.xml | 73 ---- .../kinesis/spout/CredentialsProviderChain.java | 40 -- .../kinesis/spout/ExponentialBackoffRetrier.java | 169 ------- .../kinesis/spout/FailedMessageRetryHandler.java | 48 -- .../apache/storm/kinesis/spout/KinesisConfig.java | 144 ------ .../storm/kinesis/spout/KinesisConnection.java | 121 ----- .../storm/kinesis/spout/KinesisConnectionInfo.java | 117 ----- .../storm/kinesis/spout/KinesisMessageId.java | 81 ---- .../storm/kinesis/spout/KinesisRecordsManager.java | 486 --------------------- .../apache/storm/kinesis/spout/KinesisSpout.java | 86 ---- .../storm/kinesis/spout/RecordToTupleMapper.java | 40 -- .../apache/storm/kinesis/spout/ZkConnection.java | 97 ---- .../org/apache/storm/kinesis/spout/ZkInfo.java | 124 ------ .../storm/kinesis/spout/test/KinesisBoltTest.java | 48 -- .../kinesis/spout/test/KinesisSpoutTopology.java | 57 --- .../spout/test/TestRecordToTupleMapper.java | 58 --- pom.xml | 1 - 18 files changed, 1930 deletions(-) diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md deleted file mode 100644 index 8eaf53278..000000000 --- a/external/storm-kinesis/README.md +++ /dev/null @@ -1,140 +0,0 @@ -#Storm Kinesis Spout -Provides core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and -starts consuming records after that sequence number on restart by default. Below is the code sample to create a sample topology that uses the spout. Each -object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to number of shards in kinesis. However each task -can read from more than one shard. 
- -```java -public class KinesisSpoutTopology { - public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { - String topologyName = args[0]; - RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper(); - KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2, - 1000); - ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000); - KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON, - recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L); - KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig); - TopologyBuilder topologyBuilder = new TopologyBuilder(); - topologyBuilder.setSpout("spout", kinesisSpout, 3); - topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout"); - Config topologyConfig = new Config(); - topologyConfig.setDebug(true); - topologyConfig.setNumWorkers(3); - StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology()); - } -} -``` -As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below. - -#### `String` streamName -name of kinesis stream to consume data from - -#### `ShardIteratorType` shardIteratorType -3 types are supported - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for shards -is found in zookeeper. Hence they will apply the first time a topology is started. 
If you want to use any of these in subsequent runs of the topology, you -will need to clear the state of zookeeper node used for storing sequence numbers - -#### `RecordToTupleMapper` recordToTupleMapper -an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields -tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked -```java - Fields getOutputFields (); - List<Object> getTuple (Record record); -``` - -#### `Date` timestamp -used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The -time used by kinesis is the server side time associated to the record by kinesis - -#### `FailedMessageRetryHandler` failedMessageRetryHandler -an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry -mechanism for failed messages. That implementation has two constructors. Default no args constructor will configure first retry at 100 milliseconds and -subsequent retries at Math.pow(2, i-1) where i is the retry number in the range 2 to Long.MAX_VALUE. 2 represents the base for exponential function in seconds. -Other constructor takes retry interval in millis for first retry as first argument, base for exponential function in seconds as second argument and number of -retries as third argument. The methods of this interface and their working in accord with the spout are explained below -```java - boolean failed (KinesisMessageId messageId); - KinesisMessageId getNextFailedMessageToRetry (); - void failedMessageEmitted (KinesisMessageId messageId); - void acked (KinesisMessageId messageId); -``` -failed method will be called on every tuple that failed in the spout. 
It should return true if that failed message is scheduled to be retried, false otherwise. - -getNextFailedMessageToRetry method will be called the first thing every time a spout wants to emit a tuple. It should return a message that should be retried -if any or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually -return every message for which it returned true when failed method was called for that message - -failedMessageEmitted will be called if spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same -message when getNextFailedMessageToRetry is called again - -acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout failed will be called again - -#### `ZkInfo` zkInfo -an object encapsulating information for zookeeper interaction. The constructor takes zkUrl as first argument which is a comma separated string of zk host and -port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout as third in milliseconds, connection timeout -as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, retry attempts as sixth for zk client -connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect. - -#### `KinesisConnectionInfo` kinesisConnectionInfo -an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider` -as first argument. 
This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with kinesis using one of -the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, -`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the kinesis -client, `Regions` as third argument that sets the region to connect to on the client and recordsLimit as the fourth argument which represents the maximum number -of records kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, kinesis -throughput rate limits and per tuple latency in storm for the topology. Also if one task will be reading from more than one shard then that will also affect -the choice of limit argument - -#### `Long` maxUncommittedRecords -this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached spout will not fetch any new records from -kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from -topology level max pending messages. For example, suppose this value is set to 10 and the spout emitted sequence numbers from 1 to 10, where sequence number 1 is pending -and 2 to 10 are acked. In that case the number of uncommitted sequence numbers is 10 since no sequence number in the range 1 to 10 can be committed to zk. 
-However, storm can still call next tuple on the spout because there is only 1 pending message - -### Maven dependencies -Aws sdk version that this was tested with is 1.10.77 - -```xml - <dependencies> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk</artifactId> - <version>${aws-java-sdk.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <version>${curator.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.googlecode.json-simple</groupId> - <artifactId>json-simple</artifactId> - </dependency> - </dependencies> -``` - -#Future Work -Handle merging or splitting of shards in kinesis, Trident spout implementation and metrics - -## Committer Sponsors - - * Sriharsha Chintalapani ([[email protected]](mailto:[email protected])) diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml deleted file mode 100644 index 9c2805c75..000000000 --- a/external/storm-kinesis/pom.xml +++ /dev/null @@ -1,73 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - <packaging>jar</packaging> - <artifactId>storm-kinesis</artifactId> - <name>storm-kinesis</name> - - <properties> - <aws-java-sdk.version>1.10.77</aws-java-sdk.version> - </properties> - - <dependencies> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk</artifactId> - <version>${aws-java-sdk.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </dependency> - <dependency> - <groupId>net.minidev</groupId> - <artifactId>json-smart</artifactId> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - 
<artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> - diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java deleted file mode 100644 index 5820851cf..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.auth.AWSCredentialsProviderChain; -import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; -import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; -import com.amazonaws.auth.InstanceProfileCredentialsProvider; -import com.amazonaws.auth.SystemPropertiesCredentialsProvider; -import com.amazonaws.auth.profile.ProfileCredentialsProvider; - -/** - * Class representing chain of mechanisms that will be used in order to connect to kinesis. 
- */ -public class CredentialsProviderChain extends AWSCredentialsProviderChain { - - public CredentialsProviderChain() { - super(new EnvironmentVariableCredentialsProvider(), - new SystemPropertiesCredentialsProvider(), - new ClasspathPropertiesFileCredentialsProvider(), - new InstanceProfileCredentialsProvider(), - new ProfileCredentialsProvider()); - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java deleted file mode 100644 index 2920913d2..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kinesis.spout; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { - private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); - // Wait interval for retrying after first failure - private final Long initialDelayMillis; - // Base for exponential function in seconds for retrying for second, third and so on failures - private final Long baseSeconds; - // Maximum number of retries - private final Long maxRetries; - // map to track number of failures for each kinesis message that failed - private Map<KinesisMessageId, Long> failCounts = new HashMap<>(); - // map to track next retry time for each kinesis message that failed - private Map<KinesisMessageId, Long> retryTimes = new HashMap<>(); - // sorted set of records to be retried based on retry time. earliest retryTime record comes first - private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator()); - - /** - * No args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an - * exponential backoff of {@code Math.pow(2,i-1)} secs for retry {@code i} where {@code i = 2,3,...}. - */ - public ExponentialBackoffRetrier() { - this(100L, 2L, Long.MAX_VALUE); - } - - /** - * Creates a new exponential backoff retrier. 
- * @param initialDelayMillis delay in milliseconds for first retry - * @param baseSeconds base for exponent function in seconds - * @param maxRetries maximum number of retries before the record is discarded/acked - */ - public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) { - this.initialDelayMillis = initialDelayMillis; - this.baseSeconds = baseSeconds; - this.maxRetries = maxRetries; - validate(); - } - - private void validate() { - if (initialDelayMillis < 0) { - throw new IllegalArgumentException("initialDelayMillis cannot be negative."); - } - if (baseSeconds < 0) { - throw new IllegalArgumentException("baseSeconds cannot be negative."); - } - if (maxRetries < 0) { - throw new IllegalArgumentException("maxRetries cannot be negative."); - } - } - - @Override - public boolean failed(KinesisMessageId messageId) { - LOG.debug("Handling failed message {}", messageId); - // if maxRetries is 0, dont retry and return false as per interface contract - if (maxRetries == 0) { - LOG.warn("maxRetries set to 0. Hence not queueing " + messageId); - return false; - } - // if first failure add it to the count map - if (!failCounts.containsKey(messageId)) { - failCounts.put(messageId, 0L); - } - // increment the fail count as we started with 0 - Long failCount = failCounts.get(messageId); - failCounts.put(messageId, ++failCount); - // if fail count is greater than maxRetries, discard or ack. for e.g. 
for maxRetries 3, 4 failures are allowed at maximum - if (failCount > maxRetries) { - LOG.warn("maxRetries reached so dropping " + messageId); - failCounts.remove(messageId); - return false; - } - // if reached so far, add it to the set of messages waiting to be retried with next retry time based on how many times it failed - retryTimes.put(messageId, getRetryTime(failCount)); - retryMessageSet.add(messageId); - LOG.debug("Scheduled {} for retry at {} and retry attempt {}", messageId, retryTimes.get(messageId), failCount); - return true; - } - - @Override - public void acked(KinesisMessageId messageId) { - // message was acked after being retried. so clear the state for that message - LOG.debug("Ack received for {}. Hence cleaning state.", messageId); - failCounts.remove(messageId); - } - - @Override - public KinesisMessageId getNextFailedMessageToRetry() { - KinesisMessageId result = null; - // return the first message to be retried from the set. It will return the message with the earliest retry time <= current time - if (!retryMessageSet.isEmpty()) { - result = retryMessageSet.first(); - if (!(retryTimes.get(result) <= System.nanoTime())) { - result = null; - } - } - LOG.debug("Returning {} to spout for retrying.", result); - return result; - } - - @Override - public void failedMessageEmitted(KinesisMessageId messageId) { - // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and - // wait for its ack or fail - // but still keep it in counts map to retry again on failure or remove on ack - LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId); - retryMessageSet.remove(messageId); - retryTimes.remove(messageId); - } - - /** - * private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to - * Long.MAX_VALUE). 
- */ - private Long getRetryTime(Long retryNum) { - Long retryTime = System.nanoTime(); - Long nanoMultiplierForMillis = 1000000L; - // if first retry then retry time = current time + initial delay - if (retryNum == 1) { - retryTime += initialDelayMillis * nanoMultiplierForMillis; - } else { - // else use the exponential backoff logic and handle long overflow - Long maxValue = Long.MAX_VALUE; - double time = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanoMultiplierForMillis; - // if delay or delay + current time are bigger than long max value - // second predicate for or condition uses the fact that long addition over the limit circles back - if ((time >= maxValue.doubleValue()) || ((retryTime + (long) time) < retryTime)) { - retryTime = maxValue; - } else { - retryTime += (long) time; - } - } - return retryTime; - } - - private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> { - @Override - public int compare(KinesisMessageId o1, KinesisMessageId o2) { - return retryTimes.get(o1).compareTo(retryTimes.get(o2)); - } - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java deleted file mode 100644 index c005c5986..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import java.io.Serializable; - -public interface FailedMessageRetryHandler extends Serializable { - /** - * message with messageId failed in the spout. - * @param messageId the message id - * @return true if this failed message was scheduled to be retried, false otherwise - */ - boolean failed(KinesisMessageId messageId); - - /** - * message with messageId succeeded/acked in the spout. - * @param messageId the message id - */ - void acked(KinesisMessageId messageId); - - /** - * Get the next failed message's id to retry if any, null otherwise. - * @return messageId - */ - KinesisMessageId getNextFailedMessageToRetry(); - - /** - * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout. - * @param messageId the message id - */ - void failedMessageEmitted(KinesisMessageId messageId); -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java deleted file mode 100644 index eeb58968c..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.services.kinesis.model.ShardIteratorType; - -import java.io.Serializable; -import java.util.Date; - -public class KinesisConfig implements Serializable { - // kinesis stream name to read from - private final String streamName; - // shard iterator type based on kinesis api - beginning of time, latest, at timestamp are only supported - private final ShardIteratorType shardIteratorType; - // implementation for converting a Kinesis record to a storm tuple - private final RecordToTupleMapper recordToTupleMapper; - // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null - private final Date timestamp; - // implementation for handling the failed messages retry logic - private final FailedMessageRetryHandler failedMessageRetryHandler; - // object capturing all zk related information for storing committed sequence numbers - private final ZkInfo zkInfo; - // object representing information on parameters to use while connecting to kinesis using kinesis client - private final KinesisConnectionInfo kinesisConnectionInfo; - /** - * This number represents the number of messages that are still not committed to zk. it will prevent the spout from - * emitting further. e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still can't be - * committed to zk because 1 is still in failed set. 
As a result the acked queue can infinitely grow without any of - * them being committed to zk. topology max pending does not help since from storm's view they are acked - */ - private final Long maxUncommittedRecords; - - public KinesisConfig(String streamName, - ShardIteratorType shardIteratorType, - RecordToTupleMapper recordToTupleMapper, - Date timestamp, - FailedMessageRetryHandler failedMessageRetryHandler, - ZkInfo zkInfo, - KinesisConnectionInfo kinesisConnectionInfo, - Long maxUncommittedRecords) { - this.streamName = streamName; - this.shardIteratorType = shardIteratorType; - this.recordToTupleMapper = recordToTupleMapper; - this.timestamp = timestamp; - this.failedMessageRetryHandler = failedMessageRetryHandler; - this.zkInfo = zkInfo; - this.kinesisConnectionInfo = kinesisConnectionInfo; - this.maxUncommittedRecords = maxUncommittedRecords; - validate(); - } - - private void validate() { - if (streamName == null || streamName.length() < 1) { - throw new IllegalArgumentException("streamName is required and cannot be of length 0."); - } - if (shardIteratorType == null - || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) { - throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP - + "," + ShardIteratorType.LATEST - + "," + ShardIteratorType.TRIM_HORIZON); - } - if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) { - throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP); - } - if (recordToTupleMapper == null) { - throw new IllegalArgumentException("recordToTupleMapper cannot be null"); - } - if (failedMessageRetryHandler == null) { - throw new IllegalArgumentException("failedMessageRetryHandler cannot be null"); - } - if (zkInfo == null) { - throw new IllegalArgumentException("zkInfo cannot be null"); - } - if 
(kinesisConnectionInfo == null) { - throw new IllegalArgumentException("kinesisConnectionInfo cannot be null"); - } - if (maxUncommittedRecords == null || maxUncommittedRecords < 1) { - throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer"); - } - } - - public String getStreamName() { - return streamName; - } - - public ShardIteratorType getShardIteratorType() { - return shardIteratorType; - } - - public RecordToTupleMapper getRecordToTupleMapper() { - return recordToTupleMapper; - } - - public Date getTimestamp() { - return timestamp; - } - - public FailedMessageRetryHandler getFailedMessageRetryHandler() { - return failedMessageRetryHandler; - } - - public ZkInfo getZkInfo() { - return zkInfo; - } - - public KinesisConnectionInfo getKinesisConnectionInfo() { - return kinesisConnectionInfo; - } - - public Long getMaxUncommittedRecords() { - return maxUncommittedRecords; - } - - @Override - public String toString() { - return "KinesisConfig{" - + "streamName='" + streamName + '\'' - + ", shardIteratorType=" + shardIteratorType - + ", recordToTupleMapper=" + recordToTupleMapper - + ", timestamp=" + timestamp - + ", zkInfo=" + zkInfo - + ", kinesisConnectionInfo=" + kinesisConnectionInfo - + ", failedMessageRetryHandler =" + failedMessageRetryHandler - + ", maxUncommittedRecords=" + maxUncommittedRecords - + '}'; - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java deleted file mode 100644 index b8b969aee..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.regions.Region; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class KinesisConnection { - private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); - private final KinesisConnectionInfo kinesisConnectionInfo; - private AmazonKinesisClient kinesisClient; - - KinesisConnection(KinesisConnectionInfo kinesisConnectionInfo) { - this.kinesisConnectionInfo = kinesisConnectionInfo; - } - - void initialize() { - kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), - kinesisConnectionInfo.getClientConfiguration()); - kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion())); - } - - 
List<Shard> getShardsForStream(String stream) { - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(stream); - List<Shard> shards = new ArrayList<>(); - String exclusiveStartShardId = null; - do { - describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); - DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest); - shards.addAll(describeStreamResult.getStreamDescription().getShards()); - if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { - exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); - } else { - exclusiveStartShardId = null; - } - } while (exclusiveStartShardId != null); - LOG.info("Number of shards for stream " + stream + " are " + shards.size()); - return shards; - } - - String getShardIterator(String stream, - String shardId, - ShardIteratorType shardIteratorType, - String sequenceNumber, - Date timestamp) { - String shardIterator = ""; - try { - GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); - getShardIteratorRequest.setStreamName(stream); - getShardIteratorRequest.setShardId(shardId); - getShardIteratorRequest.setShardIteratorType(shardIteratorType); - if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) { - getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); - } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) { - getShardIteratorRequest.setTimestamp(timestamp); - } - GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); - if (getShardIteratorResult != null) { - shardIterator = getShardIteratorResult.getShardIterator(); - } - } catch (Exception e) { - LOG.warn("Exception occured while getting shardIterator for shard " + shardId - + " shardIteratorType " + shardIteratorType - + " 
sequence number " + sequenceNumber - + " timestamp " + timestamp, - e); - } - LOG.warn("Returning shardIterator " + shardIterator - + " for shardId " + shardId - + " shardIteratorType " + shardIteratorType - + " sequenceNumber " + sequenceNumber - + " timestamp" + timestamp); - return shardIterator; - } - - GetRecordsResult fetchRecords(String shardIterator) { - GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); - getRecordsRequest.setShardIterator(shardIterator); - getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit()); - GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - return getRecordsResult; - } - - void shutdown() { - kinesisClient.shutdown(); - } - -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java deleted file mode 100644 index 886720b1c..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.regions.Regions; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.Serializable; -import java.util.Arrays; - -import org.objenesis.strategy.StdInstantiatorStrategy; - -public class KinesisConnectionInfo implements Serializable { - private final byte[] serializedKinesisCredsProvider; - private final byte[] serializedkinesisClientConfig; - private final Integer recordsLimit; - private final Regions region; - - private transient AWSCredentialsProvider credentialsProvider; - private transient ClientConfiguration clientConfiguration; - - /** - * Create a new Kinesis connection info. - * @param credentialsProvider implementation to provide credentials to connect to kinesis - * @param clientConfiguration client configuration to pass to kinesis client - * @param region region to connect to - * @param recordsLimit max records to be fetched in a getRecords request to kinesis - */ - public KinesisConnectionInfo(AWSCredentialsProvider credentialsProvider, - ClientConfiguration clientConfiguration, - Regions region, - Integer recordsLimit) { - if (recordsLimit == null || recordsLimit <= 0) { - throw new IllegalArgumentException("recordsLimit has to be a positive integer"); - } - if (region == null) { - throw new IllegalArgumentException("region cannot be null"); - } - serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider); - serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration); - this.recordsLimit = recordsLimit; - this.region = region; - } - - public Integer getRecordsLimit() { - return recordsLimit; - } - - public AWSCredentialsProvider getCredentialsProvider() { - if (credentialsProvider == null) 
{ - credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider); - } - return credentialsProvider; - } - - public ClientConfiguration getClientConfiguration() { - if (clientConfiguration == null) { - clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig); - } - return clientConfiguration; - } - - public Regions getRegion() { - return region; - } - - private byte[] getKryoSerializedBytes(final Object obj) { - final Kryo kryo = new Kryo(); - kryo.setRegistrationRequired(false); - final ByteArrayOutputStream os = new ByteArrayOutputStream(); - final Output output = new Output(os); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.writeClassAndObject(output, obj); - output.flush(); - return os.toByteArray(); - } - - private Object getKryoDeserializedObject(final byte[] ser) { - final Kryo kryo = new Kryo(); - kryo.setRegistrationRequired(false); - final Input input = new Input(new ByteArrayInputStream(ser)); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); - return kryo.readClassAndObject(input); - } - - @Override - public String toString() { - return "KinesisConnectionInfo{" - + "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) - + ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) - + ", region=" + region - + ", recordsLimit=" + recordsLimit - + '}'; - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java deleted file mode 100644 index 7fd179fbf..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -public class KinesisMessageId { - private final String streamName; - private final String shardId; - private final String sequenceNumber; - - KinesisMessageId(String streamName, String shardId, String sequenceNumber) { - this.streamName = streamName; - this.shardId = shardId; - this.sequenceNumber = sequenceNumber; - } - - public String getStreamName() { - return streamName; - } - - public String getShardId() { - return shardId; - } - - public String getSequenceNumber() { - return sequenceNumber; - } - - @Override - public String toString() { - return "KinesisMessageId{" - + "streamName='" + streamName + '\'' - + ", shardId='" + shardId + '\'' - + ", sequenceNumber='" + sequenceNumber + '\'' - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - KinesisMessageId that = (KinesisMessageId) o; - - if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) { - return false; - } - if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) { - return false; - } - return !(sequenceNumber != null ? 
!sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null); - - } - - @Override - public int hashCode() { - int result = streamName != null ? streamName.hashCode() : 0; - result = 31 * result + (shardId != null ? shardId.hashCode() : 0); - result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0); - return result; - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java deleted file mode 100644 index 33dba4aa5..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java +++ /dev/null @@ -1,486 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; - -import java.math.BigInteger; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; - -import org.apache.storm.spout.SpoutOutputCollector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class KinesisRecordsManager { - private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); - // object handling zk interaction - private transient ZkConnection zkConnection; - // object handling interaction with kinesis - private transient KinesisConnection kinesisConnection; - // Kinesis Spout KinesisConfig object - private final transient KinesisConfig kinesisConfig; - // Queue of records per shard fetched from kinesis and are waiting to be emitted - private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>(); - // Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted - private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>(); - /** - * Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. - * At the same time order is needed to figure out the sequence number to commit. 
Logic explained in commit - */ - private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>(); - // sorted acked sequence numbers - needed to figure out what sequence number can be committed - private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>(); - // sorted failed sequence numbers - needed to figure out what sequence number can be committed - private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>(); - // shard iterator corresponding to position in shard for new messages - private transient Map<String, String> shardIteratorPerShard = new HashMap<>(); - // last fetched sequence number corresponding to position in shard - private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>(); - // shard iterator corresponding to position in shard for failed messages - private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>(); - // timestamp to decide when to commit to zk again - private transient long lastCommitTime; - // boolean to track deactivated state - private transient boolean deactivated; - - KinesisRecordsManager(KinesisConfig kinesisConfig) { - this.kinesisConfig = kinesisConfig; - this.zkConnection = new ZkConnection(kinesisConfig.getZkInfo()); - this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); - } - - void initialize(int myTaskIndex, int totalTasks) { - deactivated = false; - lastCommitTime = System.currentTimeMillis(); - kinesisConnection.initialize(); - zkConnection.initialize(); - List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName()); - LOG.info("myTaskIndex is " + myTaskIndex); - LOG.info("totalTasks is " + totalTasks); - int i = myTaskIndex; - while (i < shards.size()) { - LOG.info("Shard id " + shards.get(i).getShardId() + " assigned to task " + myTaskIndex); - toEmitPerShard.put(shards.get(i).getShardId(), new LinkedList<Record>()); - i += 
totalTasks; - } - initializeFetchedSequenceNumbers(); - refreshShardIteratorsForNewRecords(); - } - - void next(SpoutOutputCollector collector) { - if (shouldCommit()) { - commit(); - } - KinesisMessageId failedMessageId = kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry(); - if (failedMessageId != null) { - // if the retry service returns a message that is not in failed set then ignore it. should never happen - BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber()); - if (failedPerShard.containsKey(failedMessageId.getShardId()) - && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) { - if (!failedandFetchedRecords.containsKey(failedMessageId)) { - fetchFailedRecords(failedMessageId); - } - if (emitFailedRecord(collector, failedMessageId)) { - failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber); - kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId); - return; - } else { - LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId - + ". This can happen a few times but should not happen infinitely"); - } - } else { - LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen."); - } - } - LOG.debug("No failed record to emit for now. Hence will try to emit new records"); - // if maximum uncommitted records count has reached, so dont emit any new records and return - if (!(getUncommittedRecordsCount() < kinesisConfig.getMaxUncommittedRecords())) { - LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning"); - return; - } - // early return as no shard is assigned - probably because number of executors > number of shards - if (toEmitPerShard.isEmpty()) { - LOG.warn("No shard is assigned to this task. 
Hence not emitting any tuple."); - return; - } - - if (shouldFetchNewRecords()) { - fetchNewRecords(); - } - emitNewRecord(collector); - } - - void ack(KinesisMessageId kinesisMessageId) { - // for an acked message add it to acked set and remove it from emitted and failed - String shardId = kinesisMessageId.getShardId(); - BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber()); - LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber); - // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while - // committing we need to figure out what is the - // highest sequence number that can be committed for this shard - if (!ackedPerShard.containsKey(shardId)) { - ackedPerShard.put(shardId, new TreeSet<BigInteger>()); - } - ackedPerShard.get(shardId).add(sequenceNumber); - // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard (which - // keeps track of in flight tuples) - if (emittedPerShard.containsKey(shardId)) { - TreeSet<BigInteger> emitted = emittedPerShard.get(shardId); - emitted.remove(sequenceNumber); - } - // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard - // from failedPerShard. Defensive coding. - // Remove it from failedPerShard anyway - if (failedPerShard.containsKey(shardId)) { - failedPerShard.get(shardId).remove(sequenceNumber); - } - // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in - // failedAndFetchedRecords. 
We use that to - // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to - // clean up memory - if (failedandFetchedRecords.containsKey(kinesisMessageId)) { - kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId); - failedandFetchedRecords.remove(kinesisMessageId); - } - // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology - if (deactivated) { - commit(); - } - } - - void fail(KinesisMessageId kinesisMessageId) { - String shardId = kinesisMessageId.getShardId(); - BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber()); - LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber); - // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way - if (kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) { - if (!failedPerShard.containsKey(shardId)) { - failedPerShard.put(shardId, new TreeSet<BigInteger>()); - } - failedPerShard.get(shardId).add(sequenceNumber); - TreeSet<BigInteger> emitted = emittedPerShard.get(shardId); - emitted.remove(sequenceNumber); - } else { - ack(kinesisMessageId); - } - // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology - if (deactivated) { - commit(); - } - } - - void commit() { - // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number - // can be committed to zookeeper. - // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack - // it moves from emittedPerShard to - // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to - // failedPerShard. The failed records will move from - // failedPerShard to emittedPerShard when the failed record is emitted again as a retry. 
- // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard - // called X such that there is no sequence - // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, - // emittedPerShard is 2,6 and - // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed - for (String shardId: toEmitPerShard.keySet()) { - if (ackedPerShard.containsKey(shardId)) { - BigInteger commitSequenceNumberBound = null; - if (failedPerShard.containsKey(shardId) && !failedPerShard.get(shardId).isEmpty()) { - commitSequenceNumberBound = failedPerShard.get(shardId).first(); - } - if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) { - BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first(); - if (commitSequenceNumberBound == null - || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) { - commitSequenceNumberBound = smallestEmittedSequenceNumber; - } - } - Iterator<BigInteger> ackedSequenceNumbers = ackedPerShard.get(shardId).iterator(); - BigInteger ackedSequenceNumberToCommit = null; - while (ackedSequenceNumbers.hasNext()) { - BigInteger ackedSequenceNumber = ackedSequenceNumbers.next(); - if (commitSequenceNumberBound == null - || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) { - ackedSequenceNumberToCommit = ackedSequenceNumber; - ackedSequenceNumbers.remove(); - } else { - break; - } - } - if (ackedSequenceNumberToCommit != null) { - Map<Object, Object> state = new HashMap<>(); - state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString()); - LOG.debug("Committing sequence number {} for shardId {}", - ackedSequenceNumberToCommit.toString(), - shardId); - zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state); - } - } - } - lastCommitTime = System.currentTimeMillis(); - } - - void activate() { - LOG.info("Activate 
called"); - deactivated = false; - kinesisConnection.initialize(); - } - - void deactivate() { - LOG.info("Deactivate called"); - deactivated = true; - commit(); - kinesisConnection.shutdown(); - } - - void close() { - commit(); - kinesisConnection.shutdown(); - zkConnection.shutdown(); - } - - // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched - // and are in the failed queue will also - // be kept in memory to avoid going to kinesis again for retry - private void fetchFailedRecords(KinesisMessageId kinesisMessageId) { - // if shard iterator not present for this message, get it - if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) { - refreshShardIteratorForFailedRecord(kinesisMessageId); - } - String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId); - LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", - kinesisMessageId.getShardId(), - kinesisMessageId.getSequenceNumber(), - shardIterator); - try { - GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator); - if (getRecordsResult != null) { - List<Record> records = getRecordsResult.getRecords(); - LOG.debug("Records size from fetchFailedRecords is {}", records.size()); - // update the shard iterator to next one in case this fetch does not give the message. - shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator()); - if (records.size() == 0) { - LOG.warn("No records returned from kinesis. 
Hence sleeping for 1 second"); - Thread.sleep(1000); - } else { - // add all fetched records to the set of failed records if they are present in failed set - for (Record record: records) { - KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), - kinesisMessageId.getShardId(), - record.getSequenceNumber()); - if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) { - failedandFetchedRecords.put(current, record); - shardIteratorPerFailedMessage.remove(current); - } - } - } - } - } catch (InterruptedException ie) { - LOG.warn("Thread interrupted while sleeping", ie); - } catch (ExpiredIteratorException ex) { - LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator"); - refreshShardIteratorForFailedRecord(kinesisMessageId); - } catch (ProvisionedThroughputExceededException pe) { - try { - LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe); - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Thread interrupted exception", e); - } - } - } - - private void fetchNewRecords() { - for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) { - String shardId = entry.getKey(); - try { - String shardIterator = shardIteratorPerShard.get(shardId); - LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", - shardId, - shardIterator, - fetchedSequenceNumberPerShard.get(shardId)); - GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator); - if (getRecordsResult != null) { - List<Record> records = getRecordsResult.getRecords(); - LOG.debug("Records size from fetchNewRecords is {}", records.size()); - // update the shard iterator to next one in case this fetch does not give the message. 
- shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator()); - if (records.size() == 0) { - LOG.warn("No records returned from kinesis. Hence sleeping for 1 second"); - Thread.sleep(1000); - } else { - entry.getValue().addAll(records); - fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber()); - } - } - } catch (InterruptedException ie) { - LOG.warn("Thread interrupted while sleeping", ie); - } catch (ExpiredIteratorException ex) { - LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator"); - refreshShardIteratorForNewRecords(shardId); - } catch (ProvisionedThroughputExceededException pe) { - try { - LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe); - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Thread interrupted exception", e); - } - } - } - } - - private void emitNewRecord(SpoutOutputCollector collector) { - for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) { - String shardId = entry.getKey(); - LinkedList<Record> listOfRecords = entry.getValue(); - Record record; - while ((record = listOfRecords.pollFirst()) != null) { - KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), - shardId, - record.getSequenceNumber()); - if (emitRecord(collector, record, kinesisMessageId)) { - return; - } - } - } - } - - private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) { - if (!failedandFetchedRecords.containsKey(kinesisMessageId)) { - return false; - } - return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId); - } - - private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) { - boolean result = false; - List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record); - // if a record is returned put 
the sequence number in the emittedPerShard to tie back with ack or fail - if (tuple != null && tuple.size() > 0) { - collector.emit(tuple, kinesisMessageId); - if (!emittedPerShard.containsKey(kinesisMessageId.getShardId())) { - emittedPerShard.put(kinesisMessageId.getShardId(), new TreeSet<BigInteger>()); - } - emittedPerShard.get(kinesisMessageId.getShardId()).add(new BigInteger(record.getSequenceNumber())); - result = true; - } else { - // ack to not process the record again on restart and move on to next message - LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it"); - ack(kinesisMessageId); - } - return result; - } - - private boolean shouldCommit() { - return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs()); - } - - private void initializeFetchedSequenceNumbers() { - for (String shardId : toEmitPerShard.keySet()) { - Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId); - // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber - if (state != null) { - Object committedSequenceNumber = state.get("committedSequenceNumber"); - LOG.info("State read is committedSequenceNumber: " + committedSequenceNumber + " shardId:" + shardId); - if (committedSequenceNumber != null) { - fetchedSequenceNumberPerShard.put(shardId, (String) committedSequenceNumber); - } - } - } - } - - private void refreshShardIteratorsForNewRecords() { - for (String shardId: toEmitPerShard.keySet()) { - refreshShardIteratorForNewRecords(shardId); - } - } - - private void refreshShardIteratorForNewRecords(String shardId) { - String shardIterator = null; - String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId); - ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null - ? 
kinesisConfig.getShardIteratorType() - : ShardIteratorType.AFTER_SEQUENCE_NUMBER); - // Set the shard iterator for last fetched sequence number to start from correct position in shard - shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), - shardId, - shardIteratorType, - lastFetchedSequenceNumber, - kinesisConfig.getTimestamp()); - if (shardIterator != null && !shardIterator.isEmpty()) { - LOG.warn("Refreshing shard iterator for new records for shardId " + shardId - + " with shardIterator " + shardIterator); - shardIteratorPerShard.put(shardId, shardIterator); - } - } - - private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) { - String shardIterator = null; - // Set the shard iterator for last fetched sequence number to start from correct position in shard - shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), - kinesisMessageId.getShardId(), - ShardIteratorType.AT_SEQUENCE_NUMBER, - kinesisMessageId.getSequenceNumber(), - null); - if (shardIterator != null && !shardIterator.isEmpty()) { - LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId - + " with shardIterator " + shardIterator); - shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator); - } - } - - private Long getUncommittedRecordsCount() { - Long result = 0L; - for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) { - result += emitted.getValue().size(); - } - for (Map.Entry<String, TreeSet<BigInteger>> acked: ackedPerShard.entrySet()) { - result += acked.getValue().size(); - } - for (Map.Entry<String, TreeSet<BigInteger>> failed: failedPerShard.entrySet()) { - result += failed.getValue().size(); - } - LOG.debug("Returning uncommittedRecordsCount as {}", result); - return result; - } - - private boolean shouldFetchNewRecords() { - // check to see if any shard has already fetched records waiting to be emitted, in which case dont fetch more - 
boolean fetchRecords = true; - for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) { - if (!entry.getValue().isEmpty()) { - fetchRecords = false; - break; - } - } - return fetchRecords; - } - -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java deleted file mode 100644 index cf9295a78..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kinesis.spout; - -import java.util.Map; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; - -public class KinesisSpout extends BaseRichSpout { - - private final KinesisConfig kinesisConfig; - private transient KinesisRecordsManager kinesisRecordsManager; - private transient SpoutOutputCollector collector; - - public KinesisSpout(KinesisConfig kinesisConfig) { - this.kinesisConfig = kinesisConfig; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(kinesisConfig.getRecordToTupleMapper().getOutputFields()); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return super.getComponentConfiguration(); - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig); - kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size()); - } - - @Override - public void close() { - kinesisRecordsManager.close(); - } - - @Override - public void activate() { - kinesisRecordsManager.activate(); - } - - @Override - public void deactivate() { - kinesisRecordsManager.deactivate(); - } - - @Override - public void ack(Object msgId) { - kinesisRecordsManager.ack((KinesisMessageId) msgId); - } - - @Override - public void fail(Object msgId) { - kinesisRecordsManager.fail((KinesisMessageId) msgId); - } - - @Override - public void nextTuple() { - kinesisRecordsManager.next(collector); - } -} - - diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java deleted file 
mode 100644 index bee9e80be..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import com.amazonaws.services.kinesis.model.Record; - -import java.util.List; - -import org.apache.storm.tuple.Fields; - -public interface RecordToTupleMapper { - /** - * Retrieve the names of fields. - * @return names of fields in the emitted tuple - */ - Fields getOutputFields(); - - /** - * Retrieve the tuple. - * @param record kinesis record - * @return storm tuple to be emitted for this record, null if no tuple should be emitted - */ - List<Object> getTuple(Record record); -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java deleted file mode 100644 index ca982538d..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import java.nio.charset.Charset; -import java.util.Map; -import net.minidev.json.JSONValue; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.apache.zookeeper.CreateMode; - -class ZkConnection { - - private final ZkInfo zkInfo; - private CuratorFramework curatorFramework; - - ZkConnection(ZkInfo zkInfo) { - this.zkInfo = zkInfo; - } - - void initialize() { - curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), - zkInfo.getSessionTimeoutMs(), - zkInfo.getConnectionTimeoutMs(), - new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs())); - curatorFramework.start(); - } - - void commitState(String stream, String shardId, Map<Object, Object> state) { - byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8")); - try { - String path = getZkPath(stream, shardId); - if (curatorFramework.checkExists().forPath(path) == null) { - curatorFramework.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, bytes); - } else { - curatorFramework.setData().forPath(path, bytes); - } - } catch (Exception e) { - throw new 
RuntimeException(e); - } - } - - Map<Object, Object> readState(String stream, String shardId) { - try { - String path = getZkPath(stream, shardId); - Map<Object, Object> state = null; - byte[] b = null; - if (curatorFramework.checkExists().forPath(path) != null) { - b = curatorFramework.getData().forPath(path); - } - if (b != null) { - state = (Map<Object, Object>) JSONValue.parseWithException(new String(b, "UTF-8")); - } - return state; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - void shutdown() { - curatorFramework.close(); - } - - private String getZkPath(String stream, String shardId) { - String path = ""; - if (!zkInfo.getZkNode().startsWith("/")) { - path += "/"; - } - path += zkInfo.getZkNode(); - if (!zkInfo.getZkNode().endsWith("/")) { - path += "/"; - } - path += (stream + "/" + shardId); - return path; - } -} diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java deleted file mode 100644 index 35e289065..000000000 --- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kinesis.spout; - -import java.io.Serializable; - -public class ZkInfo implements Serializable { - // comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181 - private final String zkUrl; - // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers - private final String zkNode; - // zk session timeout in milliseconds - private final Integer sessionTimeoutMs; - // zk connection timeout in milliseconds - private final Integer connectionTimeoutMs; - // interval at which to commit offsets to zk in milliseconds - private final Long commitIntervalMs; - // number of retry attempts for zk - private final Integer retryAttempts; - // time to sleep between retries in milliseconds - private final Integer retryIntervalMs; - - public ZkInfo(String zkUrl, - String zkNode, - Integer sessionTimeoutMs, - Integer connectionTimeoutMs, - Long commitIntervalMs, - Integer retryAttempts, - Integer retryIntervalMs) { - this.zkUrl = zkUrl; - this.zkNode = zkNode; - this.sessionTimeoutMs = sessionTimeoutMs; - this.connectionTimeoutMs = connectionTimeoutMs; - this.commitIntervalMs = commitIntervalMs; - this.retryAttempts = retryAttempts; - this.retryIntervalMs = retryIntervalMs; - validate(); - } - - public String getZkUrl() { - return zkUrl; - } - - public String getZkNode() { - return zkNode; - } - - public Integer getSessionTimeoutMs() { - return sessionTimeoutMs; - } - - public Integer getConnectionTimeoutMs() { - return connectionTimeoutMs; - } - - public Long getCommitIntervalMs() { - return commitIntervalMs; - } - - public Integer getRetryAttempts() { - return retryAttempts; - } - - public Integer getRetryIntervalMs() { - return retryIntervalMs; - } - - private void validate() { - - if (zkUrl == null || zkUrl.length() < 1) { - throw new IllegalArgumentException("zkUrl must be 
specified to connect to zookeeper"); - } - if (zkNode == null || zkNode.length() < 1) { - throw new IllegalArgumentException("zkNode must be specified"); - } - checkPositive(sessionTimeoutMs, "sessionTimeoutMs"); - checkPositive(connectionTimeoutMs, "connectionTimeoutMs"); - checkPositive(commitIntervalMs, "commitIntervalMs"); - checkPositive(retryAttempts, "retryAttempts"); - checkPositive(retryIntervalMs, "retryIntervalMs"); - } - - private void checkPositive(Integer argument, String name) { - if (argument == null && argument <= 0) { - throw new IllegalArgumentException(name + " must be positive"); - } - } - - private void checkPositive(Long argument, String name) { - if (argument == null && argument <= 0) { - throw new IllegalArgumentException(name + " must be positive"); - } - } - - @Override - public String toString() { - return "ZkInfo{" - + "zkUrl='" + zkUrl + '\'' - + ", zkNode='" + zkNode + '\'' - + ", sessionTimeoutMs=" + sessionTimeoutMs - + ", connectionTimeoutMs=" + connectionTimeoutMs - + ", commitIntervalMs=" + commitIntervalMs - + ", retryAttempts=" + retryAttempts - + ", retryIntervalMs=" + retryIntervalMs - + '}'; - } - -} diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java deleted file mode 100644 index 7701efdaf..000000000 --- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kinesis.spout.test; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class KinesisBoltTest extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(KinesisBoltTest.class); - private transient OutputCollector collector; - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - LOG.info("input = [" + input + "]"); - collector.ack(input); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } -} diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java deleted file mode 100644 index 7028e3ec0..000000000 --- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kinesis.spout.test; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.kinesis.spout.CredentialsProviderChain; -import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier; -import org.apache.storm.kinesis.spout.KinesisConfig; -import org.apache.storm.kinesis.spout.KinesisConnectionInfo; -import org.apache.storm.kinesis.spout.KinesisSpout; -import org.apache.storm.kinesis.spout.RecordToTupleMapper; -import org.apache.storm.kinesis.spout.ZkInfo; -import org.apache.storm.topology.TopologyBuilder; - -import java.util.Date; - -public class KinesisSpoutTopology { - public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { - String topologyName = args[0]; - RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper(); - KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2, - 1000); - ZkInfo zkInfo = new ZkInfo("localhost:2181", 
"/kinesisOffsets", 20000, 15000, 10000L, 3, 2000); - KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON, - recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L); - KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig); - TopologyBuilder topologyBuilder = new TopologyBuilder(); - topologyBuilder.setSpout("spout", kinesisSpout, 3); - topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout"); - Config topologyConfig = new Config(); - topologyConfig.setDebug(true); - topologyConfig.setNumWorkers(3); - StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology()); - } -} diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java deleted file mode 100644 index 2879c3e7c..000000000 --- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.kinesis.spout.test; - -import com.amazonaws.services.kinesis.model.Record; -import org.apache.storm.kinesis.spout.RecordToTupleMapper; -import org.apache.storm.tuple.Fields; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.util.ArrayList; -import java.util.List; - -public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializable { - private static final Logger LOG = LoggerFactory.getLogger(TestRecordToTupleMapper.class); - @Override - public Fields getOutputFields() { - return new Fields("partitionKey", "sequenceNumber", "data"); - } - - @Override - public List<Object> getTuple(Record record) { - CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); - List<Object> tuple = new ArrayList<>(); - tuple.add(record.getPartitionKey()); - tuple.add(record.getSequenceNumber()); - try { - String data = decoder.decode(record.getData()).toString(); - LOG.info("data is " + data); - tuple.add(data); - } catch (CharacterCodingException e) { - e.printStackTrace(); - LOG.warn("Exception occured. Emitting tuple with empty string data", e); - tuple.add(""); - } - LOG.info("Tuple from record is " + tuple); - return tuple; - } -} diff --git a/pom.xml b/pom.xml index 160afc6ac..b460324d4 100644 --- a/pom.xml +++ b/pom.xml @@ -501,7 +501,6 @@ <module>external/storm-kafka-migration</module> <module>external/storm-opentsdb</module> <module>external/storm-kafka-monitor</module> - <module>external/storm-kinesis</module> <module>external/storm-jms</module> <module>external/storm-pmml</module> <module>external/storm-rocketmq</module>
