This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c9e08f367a Updates forceCommit APIs to handle Pauseless (#14828) c9e08f367a is described below commit c9e08f367ab1e89e7e54bdc11a70989fdc5f8913 Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Fri Jan 24 05:59:46 2025 +0530 Updates forceCommit APIs to handle Pauseless (#14828) --- .../api/resources/PinotRealtimeTableResource.java | 28 ++++++++++++------- .../realtime/PinotLLCRealtimeSegmentManager.java | 16 +++++++++++ .../PinotLLCRealtimeSegmentManagerTest.java | 31 ++++++++++++++++++++++ .../tests/LLCRealtimeClusterIntegrationTest.java | 25 +++++++++++++++-- .../LLCRealtimeKafka3ClusterIntegrationTest.java | 25 +++++++++++++++-- .../apache/pinot/spi/utils/CommonConstants.java | 1 + 6 files changed, 113 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 2ab15427f7..f4a0e633a0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -29,7 +29,6 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -222,15 +221,26 @@ public class PinotRealtimeTableResource { String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); Set<String> consumingSegmentCommitted = JsonUtils.stringToObject( controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class); - Set<String> onlineSegmentsForTable = - _pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false); - Set<String> segmentsYetToBeCommitted = new HashSet<>(); - consumingSegmentCommitted.forEach(segmentName -> { - if (!onlineSegmentsForTable.contains(segmentName)) { - segmentsYetToBeCommitted.add(segmentName); - } - }); + Set<String> segmentsToCheck; + String segmentsPendingToBeComittedString = + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); + + if (segmentsPendingToBeComittedString != null) { + segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class); + } else { + segmentsToCheck = consumingSegmentCommitted; + } + + Set<String> segmentsYetToBeCommitted = + _pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck); + + if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) { + controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, + JsonUtils.objectToString(segmentsYetToBeCommitted)); + _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata, + ControllerJobType.FORCE_COMMIT, prev -> true); + } Map<String, Object> result = new HashMap<>(controllerJobZKMetadata); result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 3ed88967c6..2b9cf8f954 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -2009,4 +2009,20 @@ public class PinotLLCRealtimeSegmentManager { URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + + public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) { + Set<String> segmentsYetToBeCommitted = new HashSet<>(); + for (String segmentName: segmentsToCheck) { + SegmentZKMetadata segmentZKMetadata = + _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); + if (segmentZKMetadata == null) { + // Segment is deleted. No need to track this segment among segments yetToBeCommitted. + continue; + } + if (!(segmentZKMetadata.getStatus().equals(Status.DONE))) { + segmentsYetToBeCommitted.add(segmentName); + } + } + return segmentsYetToBeCommitted; + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index dbe640d364..d5969e611f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.realtime; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; @@ -1247,6 +1248,36 @@ public class PinotLLCRealtimeSegmentManagerTest { Assert.assertEquals(partitionIds.size(), 2); } + @Test + public void getSegmentsYetToBeCommitted() { + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager realtimeSegmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + + SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE); + + SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED); + + SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS); + + SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING); + + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting); + + Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5"); + Set<String> segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck); + assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 78b34fc563..1765550641 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.HashUtil; @@ -427,12 +428,18 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr throws Exception { Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); String jobId = forceCommit(getTableName()); + Map<String, String> jobMetadata = + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); + assert jobMetadata != null; + assert jobMetadata.get("segmentsForceCommitted") != null; TestUtils.waitForCondition(aVoid -> { try { if (isForceCommitJobCompleted(jobId)) { - assertTrue(_controllerStarter.getHelixResourceManager() - .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments)); + for (String segmentName : consumingSegments) { + assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager() + .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus()); + } return true; } return false; @@ -462,6 +469,20 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); + + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null; + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null; + + Set<String> allSegments = JsonUtils.stringToObject( + jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class); + Set<String> segmentsPending = new HashSet<>(); + for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) { + segmentsPending.add(element.asText()); + } + + assert segmentsPending.size() <= allSegments.size(); + assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); + return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java index dce404d64d..e61cb07c69 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.HashUtil; @@ -395,12 +396,18 @@ public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeCluster throws Exception { Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); String jobId = forceCommit(getTableName()); + Map<String, String> jobMetadata = + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); + assert jobMetadata != null; + assert jobMetadata.get("segmentsForceCommitted") != null; TestUtils.waitForCondition(aVoid -> { try { if (isForceCommitJobCompleted(jobId)) { - assertTrue(_controllerStarter.getHelixResourceManager() - .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments)); + for (String segmentName : consumingSegments) { + assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager() + .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus()); + } return true; } return false; @@ -430,6 +437,20 @@ public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeCluster assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); + + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null; + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null; + + Set<String> allSegments = JsonUtils.stringToObject( + jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class); + Set<String> segmentsPending = new HashSet<>(); + for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) { + segmentsPending.add(element.asText()); + } + + assert segmentsPending.size() <= allSegments.size(); + assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); + return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 30f4b44e27..ecb26b5dfd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1039,6 +1039,7 @@ public class CommonConstants { public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName"; // Force commit job ZK props public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted"; + public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted"; } // prefix for scheduler related features, e.g. query accountant --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org