This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 273e1de430 Fix reloading segment when the previous upload failed (#9631)
273e1de430 is described below

commit 273e1de43036bdb780c945c0cbe5fc6d6c0b8149
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Oct 20 12:12:52 2022 -0700

    Fix reloading segment when the previous upload failed (#9631)
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |  7 ++-
 .../pinot/controller/api/upload/ZKOperator.java    | 65 +++++++++++++++++-----
 .../helix/core/PinotHelixResourceManager.java      |  4 ++
 .../helix/core/SegmentDeletionManager.java         |  3 +
 .../controller/api/upload/ZKOperatorTest.java      | 26 ++++++++-
 5 files changed, 90 insertions(+), 15 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index ebb400387f..ac938fb73b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -219,6 +219,12 @@ public class ZKMetadataProvider {
     return setSegmentZKMetadata(propertyStore, tableNameWithType, segmentZKMetadata, -1);
   }
 
+  public static boolean removeSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
+      String segmentName) {
+    return propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType, segmentName),
+        AccessOption.PERSISTENT);
+  }
+
   @Nullable
   public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, String path) {
     Stat stat = new Stat();
@@ -238,7 +244,6 @@ public class ZKMetadataProvider {
         AccessOption.PERSISTENT);
     return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
   }
-
   @Nullable
   public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username) {
     ZNRecord znRecord =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 56f478595a..3d33a358f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
@@ -67,10 +68,20 @@ public class ZKOperator {
       long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
-    ZNRecord existingSegmentMetadataZNRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     boolean refreshOnly =
         Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+
+    ZNRecord existingSegmentMetadataZNRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName,
+        existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+      LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}",
+          tableNameWithType, segmentName);
+      Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName),
+          "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName);
+      existingSegmentMetadataZNRecord = null;
+    }
+
     if (existingSegmentMetadataZNRecord == null) {
       // Add a new segment
       if (refreshOnly) {
@@ -99,6 +110,44 @@ public class ZKOperator {
     }
   }
 
+  /**
+   * Returns {@code true} when the segment should be processed as new segment.
+   * <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after
+   * segment ZK metadata is created but before assigning the segment to the ideal state, we want to remove the existing
+   * segment ZK metadata and treat it as a new segment.
+   */
+  private boolean shouldProcessAsNewSegment(String tableNameWithType, String segmentName,
+      ZNRecord existingSegmentMetadataZNRecord, boolean enableParallelPushProtection) {
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    if (idealState.getInstanceStateMap(segmentName) != null) {
+      return false;
+    }
+    // Segment does not exist in the ideal state
+    if (enableParallelPushProtection) {
+      // Check segment upload start time when parallel push protection is enabled in case the segment is being uploaded
+      long segmentUploadStartTime = new SegmentZKMetadata(existingSegmentMetadataZNRecord).getSegmentUploadStartTime();
+      if (segmentUploadStartTime > 0) {
+        handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
+      }
+    }
+    return true;
+  }
+
+  private void handleParallelPush(String tableNameWithType, String segmentName, long segmentUploadStartTime) {
+    assert segmentUploadStartTime > 0;
+    if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
+      // Last segment upload does not finish properly, replace the segment
+      LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName, tableNameWithType);
+      _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
+    } else {
+      // Another segment upload is in progress
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Another segment upload is in progress for segment: %s of table: %s, retry later", segmentName,
+              tableNameWithType), Response.Status.CONFLICT);
+    }
+  }
+
   private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
       FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
       File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
@@ -117,17 +166,7 @@ public class ZKOperator {
       // When segment upload start time is larger than 0, that means another upload is in progress
       long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
       if (segmentUploadStartTime > 0) {
-        if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
-          // Last segment upload does not finish properly, replace the segment
-          LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName,
-              tableNameWithType);
-          _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
-        } else {
-          // Another segment upload is in progress
-          throw new ControllerApplicationException(LOGGER,
-              String.format("Another segment upload is in progress for segment: %s of table: %s, retry later",
-                  segmentName, tableNameWithType), Response.Status.CONFLICT);
-        }
+        handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
       }
 
       // Lock the segment by setting the upload start time in ZK
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e03343eae0..df4df3c5ee 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2247,6 +2247,10 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
   }
 
+  public boolean removeSegmentZKMetadata(String tableNameWithType, String segmentName) {
+    return ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+  }
+
   /**
    * Delete the table on servers by sending table deletion message
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 9fd9b5dbcd..570a263ac8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -180,6 +180,9 @@ public class SegmentDeletionManager {
       }
       segmentsToDelete.removeAll(propStoreFailedSegs);
 
+      // TODO: If removing segments from deep store fails (e.g. controller crashes, deep store unavailable), these
+      //       segments will become orphans and not easy to track because their ZK metadata are already deleted.
+      //       Consider removing segments from deep store before cleaning up the ZK metadata.
       removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index d5566e24a9..e54f62b81c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
@@ -220,7 +221,6 @@ public class ZKOperatorTest {
 
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
         "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
-
     SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -233,6 +233,30 @@ public class ZKOperatorTest {
     assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
     assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
 
+    // Test if the same segment can be uploaded when the previous upload failed after segment ZK metadata is created but
+    // before segment is assigned to the ideal state
+    // Manually remove the segment from the ideal state
+    IdealState idealState = _resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    idealState.getRecord().getMapFields().remove(SEGMENT_NAME);
+    _resourceManager.getHelixAdmin()
+        .setResourceIdealState(_resourceManager.getHelixClusterName(), OFFLINE_TABLE_NAME, idealState);
+    // The segment should be uploaded as a new segment (push time should change, and refresh time shouldn't be set)
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
+    segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getCrc(), 12345L);
+    assertEquals(segmentZKMetadata.getCreationTime(), 123L);
+    assertTrue(segmentZKMetadata.getPushTime() > pushTime);
+    pushTime = segmentZKMetadata.getPushTime();
+    assertTrue(pushTime > 0);
+    assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
+    assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
+    assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
+    assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+
     // Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
     try {
       zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
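
Note (not part of the commit, added for context): the sketch below is a minimal, standalone illustration of the
recovery decision introduced in ZKOperator.shouldProcessAsNewSegment()/handleParallelPush() above. The class
UploadRecoverySketch, the Decision enum and the decide() parameters are hypothetical stand-ins for the Pinot/Helix
types used in the diff, and uploadTimeoutMs stands in for ControllerConf.getSegmentUploadTimeoutInMillis().

public final class UploadRecoverySketch {

  enum Decision { PROCESS_AS_NEW_SEGMENT, PROCESS_AS_EXISTING_SEGMENT, REJECT_PARALLEL_UPLOAD }

  // ZK metadata for the segment exists, so inspect the ideal state. If the segment is missing from the
  // ideal state, the previous upload died between writing ZK metadata and updating the ideal state, and
  // the upload should be treated as a brand-new segment (after the parallel-push check).
  static Decision decide(boolean inIdealState, boolean parallelPushProtection, long uploadStartTimeMs,
      long nowMs, long uploadTimeoutMs) {
    if (inIdealState) {
      // Previous upload completed; follow the existing-segment (refresh) path.
      return Decision.PROCESS_AS_EXISTING_SEGMENT;
    }
    if (parallelPushProtection && uploadStartTimeMs > 0 && nowMs - uploadStartTimeMs <= uploadTimeoutMs) {
      // Another upload may still be in flight; the controller responds with HTTP 409 (CONFLICT) in this case.
      return Decision.REJECT_PARALLEL_UPLOAD;
    }
    // Stale ZK metadata left by a failed upload: remove it and process the segment as new.
    return Decision.PROCESS_AS_NEW_SEGMENT;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long timeout = 600_000L; // illustrative 10-minute upload timeout
    // Upload failed long ago (past the timeout): recover by re-uploading as a new segment.
    System.out.println(decide(false, true, now - 2 * timeout, now, timeout)); // PROCESS_AS_NEW_SEGMENT
    // Upload started recently with parallel push protection enabled: reject the concurrent push.
    System.out.println(decide(false, true, now - 1_000L, now, timeout));      // REJECT_PARALLEL_UPLOAD
    // Segment already assigned in the ideal state: refresh path.
    System.out.println(decide(true, true, -1L, now, timeout));                // PROCESS_AS_EXISTING_SEGMENT
  }
}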


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
