lnbest0707-uber commented on code in PR #16494:
URL: https://github.com/apache/pinot/pull/16494#discussion_r2292380542
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements
Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
+ String tableName, @Nullable String topicName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table
name: %s", tableName);
+ Preconditions.checkArgument(topicName == null ||
!topicName.contains(SEPARATOR),
Review Comment:
good idea
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -76,13 +118,7 @@ public static LLCSegmentName of(String segmentName) {
* Returns whether the given segment name represents an LLC segment.
*/
public static boolean isLLCSegment(String segmentName) {
- int numSeparators = 0;
- int index = 0;
- while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
- numSeparators++;
- index += 2; // SEPARATOR.length()
- }
- return numSeparators == 3;
+ return of(segmentName) != null;
Review Comment:
good point.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get
the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the
partition group consumption status.
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
+ .filter(partitionGroupConsumptionStatus ->
_streamConfigs.get(index).isEphemeralBackfillTopic()
Review Comment:
Agreed. This code is hard to follow. I've refactored it to completely separate
the permanent and ephemeral topic handling.
Also, this class had only simple functionality and no unit tests, so I've
added extra unit tests covering single and multiple streams, with and without ephemeral topics.
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -127,6 +181,11 @@ public String getSegmentName() {
public int compareTo(LLCSegmentName other) {
Preconditions.checkArgument(_tableName.equals(other._tableName),
"Cannot compare segment names from different table: %s, %s",
_segmentName, other.getSegmentName());
+ String thisTopicName = _topicName == null ? "" : _topicName;
+ String otherTopicName = other._topicName == null ? "" : other._topicName;
+ if (!thisTopicName.equals(otherTopicName)) {
+ return StringUtils.compare(_topicName, other._topicName);
+ }
Review Comment:
If we do not compare it, `table__topicA__1__xxTxxZ` might be treated as equal to
`table__1__xxTxxZ`.
Do you mean we should treat the topic name as part of the table name, as above?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get
the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the
partition group consumption status.
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
+ .filter(partitionGroupConsumptionStatus ->
_streamConfigs.get(index).isEphemeralBackfillTopic()
+ ?
_streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
+ :
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ partitionGroupConsumptionStatus.getPartitionGroupId())
== finalPermanentTopicIndex)
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
+ // Similarly, for ephemeral backfill topics, we create the partition
group metadata with the topic name.
_newPartitionGroupMetadataList.addAll(
streamMetadataProvider.computePartitionGroupMetadata(clientId,
- streamConfig, topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000,
+ _streamConfigs.get(i),
topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream)
.stream()
.map(metadata -> new PartitionGroupMetadata(
-
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
- index), metadata.getStartOffset()))
+
_streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic() ?
_streamConfigs.get(index)
+ .getTopicName() : "",
Review Comment:
Standardized the `_topicName` field across all related components. We now treat
an empty topic name as null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]