This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c44fd796a5b [Auto reset 2/3]Introduce topic 'inactive' status (#16692)
c44fd796a5b is described below
commit c44fd796a5b1ec3a509c791fa3340af794c9d43f
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Aug 29 15:48:37 2025 -0700
[Auto reset 2/3]Introduce topic 'inactive' status (#16692)
* Introduce topic 'inactive' status
* Expose topic pause through the API and add unit tests (UT) for
PartitionGroupMetadataFetcher
* Fix style
* Fix idealstate update
* Change variable naming
---
.../api/resources/PinotRealtimeTableResource.java | 68 +++++++
.../controller/helix/SegmentStatusChecker.java | 2 +-
.../helix/core/PinotTableIdealStateBuilder.java | 6 +-
.../realtime/MissingConsumingSegmentFinder.java | 8 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 126 ++++++++++--
.../PinotLLCRealtimeSegmentManagerTest.java | 25 +--
.../apache/pinot/spi/config/table/PauseState.java | 16 +-
.../spi/stream/PartitionGroupMetadataFetcher.java | 10 +-
.../stream/PartitionGroupMetadataFetcherTest.java | 216 +++++++++++++++++++++
9 files changed, 438 insertions(+), 39 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index b442b883d99..bd262a9f995 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -29,12 +29,15 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -123,6 +126,38 @@ public class PinotRealtimeTableResource {
}
}
+ @POST
+ @Path("/tables/{tableName}/pauseTopicConsumption")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.PAUSE_CONSUMPTION)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Pause consumption of some topics of a realtime
table", notes = "Pause the consumption of "
+ + "some topics of a realtime table.")
+ public Response pauseTopicConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "Comma separated list of index of the topics",
required = true) @QueryParam("topicIndices")
+ String topicIndices,
+ @Context HttpHeaders headers) {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validateTable(tableNameWithType);
+ List<Integer> topicIndexList;
+ try {
+ topicIndexList = Arrays.stream(topicIndices.split(","))
+ .map(String::trim)
+ .map(idx -> Integer.parseInt(idx))
+ .collect(Collectors.toList());
+ } catch (NumberFormatException nfe) {
+ throw new ControllerApplicationException(LOGGER, "topicIndices should be
a comma separated list of integers",
+ Response.Status.BAD_REQUEST, nfe);
+ }
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.pauseTopicsConsumption(tableNameWithType,
topicIndexList))
+ .build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.RESUME_CONSUMPTION)
@@ -160,6 +195,39 @@ public class PinotRealtimeTableResource {
}
}
+ @POST
+ @Path("/tables/{tableName}/resumeTopicConsumption")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.RESUME_CONSUMPTION)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Resume consumption of some topics of a realtime
table", notes =
+ "Resume the consumption for some topics of a realtime table. There are
two independent pause mechanism, "
+ + "table pause and topic pause. The topics is resumed only if both
table and topics are resumed.")
+ public Response resumeTopicConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "Comma separated list of index of the topics",
required = true) @QueryParam("topicIndices")
+ String topicIndices,
+ @Context HttpHeaders headers) {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validateTable(tableNameWithType);
+ List<Integer> topicIndexList;
+ try {
+ topicIndexList = Arrays.stream(topicIndices.split(","))
+ .map(String::trim)
+ .map(idx -> Integer.parseInt(idx))
+ .collect(Collectors.toList());
+ } catch (NumberFormatException nfe) {
+ throw new ControllerApplicationException(LOGGER, "topicIndices should be
a comma separated list of integers",
+ Response.Status.BAD_REQUEST, nfe);
+ }
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.resumeTopicsConsumption(
+ tableNameWithType, topicIndexList)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
@POST
@Path("/tables/{tableName}/forceCommit")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.FORCE_COMMIT)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 553dada84f8..9a09761b815 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -479,7 +479,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
if (tableType == TableType.REALTIME && tableConfig != null) {
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
new MissingConsumingSegmentFinder(tableNameWithType, propertyStore,
_controllerMetrics,
- streamConfigs).findAndEmitMetrics(idealState);
+ streamConfigs, idealState).findAndEmitMetrics(idealState);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8bc9ea442fb..6ec48830ca7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -87,12 +87,14 @@ public class PinotTableIdealStateBuilder {
* partition groups.
* The size of this list is equal
to the number of partition groups,
* and is created using the latest
segment zk metadata.
+ * @param pausedTopicIndices List of inactive topic indices. Index is the
index of the topic in the streamConfigMaps.
* @param forceGetOffsetFromStream - details in
PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
*/
public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
+ boolean forceGetOffsetFromStream) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new
PartitionGroupMetadataFetcher(
- streamConfigs, partitionGroupConsumptionStatusList,
forceGetOffsetFromStream);
+ streamConfigs, partitionGroupConsumptionStatusList,
pausedTopicIndices, forceGetOffsetFromStream);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index efc43246b7e..99bee6f8a7f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
+import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -67,7 +69,7 @@ public class MissingConsumingSegmentFinder {
private ControllerMetrics _controllerMetrics;
public MissingConsumingSegmentFinder(String realtimeTableName,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
+ ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs,
IdealState idealState) {
_realtimeTableName = realtimeTableName;
_controllerMetrics = controllerMetrics;
_segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore,
controllerMetrics);
@@ -81,7 +83,9 @@ public class MissingConsumingSegmentFinder {
return streamConfig;
});
try {
- PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(), false)
+ PauseState pauseState =
PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
+ PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(),
+ pauseState == null ? new ArrayList<>() :
pauseState.getIndexOfInactiveTopics(), false)
.forEach(metadata -> {
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
});
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 0d135dd37e3..549cd2c2507 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -377,7 +378,7 @@ public class PinotLLCRealtimeSegmentManager {
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList());
+ getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(), idealState);
int numPartitionGroups = newPartitionGroupMetadataList.size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -781,7 +782,7 @@ public class PinotLLCRealtimeSegmentManager {
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
String newConsumingSegmentName = null;
- if (!isTablePaused(idealState)) {
+ if (!isTablePaused(idealState) && !isTopicPaused(idealState,
committingSegmentName)) {
LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
int committingSegmentPartitionGroupId =
committingLLCSegment.getPartitionGroupId();
@@ -932,8 +933,7 @@ public class PinotLLCRealtimeSegmentManager {
// Handle offset auto reset
String nextOffset = committingSegmentDescriptor.getNextOffset();
- String startOffset = computeStartOffset(
- nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ String startOffset = computeStartOffset(nextOffset, streamConfig,
newLLCSegmentName.getPartitionGroupId());
LOGGER.info(
"Creating segment ZK metadata for new CONSUMING segment: {} with start
offset: {} and creation time: {}",
@@ -972,8 +972,9 @@ public class PinotLLCRealtimeSegmentManager {
int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
if (timeThreshold <= 0 && offsetThreshold <= 0) {
LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
- + "timeThreshold: {}, offsetThreshold: {}",
- streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ + "timeThreshold: {}, offsetThreshold: {}",
streamConfig.getTableNameWithType(),
+ streamConfig.getTopicName(),
+ timeThreshold, offsetThreshold);
return nextOffset;
}
String clientId = getTableTopicUniqueClientId(streamConfig);
@@ -990,8 +991,8 @@ public class PinotLLCRealtimeSegmentManager {
// (CurrentTime - SLA)'s offset > nextOffset.
// TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
// get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
- latestOffset = metadataProvider.fetchStreamPartitionOffset(
- OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+ latestOffset =
+
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
STREAM_FETCH_TIMEOUT_MS);
LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
latestOffset);
if (timeThreshold > 0) {
@@ -1140,7 +1141,7 @@ public class PinotLLCRealtimeSegmentManager {
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
partitionIds.addAll(newPartitionGroupMetadataList.stream()
.map(PartitionGroupMetadata::getPartitionGroupId)
.collect(Collectors.toSet()));
@@ -1155,9 +1156,9 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
- return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
- currentPartitionGroupConsumptionStatusList, false);
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
+ return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState,
+ false);
}
/**
@@ -1167,10 +1168,12 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList,
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
+ PauseState pauseState = extractTablePauseState(idealState);
return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
- currentPartitionGroupConsumptionStatusList, forceGetOffsetFromStream);
+ currentPartitionGroupConsumptionStatusList,
+ pauseState == null ? new ArrayList<>() :
pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream);
}
/**
@@ -1343,7 +1346,7 @@ public class PinotLLCRealtimeSegmentManager {
.forEach(streamConfig -> streamConfig.setOffsetCriteria(
offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
streamConfigs.stream().forEach(streamConfig ->
streamConfig.setOffsetCriteria(originalOffsetCriteria));
return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, newPartitionGroupMetadataList,
offsetCriteria);
@@ -1383,7 +1386,8 @@ public class PinotLLCRealtimeSegmentManager {
"Exceeded max segment completion time for segment " +
committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
- isTablePaused(idealState) ? null : newSegmentName,
segmentAssignment, instancePartitionsMap);
+ isTablePaused(idealState) || isTopicPaused(idealState,
committingSegmentName) ? null : newSegmentName,
+ segmentAssignment, instancePartitionsMap);
return idealState;
}, DEFAULT_RETRY_POLICY);
}
@@ -1399,7 +1403,24 @@ public class PinotLLCRealtimeSegmentManager {
return
Boolean.parseBoolean(idealState.getRecord().getSimpleField(IS_TABLE_PAUSED));
}
- private static PauseState extractTablePauseState(IdealState idealState) {
+ public static boolean isTopicPaused(IdealState idealState, int topicIndex) {
+ PauseState pauseState = extractTablePauseState(idealState);
+ if (pauseState != null) {
+ return pauseState.getIndexOfInactiveTopics().contains(topicIndex);
+ }
+ return false;
+ }
+
+ public static boolean isTopicPaused(IdealState idealState, String
segmentName) {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName != null) {
+ return isTopicPaused(idealState,
+
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId()));
+ }
+ return false;
+ }
+
+ public static PauseState extractTablePauseState(IdealState idealState) {
String pauseStateStr =
idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE);
try {
if (pauseStateStr != null) {
@@ -1800,8 +1821,8 @@ public class PinotLLCRealtimeSegmentManager {
// Temporarily, we are passing a boolean flag to indicate if we want to
use the current status
// The kafka implementation of computePartitionGroupMetadata() will
ignore the current status
// while the kinesis implementation will use it.
- List<PartitionGroupMetadata> partitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, true);
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
getNewPartitionGroupMetadataList(
+ streamConfigs, currentPartitionGroupConsumptionStatusList,
idealState, true);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
@@ -2406,8 +2427,12 @@ public class PinotLLCRealtimeSegmentManager {
public IdealState updatePauseStateInIdealState(String tableNameWithType,
boolean pause,
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState =
- new PauseState(pause, reasonCode, comment, new
Timestamp(System.currentTimeMillis()).toString());
+ new PauseState(pause, reasonCode, comment, new
Timestamp(System.currentTimeMillis()).toString(), null);
IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager,
tableNameWithType, idealState -> {
+ PauseState previousPauseState = extractTablePauseState(idealState);
+ if (previousPauseState != null) {
+
pauseState.setIndexOfInactiveTopics(previousPauseState.getIndexOfInactiveTopics());
+ }
ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
// maintain for backward compatibility
@@ -2419,6 +2444,48 @@ public class PinotLLCRealtimeSegmentManager {
return updatedIdealState;
}
+ public PauseState pauseTopicsConsumption(String tableNameWithType,
List<Integer> indexOfPausedTopics) {
+ IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager,
tableNameWithType, idealState -> {
+ PauseState pauseState = extractTablePauseState(idealState);
+ if (pauseState == null) {
+ pauseState = new PauseState(false,
PauseState.ReasonCode.ADMINISTRATIVE, null,
+ new Timestamp(System.currentTimeMillis()).toString(),
indexOfPausedTopics);
+ } else {
+ // Union the existing paused topics with the newly paused topics
+ pauseState.setIndexOfInactiveTopics(
+ Stream.concat(pauseState.getIndexOfInactiveTopics().stream(),
indexOfPausedTopics.stream())
+ .distinct()
+ .collect(Collectors.toList()));
+ }
+ ZNRecord znRecord = idealState.getRecord();
+ znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+ LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " +
"to pause topics with indices {}.",
+ pauseState, tableNameWithType, indexOfPausedTopics);
+ return new IdealState(znRecord);
+ }, RetryPolicies.noDelayRetryPolicy(3));
+ Set<String> consumingSegments =
findConsumingSegmentsOfTopics(updatedIdealState, indexOfPausedTopics);
+ sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ return extractTablePauseState(updatedIdealState);
+ }
+
+ public PauseState resumeTopicsConsumption(String tableNameWithType,
List<Integer> indexOfPausedTopics) {
+ IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager,
tableNameWithType, idealState -> {
+ PauseState pauseState = extractTablePauseState(idealState);
+ if (pauseState == null) {
+ return idealState;
+ }
+ pauseState.getIndexOfInactiveTopics().removeAll(indexOfPausedTopics);
+ ZNRecord znRecord = idealState.getRecord();
+ znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+ LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " +
"to resume topics with indices {}.",
+ pauseState, tableNameWithType, indexOfPausedTopics);
+ return new IdealState(znRecord);
+ }, RetryPolicies.noDelayRetryPolicy(3));
+ _helixResourceManager.invokeControllerPeriodicTask(tableNameWithType,
Constants.REALTIME_SEGMENT_VALIDATION_MANAGER,
+ new HashMap<>());
+ return extractTablePauseState(updatedIdealState);
+ }
+
private void sendForceCommitMessageToServers(String tableNameWithType,
Set<String> consumingSegments) {
if (!consumingSegments.isEmpty()) {
LOGGER.info("Sending force commit messages for segments: {} of table:
{}", consumingSegments, tableNameWithType);
@@ -2446,6 +2513,24 @@ public class PinotLLCRealtimeSegmentManager {
return consumingSegments;
}
+ private Set<String> findConsumingSegmentsOfTopics(IdealState idealState,
List<Integer> topicIndices) {
+ Set<String> consumingSegments = new TreeSet<>();
+ idealState.getRecord().getMapFields().forEach((segmentName,
instanceToStateMap) -> {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName != null && topicIndices.contains(
+
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId())))
{
+ return;
+ }
+ for (String state : instanceToStateMap.values()) {
+ if (state.equals(SegmentStateModel.CONSUMING)) {
+ consumingSegments.add(segmentName);
+ break;
+ }
+ }
+ });
+ return consumingSegments;
+ }
+
/**
* Return pause status:
* - Information from the 'pauseState' in the table ideal state
@@ -2456,6 +2541,7 @@ public class PinotLLCRealtimeSegmentManager {
Set<String> consumingSegments = findConsumingSegments(idealState);
PauseState pauseState = extractTablePauseState(idealState);
if (pauseState != null) {
+ // TODO: add paused topics information
return new PauseStatusDetails(pauseState.isPaused(), consumingSegments,
pauseState.getReasonCode(),
pauseState.getComment(), pauseState.getTimeInMillis());
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 263484c5d9d..a04fd27ab9c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -131,6 +131,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 +
NUM_DOCS;
static final int SEGMENT_SIZE_IN_BYTES = 100000000;
+
@AfterClass
public void tearDown()
throws IOException {
@@ -289,7 +290,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// committing segment's partitionGroupId no longer in the
newPartitionGroupMetadataList
List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList());
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
+ mock(IdealState.class));
partitionGroupMetadataListWithout0.remove(0);
segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout0;
@@ -743,7 +745,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
*/
// 1 reached end of shard.
List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList());
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
+ mock(IdealState.class));
partitionGroupMetadataListWithout1.remove(1);
segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
// noop
@@ -1463,7 +1466,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(4), null).getDownloadUrl());
}
-
@Test
public void testDeleteTmpSegmentFiles()
throws Exception {
@@ -1529,7 +1531,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
new PartitionGroupMetadata(1, new LongMsgOffset(345)));
doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
- .getNewPartitionGroupMetadataList(streamConfigs,
partitionGroupConsumptionStatusList);
+ .getNewPartitionGroupMetadataList(streamConfigs,
partitionGroupConsumptionStatusList, idealState);
partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs,
idealState);
Assert.assertEquals(partitionIds.size(), 2);
}
@@ -1744,7 +1746,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Test
- public void testSyncCommittingSegments() throws Exception {
+ public void testSyncCommittingSegments()
+ throws Exception {
// Set up mocks for the resource management infrastructure
PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
HelixManager helixManager = mock(HelixManager.class);
@@ -1767,7 +1770,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
String committingSegmentsListPath =
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
-
// Create test segments with different states
String committingSegment1 = "testTable__0__0__20250210T1142Z";
String committingSegment2 = "testTable__0__1__20250210T1142Z";
@@ -1814,16 +1816,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(new
HashSet<>(existingRecord.getListField(COMMITTING_SEGMENTS)),
new HashSet<>(List.of(committingSegment1, committingSegment2)));
-
// Test 3: Error handling during ZooKeeper operations
when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(),
anyInt(), eq(AccessOption.PERSISTENT)))
.thenThrow(new RuntimeException("ZooKeeper operation failed"));
assertFalse(segmentManager.syncCommittingSegments(realtimeTableName,
newSegments));
}
-
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
+
/////////////////////////////////////////////////////////////////////////////////
private static class FakePinotLLCRealtimeSegmentManager extends
PinotLLCRealtimeSegmentManager {
@@ -1889,7 +1890,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
- getNewPartitionGroupMetadataList(_streamConfigs,
Collections.emptyList()), null);
+ getNewPartitionGroupMetadataList(_streamConfigs,
Collections.emptyList(), mock(IdealState.class)), null);
}
@Override
@@ -1971,7 +1972,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
if (_partitionGroupMetadataList != null) {
return _partitionGroupMetadataList;
} else {
@@ -1982,9 +1983,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList,
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
- return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
}
@Override
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 4781bfbdac7..50ae0404854 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.spi.config.table;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pinot.spi.config.BaseJsonConfig;
@@ -26,15 +28,19 @@ public class PauseState extends BaseJsonConfig {
private ReasonCode _reasonCode;
private String _comment;
private String _timestamp;
+ // List of inactive topic indices. Index is the index of the topic in the
streamConfigMaps.
+ private List<Integer> _indexOfInactiveTopics;
public PauseState() {
}
- public PauseState(boolean paused, ReasonCode reasonCode, String comment,
String timestamp) {
+ public PauseState(boolean paused, ReasonCode reasonCode, String comment,
String timestamp,
+ List<Integer> indexOfInactiveTopics) {
_paused = paused;
_reasonCode = reasonCode;
_comment = comment;
_timestamp = timestamp;
+ setIndexOfInactiveTopics(indexOfInactiveTopics);
}
public boolean isPaused() {
@@ -53,6 +59,10 @@ public class PauseState extends BaseJsonConfig {
return _timestamp;
}
+ public List<Integer> getIndexOfInactiveTopics() {
+ return _indexOfInactiveTopics;
+ }
+
public void setPaused(boolean paused) {
_paused = paused;
}
@@ -69,6 +79,10 @@ public class PauseState extends BaseJsonConfig {
_timestamp = timestamp;
}
+ public void setIndexOfInactiveTopics(List<Integer> indexOfInactiveTopics) {
+ _indexOfInactiveTopics = indexOfInactiveTopics == null ? new ArrayList<>()
: indexOfInactiveTopics;
+ }
+
public enum ReasonCode {
ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED, RESOURCE_UTILIZATION_LIMIT_EXCEEDED
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index bf05ea02854..698ad472e1a 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -38,14 +38,17 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
private final boolean _forceGetOffsetFromStream;
private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList =
new ArrayList<>();
+ private final List<Integer> _pausedTopicIndices;
private Exception _exception;
public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
+ boolean forceGetOffsetFromStream) {
_streamConfigs = streamConfigs;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
_forceGetOffsetFromStream = forceGetOffsetFromStream;
+ _pausedTopicIndices = pausedTopicIndices;
}
public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -100,6 +103,11 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
throws Exception {
int numStreams = _streamConfigs.size();
for (int i = 0; i < numStreams; i++) {
+ if (_pausedTopicIndices.contains(i)) {
+ LOGGER.info("Skipping fetching PartitionGroupMetadata for paused topic: {}",
+ _streamConfigs.get(i).getTopicName());
+ continue;
+ }
StreamConfig streamConfig = _streamConfigs.get(i);
String topicName = streamConfig.getTopicName();
String clientId =
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
new file mode 100644
index 00000000000..9fa65254b63
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class PartitionGroupMetadataFetcherTest {
+
+ @Test
+ public void testFetchSingleStreamSuccess()
+ throws Exception {
+ // Setup
+ StreamConfig streamConfig = createMockStreamConfig("test-topic",
"test-table", false);
+ List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+ PartitionGroupConsumptionStatus status =
mock(PartitionGroupConsumptionStatus.class);
+ when(status.getPartitionGroupId()).thenReturn(0);
+ List<PartitionGroupConsumptionStatus> statusList =
Collections.singletonList(status);
+
+ PartitionGroupMetadata metadata = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ List<PartitionGroupMetadata> metadataList =
Collections.singletonList(metadata);
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean())).thenReturn(metadataList);
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Collections.emptyList(), false);
+
+ // Execute
+ Boolean result = fetcher.call();
+
+ // Verify
+ Assert.assertTrue(result);
+ Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 1);
+ Assert.assertNull(fetcher.getException());
+ }
+ }
+
+ @Test
+ public void testFetchSingleStreamTransientException()
+ throws Exception {
+ // Setup
+ StreamConfig streamConfig = createMockStreamConfig("test-topic",
"test-table", false);
+ List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+ List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenThrow(new TransientConsumerException(new
RuntimeException("Transient error")));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Collections.emptyList(), false);
+
+ // Execute
+ Boolean result = fetcher.call();
+
+ // Verify
+ Assert.assertFalse(result);
+ Assert.assertTrue(fetcher.getException() instanceof
TransientConsumerException);
+ }
+ }
+
+ @Test
+ public void testFetchMultipleStreams()
+ throws Exception {
+ // Setup
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ PartitionGroupConsumptionStatus status1 = new
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+ PartitionGroupConsumptionStatus status2 = new
PartitionGroupConsumptionStatus(1, 1, null, null, "IN_PROGRESS");
+ List<PartitionGroupConsumptionStatus> statusList = Arrays.asList(status1,
status2);
+
+ PartitionGroupMetadata mockedMetadata1 = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class));
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Collections.emptyList(), false);
+
+ // Execute
+ Boolean result = fetcher.call();
+
+ // Verify
+ Assert.assertTrue(result);
+ Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+ Assert.assertNull(fetcher.getException());
+
+ // Verify the correct partition group IDs: 0, 1, 10000, 10001
+ List<PartitionGroupMetadata> resultMetadata =
fetcher.getPartitionGroupMetadataList();
+ List<Integer> partitionIds = resultMetadata.stream()
+ .map(PartitionGroupMetadata::getPartitionGroupId)
+ .sorted()
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(partitionIds, Arrays.asList(0, 1, 10000, 10001));
+ }
+ }
+
+ @Test
+ public void testFetchMultipleStreamsWithPause()
+ throws Exception {
+ // Setup
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table", false);
+ StreamConfig streamConfig3 = createMockStreamConfig("topic3",
"test-table", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2, streamConfig3);
+
+ PartitionGroupConsumptionStatus status1 = new
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+ PartitionGroupConsumptionStatus status2 = new
PartitionGroupConsumptionStatus(1, 1, null, null, "IN_PROGRESS");
+ List<PartitionGroupConsumptionStatus> statusList = Arrays.asList(status1,
status2);
+
+ PartitionGroupMetadata mockedMetadata1 = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class));
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Arrays.asList(1), false);
+
+ // Execute
+ Boolean result = fetcher.call();
+
+ // Verify
+ Assert.assertTrue(result);
+ Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+ Assert.assertNull(fetcher.getException());
+
+ // Verify the correct partition group IDs
+ List<PartitionGroupMetadata> resultMetadata =
fetcher.getPartitionGroupMetadataList();
+ List<Integer> partitionIds = resultMetadata.stream()
+ .map(PartitionGroupMetadata::getPartitionGroupId)
+ .sorted()
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(partitionIds, Arrays.asList(0, 1, 20000, 20001));
+ }
+ }
+
+ private StreamConfig createMockStreamConfig(String topicName, String
tableName, boolean isEphemeral) {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ when(streamConfig.getTopicName()).thenReturn(topicName);
+ when(streamConfig.getTableNameWithType()).thenReturn(tableName);
+ return streamConfig;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]