Jackie-Jiang commented on code in PR #16494:
URL: https://github.com/apache/pinot/pull/16494#discussion_r2292310357


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements 
Comparable<LLCSegmentName> {
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
+  @Nullable
+  private final String _topicName;
 
   public LLCSegmentName(String segmentName) {
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
-    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
+    // Validate the segment name format should have 4 or 5 parts:
+    // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+    // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+    Preconditions.checkArgument(
+        parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
-    _partitionGroupId = Integer.parseInt(parts[1]);
-    _sequenceNumber = Integer.parseInt(parts[2]);
-    _creationTime = parts[3];
+    if (parts.length == 4) {
+      _topicName = null;
+      _partitionGroupId = Integer.parseInt(parts[1]);
+      _sequenceNumber = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+    } else {
+      _topicName = parts[1];
+      _partitionGroupId = Integer.parseInt(parts[2]);
+      _sequenceNumber = Integer.parseInt(parts[3]);
+      _creationTime = parts[4];
+    }
     _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
+    this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+  }
+
+  public LLCSegmentName(
+      String tableName, @Nullable String topicName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
     Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table 
name: %s", tableName);
+    Preconditions.checkArgument(topicName == null || 
!topicName.contains(SEPARATOR),

Review Comment:
   Consider also checking that it is not an empty string.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements 
Comparable<LLCSegmentName> {
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
+  @Nullable
+  private final String _topicName;
 
   public LLCSegmentName(String segmentName) {
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
-    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
+    // Validate the segment name format should have 4 or 5 parts:
+    // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+    // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+    Preconditions.checkArgument(
+        parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
-    _partitionGroupId = Integer.parseInt(parts[1]);
-    _sequenceNumber = Integer.parseInt(parts[2]);
-    _creationTime = parts[3];
+    if (parts.length == 4) {
+      _topicName = null;
+      _partitionGroupId = Integer.parseInt(parts[1]);
+      _sequenceNumber = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+    } else {
+      _topicName = parts[1];
+      _partitionGroupId = Integer.parseInt(parts[2]);
+      _sequenceNumber = Integer.parseInt(parts[3]);
+      _creationTime = parts[4];
+    }
     _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
+    this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+  }
+
+  public LLCSegmentName(
+      String tableName, @Nullable String topicName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
     Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table 
name: %s", tableName);
+    Preconditions.checkArgument(topicName == null || 
!topicName.contains(SEPARATOR),
+        "Illegal topic name: %s", tableName);

Review Comment:
   ```suggestion
           "Illegal topic name: %s", topicName);
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements 
Comparable<LLCSegmentName> {
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
+  @Nullable
+  private final String _topicName;
 
   public LLCSegmentName(String segmentName) {
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
-    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
+    // Validate the segment name format should have 4 or 5 parts:
+    // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+    // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+    Preconditions.checkArgument(
+        parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
-    _partitionGroupId = Integer.parseInt(parts[1]);
-    _sequenceNumber = Integer.parseInt(parts[2]);
-    _creationTime = parts[3];
+    if (parts.length == 4) {
+      _topicName = null;
+      _partitionGroupId = Integer.parseInt(parts[1]);
+      _sequenceNumber = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+    } else {
+      _topicName = parts[1];
+      _partitionGroupId = Integer.parseInt(parts[2]);
+      _sequenceNumber = Integer.parseInt(parts[3]);
+      _creationTime = parts[4];
+    }
     _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
+    this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+  }
+
+  public LLCSegmentName(
+      String tableName, @Nullable String topicName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
     Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table 
name: %s", tableName);
+    Preconditions.checkArgument(topicName == null || 
!topicName.contains(SEPARATOR),
+        "Illegal topic name: %s", tableName);
     _tableName = tableName;
+    _topicName = topicName;
     _partitionGroupId = partitionGroupId;
     _sequenceNumber = sequenceNumber;
     // ISO8601 date: 20160120T1234Z
     _creationTime = DATE_FORMATTER.print(msSinceEpoch);
-    _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + 
sequenceNumber + SEPARATOR + _creationTime;
+    if (topicName == null) {
+      _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + 
sequenceNumber + SEPARATOR + _creationTime;
+    } else {
+      _segmentName =
+          tableName + SEPARATOR + topicName + SEPARATOR + partitionGroupId + 
SEPARATOR + sequenceNumber + SEPARATOR
+              + _creationTime;
+    }
+  }
+
+  private LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, String creationTime,

Review Comment:
   Remove this constructor



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements 
Comparable<LLCSegmentName> {
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
+  @Nullable
+  private final String _topicName;
 
   public LLCSegmentName(String segmentName) {
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
-    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
+    // Validate the segment name format should have 4 or 5 parts:
+    // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+    // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+    Preconditions.checkArgument(
+        parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
-    _partitionGroupId = Integer.parseInt(parts[1]);
-    _sequenceNumber = Integer.parseInt(parts[2]);
-    _creationTime = parts[3];
+    if (parts.length == 4) {
+      _topicName = null;
+      _partitionGroupId = Integer.parseInt(parts[1]);
+      _sequenceNumber = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+    } else {
+      _topicName = parts[1];
+      _partitionGroupId = Integer.parseInt(parts[2]);
+      _sequenceNumber = Integer.parseInt(parts[3]);
+      _creationTime = parts[4];
+    }
     _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
+    this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+  }
+
+  public LLCSegmentName(

Review Comment:
   (minor, format) The changes don't follow [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide). Can you set it up and reformat the changes?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -76,13 +118,7 @@ public static LLCSegmentName of(String segmentName) {
    * Returns whether the given segment name represents an LLC segment.
    */
   public static boolean isLLCSegment(String segmentName) {
-    int numSeparators = 0;
-    int index = 0;
-    while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
-      numSeparators++;
-      index += 2; // SEPARATOR.length()
-    }
-    return numSeparators == 3;
+    return of(segmentName) != null;

Review Comment:
   Let's only change the last check to `numSeparators == 3 || numSeparators == 4`, as it is more efficient.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2776,54 +2842,4 @@ public List<String> getCommittingSegments(String 
realtimeTableName) {
   private List<String> getCommittingSegments(String realtimeTableName, 
Collection<String> segmentsToCheck) {
     return getCommittingSegments(realtimeTableName, segmentsToCheck, 
_helixResourceManager::getSegmentZKMetadata);
   }
-
-  public static List<String> getCommittingSegments(String realtimeTableName,

Review Comment:
   Several methods/comments were moved. What's the reason? These methods seem to fit better in their original positions.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -127,6 +181,11 @@ public String getSegmentName() {
   public int compareTo(LLCSegmentName other) {
     Preconditions.checkArgument(_tableName.equals(other._tableName),
         "Cannot compare segment names from different table: %s, %s", 
_segmentName, other.getSegmentName());
+    String thisTopicName = _topicName == null ? "" : _topicName;
+    String otherTopicName = other._topicName == null ? "" : other._topicName;
+    if (!thisTopicName.equals(otherTopicName)) {
+      return StringUtils.compare(_topicName, other._topicName);
+    }

Review Comment:
   Do you want to compare the topic name here? What is the side effect of comparing it?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -65,6 +103,10 @@ public LLCSegmentName(String tableName, int 
partitionGroupId, int sequenceNumber
    */
   @Nullable
   public static LLCSegmentName of(String segmentName) {
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+    if (parts.length < 4 || parts.length > 5) {
+      return null;
+    }

Review Comment:
   Let's not modify this. It adds overhead.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
               + topicName;
       StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
       int index = i;
+      int finalPermanentTopicIndex = permanentTopicIndex;
+      // For permanent topics, we use the index of the stream config to get 
the partition group consumption status.
+      // For ephemeral backfill topics, we use the topic name to filter the 
partition group consumption status.
       List<PartitionGroupConsumptionStatus> 
topicPartitionGroupConsumptionStatusList =
           _partitionGroupConsumptionStatusList.stream()
-              .filter(partitionGroupConsumptionStatus -> 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
-                  partitionGroupConsumptionStatus.getPartitionGroupId()) == 
index)
+              .filter(partitionGroupConsumptionStatus -> 
_streamConfigs.get(index).isEphemeralBackfillTopic()

Review Comment:
   Can we move this check to `PinotLLCRealtimeSegmentManager.setupNewPartitionGroup()`? Currently it is deeply nested, which makes it very hard to follow (I missed it several times before finally finding this rewrite).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1833,29 +1896,31 @@ private StreamPartitionMsgOffset 
selectStartOffset(OffsetCriteria offsetCriteria
   }
 
   private LLCSegmentName getNextLLCSegmentName(LLCSegmentName 
lastLLCSegmentName, long creationTimeMs) {
-    return new LLCSegmentName(lastLLCSegmentName.getTableName(), 
lastLLCSegmentName.getPartitionGroupId(),
-        lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
+    return new LLCSegmentName(lastLLCSegmentName.getTableName(), 
lastLLCSegmentName.getTopicName(),
+        lastLLCSegmentName.getPartitionGroupId(), 
lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
   }
 
   /**
    * Sets up a new partition group.
    * <p>Persists the ZK metadata for the first CONSUMING segment, and returns 
the segment name.
    */
-  private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig 
streamConfig,
+  private String setupNewPartitionGroup(TableConfig tableConfig, 
List<StreamConfig> streamConfigs,

Review Comment:
   I don't follow this change. What does it mean to set up a new partition group on multiple `StreamConfig`s? Which `StreamConfig` should we reference when setting it up?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
               + topicName;
       StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
       int index = i;
+      int finalPermanentTopicIndex = permanentTopicIndex;
+      // For permanent topics, we use the index of the stream config to get 
the partition group consumption status.
+      // For ephemeral backfill topics, we use the topic name to filter the 
partition group consumption status.
       List<PartitionGroupConsumptionStatus> 
topicPartitionGroupConsumptionStatusList =
           _partitionGroupConsumptionStatusList.stream()
-              .filter(partitionGroupConsumptionStatus -> 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
-                  partitionGroupConsumptionStatus.getPartitionGroupId()) == 
index)
+              .filter(partitionGroupConsumptionStatus -> 
_streamConfigs.get(index).isEphemeralBackfillTopic()
+                  ? 
_streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
+                  : 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+                      partitionGroupConsumptionStatus.getPartitionGroupId()) 
== finalPermanentTopicIndex)
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
           StreamConsumerFactory.getUniqueClientId(clientId))) {
+        // Similarly, for ephemeral backfill topics, we create the partition 
group metadata with the topic name.
         _newPartitionGroupMetadataList.addAll(
             streamMetadataProvider.computePartitionGroupMetadata(clientId,
-                    streamConfig, topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000,
+                    _streamConfigs.get(i), 
topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
                     _forceGetOffsetFromStream)
                 .stream()
                 .map(metadata -> new PartitionGroupMetadata(
-                    
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
-                        index), metadata.getStartOffset()))
+                    
_streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic() ? 
_streamConfigs.get(index)
+                        .getTopicName() : "",

Review Comment:
   Is this a bug? Will it break if we use `""` as the topic name? Is there a 
test for this?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
               + topicName;
       StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
       int index = i;
+      int finalPermanentTopicIndex = permanentTopicIndex;
+      // For permanent topics, we use the index of the stream config to get 
the partition group consumption status.
+      // For ephemeral backfill topics, we use the topic name to filter the 
partition group consumption status.
       List<PartitionGroupConsumptionStatus> 
topicPartitionGroupConsumptionStatusList =
           _partitionGroupConsumptionStatusList.stream()
-              .filter(partitionGroupConsumptionStatus -> 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
-                  partitionGroupConsumptionStatus.getPartitionGroupId()) == 
index)
+              .filter(partitionGroupConsumptionStatus -> 
_streamConfigs.get(index).isEphemeralBackfillTopic()
+                  ? 
_streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())

Review Comment:
   Do we need this check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to