This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new 781e167 Add support for stream partition offsets (#6402)
781e167 is described below
commit 781e1676d867816232d5715fe4ccc4e14e0da1fb
Author: Kartik Khare <[email protected]>
AuthorDate: Tue Jan 5 01:19:28 2021 +0530
Add support for stream partition offsets (#6402)
---
.../plugin/stream/kinesis/KinesisCheckpoint.java | 3 +-
.../plugin/stream/kinesis/KinesisConsumer.java | 10 +++++--
.../stream/kinesis/KinesisConsumerFactory.java | 5 ++++
.../stream/kinesis/KinesisMsgOffsetFactory.java | 32 ++++++++++++++++++++++
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 15 ++++++----
5 files changed, 56 insertions(+), 9 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 1b8f86e..d42f899 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.JsonUtils;
-public class KinesisCheckpoint implements Checkpoint {
+public class KinesisCheckpoint implements StreamPartitionMsgOffset {
private Map<String, String> _shardToStartSequenceMap;
public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 8a24208..8ed3de7 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -80,6 +80,7 @@ public class KinesisConsumer extends KinesisConnectionHandler
implements Partiti
createConnection();
}
+ //TODO: iterate upon all the shardIds in the map
Map.Entry<String, String> next =
kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
String shardIterator = getShardIterator(next.getKey(), next.getValue());
@@ -125,7 +126,7 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
nextStartSequenceNumber = recordList.get(recordList.size() -
1).sequenceNumber();
}
- return new KinesisRecordsBatch(recordList);
+ return new KinesisRecordsBatch(recordList, next.getKey());
} catch (ProvisionedThroughputExceededException e) {
LOG.warn("The request rate for the stream is too high", e);
return handleException(kinesisStartCheckpoint, recordList);
@@ -147,13 +148,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
   }
 
   private KinesisRecordsBatch handleException(KinesisCheckpoint start, List<Record> recordList) {
-    if (recordList.size() > 0) {
-      String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-      Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
-      newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
-      return new KinesisRecordsBatch(recordList);
-    } else {
-      return new KinesisRecordsBatch(recordList);
-    }
+    // Return whatever was fetched before the failure, tagged with the shard that was being read. The batch
+    // derives the next offset from its own records, so no separate checkpoint needs to be computed here.
+    // TODO: iterate upon all the shardIds in the map
+    String shardId = start.getShardToStartSequenceMap().keySet().iterator().next();
+    return new KinesisRecordsBatch(recordList, shardId);
   }
 
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index aa90812..631f240 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -25,6 +25,7 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
public class KinesisConsumerFactory extends StreamConsumerFactory {
@@ -55,4 +56,8 @@ public class KinesisConsumerFactory extends
StreamConsumerFactory {
return new KinesisConsumer(new KinesisConfig(_streamConfig));
}
+ @Override
+ public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
+ return new KinesisMsgOffsetFactory();
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
new file mode 100644
index 0000000..f234bae
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+
+
+/**
+ * {@link StreamPartitionMsgOffsetFactory} for the Kinesis plugin; every offset it creates is a
+ * {@link KinesisCheckpoint}.
+ */
+public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory {
+
+  private KinesisConfig _kinesisConfig;
+
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _kinesisConfig = new KinesisConfig(streamConfig);
+  }
+
+  /**
+   * Parses a serialized Kinesis checkpoint.
+   *
+   * @param offsetStr string form accepted by {@link KinesisCheckpoint#KinesisCheckpoint(String)}
+   * @return the parsed checkpoint; never {@code null}
+   * @throws IllegalArgumentException if {@code offsetStr} cannot be parsed
+   */
+  @Override
+  public StreamPartitionMsgOffset create(String offsetStr) {
+    try {
+      return new KinesisCheckpoint(offsetStr);
+    } catch (IOException e) {
+      // Returning null would only move the failure to an NPE far away from its cause.
+      throw new IllegalArgumentException("Invalid Kinesis checkpoint: " + offsetStr, e);
+    }
+  }
+
+  /**
+   * Copies another Kinesis checkpoint.
+   *
+   * @param other offset to copy; must be a {@link KinesisCheckpoint}
+   * @return a new checkpoint holding its own copy of {@code other}'s shard-to-sequence map
+   * @throws IllegalArgumentException if {@code other} is not a {@link KinesisCheckpoint}
+   */
+  @Override
+  public StreamPartitionMsgOffset create(StreamPartitionMsgOffset other) {
+    if (!(other instanceof KinesisCheckpoint)) {
+      throw new IllegalArgumentException("Expected a KinesisCheckpoint but got: " + other);
+    }
+    // Copy the map so the two checkpoints never share mutable state.
+    return new KinesisCheckpoint(new HashMap<>(((KinesisCheckpoint) other).getShardToStartSequenceMap()));
+  }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index 04bf4e6..fb4bfb3 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -27,9 +29,11 @@ import software.amazon.awssdk.services.kinesis.model.Record;
public class KinesisRecordsBatch implements MessageBatch<byte[]> {
private List<Record> _recordList;
+ private String _shardId;
- public KinesisRecordsBatch(List<Record> recordList) {
+ public KinesisRecordsBatch(List<Record> recordList, String shardId) {
_recordList = recordList;
+ _shardId = shardId;
}
@Override
@@ -39,13 +43,15 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
 
   @Override
   public byte[] getMessageAtIndex(int index) {
+    // SdkBytes.asByteBuffer() is read-only, so its array() would throw ReadOnlyBufferException; copy instead.
     return _recordList.get(index).data().asByteArray();
   }
 
   @Override
   public int getMessageOffsetAtIndex(int index) {
-    //TODO: Doesn't translate to offset. Needs to be replaced.
+    // TODO: Kinesis sequence numbers are decimal strings far wider than an int, so this is not a real offset;
+    // use getNextStreamParitionMsgOffsetAtIndex() for positioning.
     return _recordList.get(index).hashCode();
   }
 
   @Override
@@ -60,7 +63,9 @@ public class KinesisRecordsBatch implements
MessageBatch<byte[]> {
@Override
public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int
index) {
- throw new UnsupportedOperationException();
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put(_shardId, _recordList.get(index).sequenceNumber());
+ return new KinesisCheckpoint(shardToSequenceMap);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]