This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 782d14d  Add reason of creation to StreamMetadataProvider name. (#6862)
782d14d is described below

commit 782d14dc3fb56256339cf5071d9fbdc41b7e6370
Author: Jiapeng Tao <[email protected]>
AuthorDate: Wed Apr 28 20:52:34 2021 -0700

    Add reason of creation to StreamMetadataProvider name. (#6862)
---
 .../helix/core/PinotTableIdealStateBuilder.java         |  5 +++--
 .../core/realtime/PinotLLCRealtimeSegmentManager.java   | 17 +++++++++++------
 .../realtime/PinotLLCRealtimeSegmentManagerTest.java    |  9 +++++----
 .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 17 +++++++++++++++--
 4 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 73b8e5b..344ade4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -149,11 +149,12 @@ public class PinotTableIdealStateBuilder {
    * @param partitionGroupConsumptionStatusList List of {@link 
PartitionGroupConsumptionStatus} for the current partition groups.
    *                                          The size of this list is equal 
to the number of partition groups,
    *                                          and is created using the latest 
segment zk metadata.
+   * @param reason the reason to get partition group metadata
    */
   public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, String reason) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
+        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList, reason);
     try {
       
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2e1dcc3..b8eca7c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -257,7 +258,8 @@ public class PinotLLCRealtimeSegmentManager {
     PartitionLevelStreamConfig streamConfig =
         new PartitionLevelStreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList = 
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, 
Collections.emptyList(), 
PartitionGroupMetadataFetcher.Reason.TABLE_CREATION.name());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
@@ -502,7 +504,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Fetches new partition groups, given current list of {@link 
PartitionGroupConsumptionStatus}.
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
+        getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList,
+            PartitionGroupMetadataFetcher.Reason.SEGMENT_COMMITMENT.name() + 
"-" + committingSegmentPartitionGroupId);
     Set<Integer> newPartitionGroupSet =
         
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
@@ -705,9 +708,9 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
-      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, String reason) {
     return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
-        currentPartitionGroupConsumptionStatusList);
+        currentPartitionGroupConsumptionStatusList, reason);
   }
 
   /**
@@ -813,7 +816,8 @@ public class PinotLLCRealtimeSegmentManager {
         List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
             getPartitionGroupConsumptionStatusList(idealState, streamConfig);
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
+            getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList,
+                
PartitionGroupMetadataFetcher.Reason.PERIODIC_SEGMENT_VALIDATION.name());
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList);
 
       } else {
@@ -1134,7 +1138,8 @@ public class PinotLLCRealtimeSegmentManager {
     StreamConfig smallestOffsetCriteriaStreamConfig =
         new StreamConfig(streamConfig.getTableNameWithType(), 
streamConfigMapWithSmallestOffsetCriteria);
     List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
-        getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, 
Collections.emptyList());
+        getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, 
Collections.emptyList(),
+            
PartitionGroupMetadataFetcher.Reason.PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER.name()
 + "-" + partitionGroupId);
     StreamPartitionMsgOffset partitionStartOffset = null;
     for (PartitionGroupMetadata info : 
smallestOffsetCriteriaPartitionGroupMetadata) {
       if (info.getPartitionGroupId() == partitionGroupId) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index c3d58da..3db14e7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -254,7 +254,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // committing segment's partitionGroupId no longer in the 
newPartitionGroupMetadataList
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList(), "TEST_TABLE_CREATION");
     partitionGroupMetadataListWithout0.remove(0);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout0;
 
@@ -565,7 +565,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
      */
     // 1 reached end of shard.
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, 
Collections.emptyList(),
+            "TEST_TABLE_CREATION");
     partitionGroupMetadataListWithout1.remove(1);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout1;
     // noop
@@ -962,7 +963,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, 
Collections.emptyList()));
+          getNewPartitionGroupMetadataList(_streamConfig, 
Collections.emptyList(), "TEST_PERIODIC_SEGMENT_VALIDATION"));
     }
 
     @Override
@@ -1029,7 +1030,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
+        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, String reason) {
       if (_partitionGroupMetadataList != null) {
         return _partitionGroupMetadataList;
       } else {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 6cc74ce..8439462 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import java.util.List;
 import java.util.concurrent.Callable;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,18 +33,27 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
+  public enum Reason {
+    TABLE_CREATION, SEGMENT_COMMITMENT, PERIODIC_SEGMENT_VALIDATION, 
PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER
+  }
+
   private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
   private final StreamConfig _streamConfig;
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
   private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
   private final String _topicName;
+  private final String _reason;
 
-  public PartitionGroupMetadataFetcher(StreamConfig streamConfig, 
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
+  public PartitionGroupMetadataFetcher(
+      StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList,
+      String reason) {
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     _topicName = streamConfig.getTopicName();
     _streamConfig = streamConfig;
     _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+    _reason = reason;
   }
 
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -61,7 +71,10 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
   @Override
   public Boolean call()
       throws Exception {
-    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + 
"-" + _topicName;
+    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName()
+        + "-" + _topicName
+        + "-" + 
TableNameBuilder.extractRawTableName(_streamConfig.getTableNameWithType())
+        + "-" + _reason;
     try (
         StreamMetadataProvider streamMetadataProvider = 
_streamConsumerFactory.createStreamMetadataProvider(clientId)) {
       _newPartitionGroupMetadataList = streamMetadataProvider

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to