This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c9e08f367a Updates forceCommit APIs to handle Pauseless (#14828)
c9e08f367a is described below

commit c9e08f367ab1e89e7e54bdc11a70989fdc5f8913
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Fri Jan 24 05:59:46 2025 +0530

    Updates forceCommit APIs to handle Pauseless (#14828)
---
 .../api/resources/PinotRealtimeTableResource.java  | 28 ++++++++++++-------
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 16 +++++++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        | 31 ++++++++++++++++++++++
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 25 +++++++++++++++--
 .../LLCRealtimeKafka3ClusterIntegrationTest.java   | 25 +++++++++++++++--
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 6 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 2ab15427f7..f4a0e633a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -29,7 +29,6 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -222,15 +221,26 @@ public class PinotRealtimeTableResource {
     String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
     Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
         controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class);
-    Set<String> onlineSegmentsForTable =
-        _pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);
 
-    Set<String> segmentsYetToBeCommitted = new HashSet<>();
-    consumingSegmentCommitted.forEach(segmentName -> {
-      if (!onlineSegmentsForTable.contains(segmentName)) {
-        segmentsYetToBeCommitted.add(segmentName);
-      }
-    });
+    Set<String> segmentsToCheck;
+    String segmentsPendingToBeComittedString =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);
+
+    if (segmentsPendingToBeComittedString != null) {
+      segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class);
+    } else {
+      segmentsToCheck = consumingSegmentCommitted;
+    }
+
+    Set<String> segmentsYetToBeCommitted =
+        _pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck);
+
+    if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) {
+      controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+          JsonUtils.objectToString(segmentsYetToBeCommitted));
+      _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata,
+          ControllerJobType.FORCE_COMMIT, prev -> true);
+    }
 
     Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
     result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3ed88967c6..2b9cf8f954 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2009,4 +2009,20 @@ public class PinotLLCRealtimeSegmentManager {
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+    Set<String> segmentsYetToBeCommitted = new HashSet<>();
+    for (String segmentName: segmentsToCheck) {
+      SegmentZKMetadata segmentZKMetadata =
+          _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        // Segment is deleted. No need to track this segment among segments yetToBeCommitted.
+        continue;
+      }
+      if (!(segmentZKMetadata.getStatus().equals(Status.DONE))) {
+        segmentsYetToBeCommitted.add(segmentName);
+      }
+    }
+    return segmentsYetToBeCommitted;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index dbe640d364..d5969e611f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
@@ -1247,6 +1248,36 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertEquals(partitionIds.size(), 2);
   }
 
+  @Test
+  public void getSegmentsYetToBeCommitted() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+
+    SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE);
+
+    SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED);
+
+    SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS);
+
+    SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING);
+
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting);
+
+    Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5");
+    Set<String> segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck);
+    assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted);
+  }
+
   //////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   /////////////////////////////////////////////////////////////////////////////////
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 78b34fc563..1765550641 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.HashUtil;
@@ -427,12 +428,18 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
       throws Exception {
     Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
     String jobId = forceCommit(getTableName());
+    Map<String, String> jobMetadata =
+        _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
+    assert jobMetadata != null;
+    assert jobMetadata.get("segmentsForceCommitted") != null;
 
     TestUtils.waitForCondition(aVoid -> {
       try {
         if (isForceCommitJobCompleted(jobId)) {
-          assertTrue(_controllerStarter.getHelixResourceManager()
-              .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments));
+          for (String segmentName : consumingSegments) {
+            assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager()
+                .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus());
+          }
           return true;
         }
         return false;
@@ -462,6 +469,20 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
 
     assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
     assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+
+    assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
+    assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null;
+
+    Set<String> allSegments = JsonUtils.stringToObject(
+        jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class);
+    Set<String> segmentsPending = new HashSet<>();
+    for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) {
+      segmentsPending.add(element.asText());
+    }
+
+    assert segmentsPending.size() <= allSegments.size();
+    assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();
+
     return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
index dce404d64d..e61cb07c69 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.HashUtil;
@@ -395,12 +396,18 @@ public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeCluster
       throws Exception {
     Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
     String jobId = forceCommit(getTableName());
+    Map<String, String> jobMetadata =
+        _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
+    assert jobMetadata != null;
+    assert jobMetadata.get("segmentsForceCommitted") != null;
 
     TestUtils.waitForCondition(aVoid -> {
       try {
         if (isForceCommitJobCompleted(jobId)) {
-          assertTrue(_controllerStarter.getHelixResourceManager()
-              .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments));
+          for (String segmentName : consumingSegments) {
+            assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager()
+                .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus());
+          }
           return true;
         }
         return false;
@@ -430,6 +437,20 @@ public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeCluster
 
     assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
     assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+
+    assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
+    assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null;
+
+    Set<String> allSegments = JsonUtils.stringToObject(
+        jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class);
+    Set<String> segmentsPending = new HashSet<>();
+    for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) {
+      segmentsPending.add(element.asText());
+    }
+
+    assert segmentsPending.size() <= allSegments.size();
+    assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();
+
     return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 30f4b44e27..ecb26b5dfd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1039,6 +1039,7 @@ public class CommonConstants {
     public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName";
     // Force commit job ZK props
     public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
+    public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted";
   }
 
   // prefix for scheduler related features, e.g. query accountant


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to