This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 273e1de430 Fix reloading segment when the previous upload failed
(#9631)
273e1de430 is described below
commit 273e1de43036bdb780c945c0cbe5fc6d6c0b8149
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Oct 20 12:12:52 2022 -0700
Fix reloading segment when the previous upload failed (#9631)
---
.../pinot/common/metadata/ZKMetadataProvider.java | 7 ++-
.../pinot/controller/api/upload/ZKOperator.java | 65 +++++++++++++++++-----
.../helix/core/PinotHelixResourceManager.java | 4 ++
.../helix/core/SegmentDeletionManager.java | 3 +
.../controller/api/upload/ZKOperatorTest.java | 26 ++++++++-
5 files changed, 90 insertions(+), 15 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index ebb400387f..ac938fb73b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -219,6 +219,12 @@ public class ZKMetadataProvider {
return setSegmentZKMetadata(propertyStore, tableNameWithType,
segmentZKMetadata, -1);
}
+ public static boolean removeSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableNameWithType,
+ String segmentName) { // Removes the property-store entry at the path built for this table and segment.
+ return
propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType,
segmentName),
+ AccessOption.PERSISTENT); // The boolean returned by propertyStore.remove is passed straight to the caller.
+ }
+
@Nullable
public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord>
propertyStore, String path) {
Stat stat = new Stat();
@@ -238,7 +244,6 @@ public class ZKMetadataProvider {
AccessOption.PERSISTENT);
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}
-
@Nullable
public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String username) {
ZNRecord znRecord =
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 56f478595a..3d33a358f3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -24,6 +24,7 @@ import java.net.URI;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
@@ -67,10 +68,20 @@ public class ZKOperator {
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean
allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
- ZNRecord existingSegmentMetadataZNRecord =
-
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
boolean refreshOnly =
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+
+ ZNRecord existingSegmentMetadataZNRecord =
+
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+ if (existingSegmentMetadataZNRecord != null &&
shouldProcessAsNewSegment(tableNameWithType, segmentName,
+ existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+ LOGGER.warn("Removing segment ZK metadata (recovering from previous
upload failure) for table: {}, segment: {}",
+ tableNameWithType, segmentName);
+
Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType,
segmentName),
+ "Failed to remove segment ZK metadata for table: %s, segment: %s",
tableNameWithType, segmentName);
+ existingSegmentMetadataZNRecord = null;
+ }
+
if (existingSegmentMetadataZNRecord == null) {
// Add a new segment
if (refreshOnly) {
@@ -99,6 +110,44 @@ public class ZKOperator {
}
}
+ /**
+ * Returns {@code true} when the segment should be processed as a new segment.
+ * <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after
+ * segment ZK metadata is created but before assigning the segment to the ideal state, we want to remove the existing
+ * segment ZK metadata and treat it as a new segment.
+ * <p>Returns {@code false} as soon as the ideal state has an instance state map for the segment. Otherwise, when
+ * parallel push protection is enabled and the existing ZK metadata carries a positive upload start time, the decision
+ * is delegated to {@code handleParallelPush}, which throws if that upload is still within the upload timeout.
+ *
+ * @param tableNameWithType table whose ideal state is looked up
+ * @param segmentName segment to look for in the ideal state
+ * @param existingSegmentMetadataZNRecord existing segment ZK metadata record, read for its upload start time
+ * @param enableParallelPushProtection whether to check the upload start time of the existing ZK metadata
+ * @return {@code true} if the segment is absent from the ideal state, {@code false} otherwise
+ * @throws IllegalStateException if the table has no ideal state (assuming Guava-style {@code Preconditions})
+ * @throws ControllerApplicationException if another upload of the same segment is still in progress
+ */
+ private boolean shouldProcessAsNewSegment(String tableNameWithType, String
segmentName,
+ ZNRecord existingSegmentMetadataZNRecord, boolean
enableParallelPushProtection) {
+ IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ if (idealState.getInstanceStateMap(segmentName) != null) {
+ return false; // The ideal state already has an entry for this segment
+ }
+ // Segment does not exist in the ideal state
+ if (enableParallelPushProtection) {
+ // Check segment upload start time when parallel push protection is
enabled in case the segment is being uploaded
+ long segmentUploadStartTime = new
SegmentZKMetadata(existingSegmentMetadataZNRecord).getSegmentUploadStartTime();
+ if (segmentUploadStartTime > 0) {
+ handleParallelPush(tableNameWithType, segmentName,
segmentUploadStartTime); // Throws when the recorded upload has not yet exceeded the upload timeout
+ }
+ }
+ return true;
+ }
+
+ private void handleParallelPush(String tableNameWithType, String
segmentName, long segmentUploadStartTime) { // Decides whether a recorded upload start time is stale (return) or still active (throw).
+ assert segmentUploadStartTime > 0; // Both visible call sites only invoke this after checking the start time is positive
+ if (System.currentTimeMillis() - segmentUploadStartTime >
_controllerConf.getSegmentUploadTimeoutInMillis()) {
+ // Last segment upload did not finish within the configured upload timeout: log it, bump the meter, and return
+ LOGGER.error("Segment: {} of table: {} was not properly uploaded,
replacing it", segmentName, tableNameWithType);
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED,
1L);
+ } else {
+ // Another segment upload is in progress (still within the timeout), so reject this request with CONFLICT
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Another segment upload is in progress for segment: %s
of table: %s, retry later", segmentName,
+ tableNameWithType), Response.Status.CONFLICT);
+ }
+ }
+
private void processExistingSegment(String tableNameWithType,
SegmentMetadata segmentMetadata,
FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord,
@Nullable URI finalSegmentLocationURI,
File segmentFile, @Nullable String sourceDownloadURIStr, String
segmentDownloadURIStr,
@@ -117,17 +166,7 @@ public class ZKOperator {
// When segment upload start time is larger than 0, that means another
upload is in progress
long segmentUploadStartTime =
segmentZKMetadata.getSegmentUploadStartTime();
if (segmentUploadStartTime > 0) {
- if (System.currentTimeMillis() - segmentUploadStartTime >
_controllerConf.getSegmentUploadTimeoutInMillis()) {
- // Last segment upload does not finish properly, replace the segment
- LOGGER.error("Segment: {} of table: {} was not properly uploaded,
replacing it", segmentName,
- tableNameWithType);
-
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED,
1L);
- } else {
- // Another segment upload is in progress
- throw new ControllerApplicationException(LOGGER,
- String.format("Another segment upload is in progress for
segment: %s of table: %s, retry later",
- segmentName, tableNameWithType), Response.Status.CONFLICT);
- }
+ handleParallelPush(tableNameWithType, segmentName,
segmentUploadStartTime);
}
// Lock the segment by setting the upload start time in ZK
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e03343eae0..df4df3c5ee 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2247,6 +2247,10 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentZKMetadata);
}
+ public boolean removeSegmentZKMetadata(String tableNameWithType, String
segmentName) { // Removes the segment's ZK metadata via ZKMetadataProvider, using this manager's property store.
+ return ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentName); // Result of the provider call is returned unchanged.
+ }
+
/**
* Delete the table on servers by sending table deletion message
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 9fd9b5dbcd..570a263ac8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -180,6 +180,9 @@ public class SegmentDeletionManager {
}
segmentsToDelete.removeAll(propStoreFailedSegs);
+ // TODO: If removing segments from deep store fails (e.g. controller
crashes, deep store unavailable), these
+ // segments will become orphans that are not easy to track because
their ZK metadata is already deleted.
+ // Consider removing segments from deep store before cleaning up
the ZK metadata.
removeSegmentsFromStore(tableName, segmentsToDelete,
deletedSegmentsRetentionMs);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index d5566e24a9..e54f62b81c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
@@ -220,7 +221,6 @@ public class ZKOperatorTest {
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
FileUploadType.SEGMENT, null, null,
"downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
-
SegmentZKMetadata segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -233,6 +233,30 @@ public class ZKOperatorTest {
assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+ // Test if the same segment can be uploaded when the previous upload
failed after segment ZK metadata is created but
+ // before segment is assigned to the ideal state
+ // Manually remove the segment from the ideal state
+ IdealState idealState =
_resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+ assertNotNull(idealState);
+ idealState.getRecord().getMapFields().remove(SEGMENT_NAME);
+ _resourceManager.getHelixAdmin()
+ .setResourceIdealState(_resourceManager.getHelixClusterName(),
OFFLINE_TABLE_NAME, idealState);
+ // The segment should be uploaded as a new segment (push time should
change, and refresh time shouldn't be set)
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
FileUploadType.SEGMENT, null, null,
+ "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getCrc(), 12345L);
+ assertEquals(segmentZKMetadata.getCreationTime(), 123L);
+ assertTrue(segmentZKMetadata.getPushTime() > pushTime);
+ pushTime = segmentZKMetadata.getPushTime();
+ assertTrue(pushTime > 0);
+ assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
+ assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
+ assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
+ assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+
// Upload the same segment with allowRefresh = false. Validate that an
exception is thrown.
try {
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, FileUploadType.SEGMENT, null, null,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]