This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d30e28bed Add minion observability for segment upload/download failures (#10978)
2d30e28bed is described below
commit 2d30e28bedea29d264b0d7946ebb3f0e2cf1b5cc
Author: Subbu Subramaniam <[email protected]>
AuthorDate: Mon Jun 26 19:25:26 2023 -0700
Add minion observability for segment upload/download failures (#10978)
* Add minion observability for segment upload/download failures
Currently, minions do not provide observability into segment upload or download
failures. Added a mechanism to log errors and increment metrics when either an
upload or a download fails, so that operators can set alerts on these metrics
to quickly detect the resulting inconsistent state and remediate it where possible.
Issue #10973
* Style fix
---
.../apache/pinot/common/metrics/MinionMeter.java | 4 ++-
.../tasks/BaseSingleSegmentConversionExecutor.java | 31 +++++++++++++++++++---
2 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
index 101a950389..376f86e55e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMeter.java
@@ -29,7 +29,9 @@ public enum MinionMeter implements AbstractMetrics.Meter {
NUMBER_TASKS_COMPLETED("tasks", false),
NUMBER_TASKS_CANCELLED("tasks", false),
NUMBER_TASKS_FAILED("tasks", false),
- NUMBER_TASKS_FATAL_FAILED("tasks", false);
+ NUMBER_TASKS_FATAL_FAILED("tasks", false),
+ SEGMENT_UPLOAD_FAIL_COUNT("segments", false),
+ SEGMENT_DOWNLOAD_FAIL_COUNT("segments", false);
private final String _meterName;
private final String _unit;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index bc440e0c5e..51c7f98543 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -32,13 +32,17 @@ import org.apache.http.HttpHeaders;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
+import org.apache.pinot.common.Utils;
import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
@@ -60,6 +64,7 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
// Tracking finer grained progress status.
protected PinotTaskConfig _pinotTaskConfig;
protected MinionEventObserver _eventObserver;
+ protected final MinionMetrics _minionMetrics =
MinionContext.getInstance().getMinionMetrics();
/**
* Converts the segment based on the given task config and returns the
conversion result.
@@ -101,7 +106,14 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
_eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment
from: " + downloadURL);
File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
LOGGER.info("Downloading segment from {} to {}", downloadURL,
tarredSegmentFile.getAbsolutePath());
- SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL,
tarredSegmentFile, crypterName);
+ try {
+ SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL,
tarredSegmentFile, crypterName);
+ } catch (Exception e) {
+ _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
+ LOGGER.error("Segment download failed for {}, crypter:{}",
downloadURL, crypterName, e);
+ _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+ Utils.rethrowException(e);
+ }
// Un-tar the segment file
_eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment
from: " + downloadURL);
@@ -177,13 +189,24 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
// Upload the tarred segment
_eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " +
segmentName);
- SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters,
tableNameWithType, segmentName, uploadURL,
- convertedTarredSegmentFile);
+ boolean uploadSuccessful = true;
+ try {
+ SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters,
tableNameWithType, segmentName,
+ uploadURL, convertedTarredSegmentFile);
+ } catch (Exception e) {
+ uploadSuccessful = false;
+ _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L);
+ LOGGER.error("Segment upload failed for segment {}, table {}",
segmentName, tableNameWithType, e);
+ _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+ }
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}",
convertedTarredSegmentFile.getAbsolutePath());
}
- LOGGER.info("Done executing {} on table: {}, segment: {}", taskType,
tableNameWithType, segmentName);
+ if (uploadSuccessful) {
+ LOGGER.info("Done executing {} on table: {}, segment: {}", taskType,
tableNameWithType, segmentName);
+ }
+
return segmentConversionResult;
} finally {
FileUtils.deleteQuietly(tempDataDir);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]