This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 782d14d Add reason of creation to StreamMetadataProvider name. (#6862)
782d14d is described below
commit 782d14dc3fb56256339cf5071d9fbdc41b7e6370
Author: Jiapeng Tao <[email protected]>
AuthorDate: Wed Apr 28 20:52:34 2021 -0700
Add reason of creation to StreamMetadataProvider name. (#6862)
---
.../helix/core/PinotTableIdealStateBuilder.java | 5 +++--
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 17 +++++++++++------
.../realtime/PinotLLCRealtimeSegmentManagerTest.java | 9 +++++----
.../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 17 +++++++++++++++--
4 files changed, 34 insertions(+), 14 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 73b8e5b..344ade4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -149,11 +149,12 @@ public class PinotTableIdealStateBuilder {
* @param partitionGroupConsumptionStatusList List of {@link
PartitionGroupConsumptionStatus} for the current partition groups.
* The size of this list is equal
to the number of partition groups,
* and is created using the latest
segment zk metadata.
+ * @param reason the reason to get partition group metadata
*/
public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(StreamConfig streamConfig,
- List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList) {
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, String reason) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
- new PartitionGroupMetadataFetcher(streamConfig,
partitionGroupConsumptionStatusList);
+ new PartitionGroupMetadataFetcher(streamConfig,
partitionGroupConsumptionStatusList, reason);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2e1dcc3..b8eca7c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -257,7 +258,8 @@ public class PinotLLCRealtimeSegmentManager {
PartitionLevelStreamConfig streamConfig =
new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
Collections.emptyList(),
PartitionGroupMetadataFetcher.Reason.TABLE_CREATION.name());
int numPartitionGroups = newPartitionGroupMetadataList.size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -502,7 +504,8 @@ public class PinotLLCRealtimeSegmentManager {
// Fetches new partition groups, given current list of {@link
PartitionGroupConsumptionStatus}.
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList,
+ PartitionGroupMetadataFetcher.Reason.SEGMENT_COMMITMENT.name() +
"-" + committingSegmentPartitionGroupId);
Set<Integer> newPartitionGroupSet =
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
int numPartitionGroups = newPartitionGroupMetadataList.size();
@@ -705,9 +708,9 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, String reason) {
return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
- currentPartitionGroupConsumptionStatusList);
+ currentPartitionGroupConsumptionStatusList, reason);
}
/**
@@ -813,7 +816,8 @@ public class PinotLLCRealtimeSegmentManager {
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList,
+
PartitionGroupMetadataFetcher.Reason.PERIODIC_SEGMENT_VALIDATION.name());
return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupMetadataList);
} else {
@@ -1134,7 +1138,8 @@ public class PinotLLCRealtimeSegmentManager {
StreamConfig smallestOffsetCriteriaStreamConfig =
new StreamConfig(streamConfig.getTableNameWithType(),
streamConfigMapWithSmallestOffsetCriteria);
List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
- getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig,
Collections.emptyList());
+ getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig,
Collections.emptyList(),
+
PartitionGroupMetadataFetcher.Reason.PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER.name()
+ "-" + partitionGroupId);
StreamPartitionMsgOffset partitionStartOffset = null;
for (PartitionGroupMetadata info :
smallestOffsetCriteriaPartitionGroupMetadata) {
if (info.getPartitionGroupId() == partitionGroupId) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index c3d58da..3db14e7 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -254,7 +254,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// committing segment's partitionGroupId no longer in the
newPartitionGroupMetadataList
List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList());
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList(), "TEST_TABLE_CREATION");
partitionGroupMetadataListWithout0.remove(0);
segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout0;
@@ -565,7 +565,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
*/
// 1 reached end of shard.
List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList());
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList(),
+ "TEST_TABLE_CREATION");
partitionGroupMetadataListWithout1.remove(1);
segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
// noop
@@ -962,7 +963,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
- getNewPartitionGroupMetadataList(_streamConfig,
Collections.emptyList()));
+ getNewPartitionGroupMetadataList(_streamConfig,
Collections.emptyList(), "TEST_PERIODIC_SEGMENT_VALIDATION"));
}
@Override
@@ -1029,7 +1030,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, String reason) {
if (_partitionGroupMetadataList != null) {
return _partitionGroupMetadataList;
} else {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 6cc74ce..8439462 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,18 +33,27 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
+ public enum Reason {
+ TABLE_CREATION, SEGMENT_COMMITMENT, PERIODIC_SEGMENT_VALIDATION,
PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER
+ }
+
private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
private final StreamConfig _streamConfig;
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
private final StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
private final String _topicName;
+ private final String _reason;
- public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
+ public PartitionGroupMetadataFetcher(
+ StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList,
+ String reason) {
_streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
_topicName = streamConfig.getTopicName();
_streamConfig = streamConfig;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+ _reason = reason;
}
public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -61,7 +71,10 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
@Override
public Boolean call()
throws Exception {
- String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() +
"-" + _topicName;
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName()
+ + "-" + _topicName
+ + "-" +
TableNameBuilder.extractRawTableName(_streamConfig.getTableNameWithType())
+ + "-" + _reason;
try (
StreamMetadataProvider streamMetadataProvider =
_streamConsumerFactory.createStreamMetadataProvider(clientId)) {
_newPartitionGroupMetadataList = streamMetadataProvider
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]