This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new 9c31f41 Use shardId's last digits as partitionGroupId
9c31f41 is described below
commit 9c31f413a48710f912a4c3b182382627e5b29c1a
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Jan 6 18:21:20 2021 -0800
Use shardId's last digits as partitionGroupId
---
.../helix/core/PinotHelixResourceManager.java | 4 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 10 +++
.../kinesis/KinesisStreamMetadataProvider.java | 79 +++++++++++++++++-----
.../pinot/spi/stream/StreamMetadataProvider.java | 3 +-
4 files changed, 75 insertions(+), 21 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index af99860..6b5168f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1355,8 +1355,8 @@ public class PinotHelixResourceManager {
// (unless there are low-level segments already present)
if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore,
realtimeTableName).isEmpty()) {
PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(realtimeTableName,
realtimeTableConfig, idealState,
- _enableBatchMessageMode);
+
.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager,
realtimeTableName, realtimeTableConfig,
+ idealState, _enableBatchMessageMode);
LOGGER.info("Successfully added Helix entries for low-level consumers
for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring
again", realtimeTableName);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 20f79d4..d5578a3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -519,6 +519,11 @@ public class PinotLLCRealtimeSegmentManager {
PartitionGroupMetadata currentPartitionGroupMetadata =
currentGroupIdToMetadata.get(newPartitionGroupId);
if (currentPartitionGroupMetadata == null) { // not present in current
state. New partition found.
// make new segment
+ // FIXME: flushThreshold of segment is actually (configured threshold/numPartitions)
+ // In Kinesis, with every split/merge, we get new partitions, and an old partition gets deactivated.
+ // However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they're active or not.
+ // So our numPartitions will forever keep increasing.
+ // TODO: can the getPartitionGroupInfo return the active partitions only, based on the checkpoints passed in current?
String newLLCSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupInfo, newSegmentCreationTimeMs,
instancePartitions, numPartitions, numReplicas);
@@ -534,6 +539,11 @@ public class PinotLLCRealtimeSegmentManager {
createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+
+ // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
+ // the logic in getPartitionGroupInfo to prevent returning of EOLed shards isn't working
+ // OPTION: Since consumer knows about it, it can pass param in request/committingSegmentDescriptor "isEndOfShard"
+ // We can set that in metadata for validation manager to skip these partitions
}
}
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index f86d06c..6c55a18 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -6,7 +6,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -16,44 +18,85 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
- private final KinesisConfig _kinesisConfig;
- private KinesisConnectionHandler _kinesisConnectionHandler;
+ private final KinesisConnectionHandler _kinesisConnectionHandler;
public KinesisStreamMetadataProvider(String clientId, KinesisConfig
kinesisConfig) {
- _kinesisConfig = kinesisConfig;
_kinesisConnectionHandler = new
KinesisConnectionHandler(kinesisConfig.getStream(),
kinesisConfig.getAwsRegion());
}
@Override
public int fetchPartitionCount(long timeoutMillis) {
- return 0;
+ throw new UnsupportedOperationException();
}
@Override
- public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria,
long timeoutMillis)
- throws TimeoutException {
- return 0;
+ public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria,
long timeoutMillis) {
+ throw new UnsupportedOperationException();
}
@Override
public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId,
StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int
timeoutMillis)
- throws TimeoutException {
- List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
+ throws IOException {
+
+ Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
+
currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId,
p -> p));
+
+ List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
List<Shard> shards = _kinesisConnectionHandler.getShards();
- for (Shard shard : shards) {
- Map<String, String> shardToSequenceNumMap = new HashMap<>();
- shardToSequenceNumMap.put(shard.shardId(),
shard.sequenceNumberRange().startingSequenceNumber());
- KinesisCheckpoint kinesisCheckpoint = new
KinesisCheckpoint(shardToSequenceNumMap);
- partitionGroupInfos
- .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()),
kinesisCheckpoint.serialize()));
+ for (Shard shard : shards) { // go over all shards
+ String shardId = shard.shardId();
+ int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
+ PartitionGroupMetadata currentPartitionGroupMetadata =
currentPartitionGroupMap.get(partitionGroupId);
+ KinesisCheckpoint newStartCheckpoint;
+ if (currentPartitionGroupMetadata != null) { // existing shard
+ KinesisCheckpoint currentEndCheckpoint = null;
+ try {
+ currentEndCheckpoint = new
KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
+ } catch (Exception e) {
+ // ignore. No end checkpoint yet for IN_PROGRESS segment
+ }
+ if (currentEndCheckpoint != null) { // end checkpoint available i.e.
committing segment
+ String endingSequenceNumber =
shard.sequenceNumberRange().endingSequenceNumber();
+ if (endingSequenceNumber != null) { // shard has ended
+ // FIXME: this logic is not working
+ // was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard.
+ // But it is much less than the endSeqNumOfShard
+ Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+ shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
+ KinesisCheckpoint shardEndCheckpoint = new
KinesisCheckpoint(shardToSequenceNumberMap);
+ if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
+ // shard has ended AND we have reached the end checkpoint.
+ // skip this partition group in the result
+ continue;
+ }
+ }
+ newStartCheckpoint = currentEndCheckpoint;
+ } else {
+ newStartCheckpoint = new
KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
+ }
+ } else { // new shard
+ Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+ shardToSequenceNumberMap.put(shardId,
shard.sequenceNumberRange().startingSequenceNumber());
+ newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+ }
+ newPartitionGroupInfos
+ .add(new PartitionGroupInfo(partitionGroupId,
newStartCheckpoint.serialize()));
}
- return partitionGroupInfos;
+ return newPartitionGroupInfos;
+ }
+
+ /**
+ * Converts a shardId string to a partitionGroupId integer by parsing the digits of the shardId
+ * e.g. "shardId-000000000001" becomes 1
+ */
+ private int getPartitionGroupIdFromShardId(String shardId) {
+ String shardIdNum =
StringUtils.stripStart(StringUtils.removeStart(shardId, "shardId-"), "0");
+ return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
}
@Override
- public void close()
- throws IOException {
+ public void close() {
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index c64f710..be2e819 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.io.Closeable;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -64,7 +65,7 @@ public interface StreamMetadataProvider extends Closeable {
*/
default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId,
StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int
timeoutMillis)
- throws TimeoutException {
+ throws TimeoutException, IOException {
int partitionCount = fetchPartitionCount(timeoutMillis);
List<PartitionGroupInfo> newPartitionGroupInfoList = new
ArrayList<>(partitionCount);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]