This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d9c0790a46 Periodically delete Tmp Segment file brought by the
SplitCommit End phase (#10815)
d9c0790a46 is described below
commit d9c0790a464eaf2575a2b7225827af36f4b0cd25
Author: Xuanyi Li <[email protected]>
AuthorDate: Mon Sep 18 19:08:07 2023 -0700
Periodically delete Tmp Segment file brought by the SplitCommit End phase
(#10815)
---
.../pinot/common/metrics/AbstractMetrics.java | 20 +++--
.../pinot/common/metrics/ControllerMeter.java | 1 +
.../pinot/controller/BaseControllerStarter.java | 4 +-
.../apache/pinot/controller/ControllerConf.java | 15 ++++
.../resources/LLCSegmentCompletionHandlers.java | 4 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 100 ++++++++++++++++++---
.../RealtimeSegmentValidationManager.java | 15 ++++
.../PinotLLCRealtimeSegmentManagerTest.java | 65 +++++++++++---
.../manager/realtime/PinotFSSegmentUploader.java | 3 +-
.../manager/realtime}/SegmentCompletionUtils.java | 22 ++++-
.../realtime}/SegmentCompletionUtilsTest.java | 19 ++--
11 files changed, 224 insertions(+), 44 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index ff41b12abf..98f65a4be9 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -285,9 +285,7 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
*/
public PinotMeter addMeteredTableValue(final String tableName, final M
meter, final long unitCount,
PinotMeter reusedMeter) {
- String meterName = meter.getMeterName();
- final String fullMeterName = _metricPrefix + getTableName(tableName) + "."
+ meterName;
- return addValueToMeter(fullMeterName, meter.getUnit(), unitCount,
reusedMeter);
+ return addValueToMeter(getTableFullMeterName(tableName, meter),
meter.getUnit(), unitCount, reusedMeter);
}
/**
@@ -331,14 +329,17 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
}
public PinotMeter getMeteredTableValue(final String tableName, final M
meter) {
- final String fullMeterName;
- String meterName = meter.getMeterName();
- fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
- final PinotMetricName metricName =
PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName);
+ final PinotMetricName metricName =
PinotMetricUtils.makePinotMetricName(_clazz,
+ getTableFullMeterName(tableName, meter));
return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName,
meter.getUnit(), TimeUnit.SECONDS);
}
+ private String getTableFullMeterName(final String tableName, final M meter) {
+ String meterName = meter.getMeterName();
+ return _metricPrefix + getTableName(tableName) + "." + meterName;
+ }
+
/**
* @deprecated Please use addMeteredTableValue(final String tableName, final
M meter, final long unitCount), which is
* designed for tracking count and rates.
@@ -723,6 +724,11 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
removeGaugeFromMetricRegistry(gaugeName);
}
+ public void removeTableMeter(final String tableName, final M meter) {
+ PinotMetricUtils.removeMetric(_metricsRegistry,
+ PinotMetricUtils.makePinotMetricName(_clazz,
getTableFullMeterName(tableName, meter)));
+ }
+
/**
* Remove callback gauge.
* @param metricName metric name
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 8de33d0e41..a3e6da0086 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -59,6 +59,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS("LLCSegmentDeepStoreUploadRetrySuccess",
false),
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
false),
SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink",
false),
+ DELETED_TMP_SEGMENT_COUNT("DeletedTmpSegmentCount", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e983fd80ff..e7f02b57df 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -156,6 +156,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected HelixManager _helixParticipantManager;
protected PinotMetricsRegistry _metricsRegistry;
protected ControllerMetrics _controllerMetrics;
+ protected ValidationMetrics _validationMetrics;
protected SqlQueryExecutor _sqlQueryExecutor;
// Can only be constructed after resource manager getting started
protected OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
@@ -586,6 +587,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_controllerMetrics = new ControllerMetrics(_config.getMetricsPrefix(),
_metricsRegistry);
_controllerMetrics.initializeGlobalMeters();
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.VERSION,
PinotVersion.VERSION_METRIC_NAME, 1);
+ _validationMetrics = new ValidationMetrics(_metricsRegistry);
}
private void initPinotFSFactory() {
@@ -705,7 +707,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
periodicTasks.add(_offlineSegmentIntervalChecker);
_realtimeSegmentValidationManager =
new RealtimeSegmentValidationManager(_config, _helixResourceManager,
_leadControllerManager,
- _pinotLLCRealtimeSegmentManager, new
ValidationMetrics(_metricsRegistry), _controllerMetrics);
+ _pinotLLCRealtimeSegmentManager, _validationMetrics,
_controllerMetrics);
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
new BrokerResourceValidationManager(_config, _helixResourceManager,
_leadControllerManager, _controllerMetrics);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 73abaa14a4..22a838faf1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -216,9 +216,15 @@ public class ControllerConf extends PinotConfiguration {
"controller.realtime.segment.deepStoreUploadRetryEnabled";
public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS =
"controller.realtime.segment.deepStoreUploadRetry.timeoutMs";
+ public static final String ENABLE_TMP_SEGMENT_ASYNC_DELETION =
+ "controller.realtime.segment.tmpFileAsyncDeletionEnabled";
+ // Temporary segments younger than the retention period are not deleted, so that an ongoing
split commit is not impacted.
+ public static final String TMP_SEGMENT_RETENTION_IN_SECONDS =
+ "controller.realtime.segment.tmpFileRetentionInSeconds";
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+ public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND =
60 * 60; // 1 Hour.
private static final Random RANDOM = new Random();
@@ -924,10 +930,19 @@ public class ControllerConf extends PinotConfiguration {
return
getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
false);
}
+ public boolean isTmpSegmentAsyncDeletionEnabled() {
+ return
getProperty(ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION,
false);
+ }
+
public int getDeepStoreRetryUploadTimeoutMs() {
return
getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1);
}
+ public int getTmpSegmentRetentionInSeconds() {
+ return
getProperty(ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS,
+
ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND);
+ }
+
public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index bf852f582e..836f58052d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -47,10 +47,10 @@ import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -378,7 +378,7 @@ public class LLCSegmentCompletionHandlers {
String rawTableName = new LLCSegmentName(segmentName).getTableName();
URI segmentFileURI = URIUtils
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
-
URIUtils.encode(SegmentCompletionUtils.generateSegmentFileName(segmentName)));
+
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
segmentFileURI);
SegmentCompletionProtocol.Response.Params responseParams = new
SegmentCompletionProtocol.Response.Params()
.withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset())
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e55f0e6463..971dd4333d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -72,8 +72,8 @@ import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -165,6 +165,7 @@ public class PinotLLCRealtimeSegmentManager {
private final Lock[] _idealStateUpdateLocks;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+ private final boolean _isTmpSegmentAsyncDeletionEnabled;
private final int _deepstoreUploadRetryTimeoutMs;
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -190,6 +191,7 @@ public class PinotLLCRealtimeSegmentManager {
}
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
_isDeepStoreLLCSegmentUploadRetryEnabled =
controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+ _isTmpSegmentAsyncDeletionEnabled =
controllerConf.isTmpSegmentAsyncDeletionEnabled();
_deepstoreUploadRetryTimeoutMs =
controllerConf.getDeepStoreRetryUploadTimeoutMs();
_fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ?
initFileUploadDownloadClient() : null;
}
@@ -198,6 +200,10 @@ public class PinotLLCRealtimeSegmentManager {
return _isDeepStoreLLCSegmentUploadRetryEnabled;
}
+ public boolean isTmpSegmentAsyncDeletionEnabled() {
+ return _isTmpSegmentAsyncDeletionEnabled;
+ }
+
@VisibleForTesting
FileUploadDownloadClient initFileUploadDownloadClient() {
return new FileUploadDownloadClient();
@@ -460,19 +466,17 @@ public class PinotLLCRealtimeSegmentManager {
PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
String uriToMoveTo = moveSegmentFile(rawTableName, segmentName,
segmentLocation, pinotFS);
- // Cleans up tmp segment files under table dir.
- // We only clean up tmp segment files in table level dir, so there's no
need to list recursively.
- // See LLCSegmentCompletionHandlers.uploadSegment().
- // TODO: move tmp file logic into SegmentCompletionUtils.
- try {
- for (String uri : pinotFS.listFiles(tableDirURI, false)) {
- if
(uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
- LOGGER.warn("Deleting temporary segment file: {}", uri);
- Preconditions.checkState(pinotFS.delete(new URI(uri), true), "Failed
to delete file: %s", uri);
+ if (!isTmpSegmentAsyncDeletionEnabled()) {
+ try {
+ for (String uri : pinotFS.listFiles(tableDirURI, false)) {
+ if
(uri.contains(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName))) {
+ LOGGER.warn("Deleting temporary segment file: {}", uri);
+ Preconditions.checkState(pinotFS.delete(new URI(uri), true),
"Failed to delete file: %s", uri);
+ }
}
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while deleting temporary segment files
for segment: {}", segmentName, e);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while deleting temporary segment files for
segment: {}", segmentName, e);
}
committingSegmentDescriptor.setSegmentLocation(uriToMoveTo);
}
@@ -1444,6 +1448,78 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Deletes tmp segments for a realtime table, provided that low-level consumer, split
commit and async deletion are all enabled.
+ * @param tableNameWithType
+ * @param segmentsZKMetadata
+ * @return number of deleted orphan temporary segments
+ *
+ */
+ public long deleteTmpSegments(String tableNameWithType,
List<SegmentZKMetadata> segmentsZKMetadata) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta ->
meta.getStatus() == Status.DONE
+ &&
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+ SegmentZKMetadata::getDownloadUrl).collect(
+ Collectors.toSet());
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName);
+ PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+ long deletedTmpSegments = 0;
+ try {
+ for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+ // prepend scheme
+ URI uri = URIUtils.getUri(filePath);
+ if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+ LOGGER.info("Deleting temporary segment file: {}", uri);
+ if (pinotFS.delete(uri, true)) {
+ LOGGER.info("Succeed to delete file: {}", uri);
+ deletedTmpSegments++;
+ } else {
+ LOGGER.warn("Failed to delete file: {}", uri);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while deleting temporary files for table:
{}", rawTableName, e);
+ }
+ return deletedTmpSegments;
+ }
+
+ private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS
pinotFS) throws Exception {
+ long lastModified = pinotFS.lastModified(uri);
+ if (lastModified <= 0) {
+ LOGGER.warn("file {} modification time {} is not positive, ineligible
for delete", uri.toString(), lastModified);
+ return false;
+ }
+ String uriString = uri.toString();
+ return SegmentCompletionUtils.isTmpFile(uriString) &&
!deepURIs.contains(uriString)
+ && getCurrentTimeMs() - lastModified >
_controllerConf.getTmpSegmentRetentionInSeconds() * 1000L;
+ }
+
+ private boolean isLowLevelConsumer(String tableNameWithType, TableConfig
tableConfig) {
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableNameWithType,
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ return streamConfig.hasLowLevelConsumerType();
+ }
+
/**
* Force commit the current segments in consuming state and restart
consumption
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d0a7bb1a37..42ed189bbf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.controller.ControllerConf;
@@ -49,6 +50,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
private final ValidationMetrics _validationMetrics;
+ private final ControllerMetrics _controllerMetrics;
private final int _segmentLevelValidationIntervalInSeconds;
private long _lastSegmentLevelValidationRunTimeMs = 0L;
@@ -64,6 +66,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
+ _controllerMetrics = controllerMetrics;
_segmentLevelValidationIntervalInSeconds =
config.getSegmentLevelValidationIntervalInSeconds();
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
@@ -111,8 +114,19 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig
streamConfig) {
String realtimeTableName = tableConfig.getTableName();
+
List<SegmentZKMetadata> segmentsZKMetadata =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+ // Delete tmp segments
+ try {
+ long numDeleteTmpSegments =
_llcRealtimeSegmentManager.deleteTmpSegments(realtimeTableName,
segmentsZKMetadata);
+ LOGGER.info("Deleted {} tmp segments for table: {}",
numDeleteTmpSegments, realtimeTableName);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.DELETED_TMP_SEGMENT_COUNT,
+ numDeleteTmpSegments);
+ } catch (Exception e) {
+ LOGGER.error("Failed to delete tmp segments for table: {}",
realtimeTableName, e);
+ }
+
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
computeTotalDocumentCount(segmentsZKMetadata));
@@ -127,6 +141,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
for (String tableNameWithType : tableNamesWithType) {
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
_validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+ _controllerMetrics.removeTableMeter(tableNameWithType,
ControllerMeter.DELETED_TMP_SEGMENT_COUNT);
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d367741080..9a53ad839e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -57,7 +58,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -85,6 +86,10 @@ import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
+import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
+import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
+import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
+import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -787,7 +792,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
PinotFSFactory.init(new PinotConfiguration());
File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
- String segmentFileName =
SegmentCompletionUtils.generateSegmentFileName(segmentName);
+ String segmentFileName =
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
File segmentFile = new File(tableDir, segmentFileName);
FileUtils.write(segmentFile, "temporary file contents");
@@ -808,9 +813,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
String otherSegmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 0,
CURRENT_TIME_MS).getSegmentName();
- String segmentFileName =
SegmentCompletionUtils.generateSegmentFileName(segmentName);
- String extraSegmentFileName =
SegmentCompletionUtils.generateSegmentFileName(segmentName);
- String otherSegmentFileName =
SegmentCompletionUtils.generateSegmentFileName(otherSegmentName);
+ String segmentFileName =
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+ String extraSegmentFileName =
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+ String otherSegmentFileName =
SegmentCompletionUtils.generateTmpSegmentFileName(otherSegmentName);
File segmentFile = new File(tableDir, segmentFileName);
File extraSegmentFile = new File(tableDir, extraSegmentFileName);
File otherSegmentFile = new File(tableDir, otherSegmentFileName);
@@ -957,7 +962,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Change 1st segment status to be DONE, but with default peer download
url.
// Verify later the download url is fixed after upload success.
segmentsZKMetadata.get(0).setStatus(Status.DONE);
-
segmentsZKMetadata.get(0).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
// set up the external view for 1st segment
String instance0 = "instance0";
externalView.setState(segmentsZKMetadata.get(0).getSegmentName(),
instance0, "ONLINE");
@@ -986,7 +991,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Change 2nd segment status to be DONE, but with default peer download
url.
// Verify later the download url isn't fixed after upload failure.
segmentsZKMetadata.get(1).setStatus(Status.DONE);
-
segmentsZKMetadata.get(1).setDownloadUrl(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ segmentsZKMetadata.get(1).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
// set up the external view for 2nd segment
String instance1 = "instance1";
externalView.setState(segmentsZKMetadata.get(1).getSegmentName(),
instance1, "ONLINE");
@@ -1010,7 +1015,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Verify later the download url isn't fixed because no ONLINE replica
found in any server.
segmentsZKMetadata.get(2).setStatus(Status.DONE);
segmentsZKMetadata.get(2).setDownloadUrl(
- CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ METADATA_URI_FOR_PEER_DOWNLOAD);
// set up the external view for 3rd segment
String instance2 = "instance2";
externalView.setState(segmentsZKMetadata.get(2).getSegmentName(),
instance2, "OFFLINE");
@@ -1039,10 +1044,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(1), null).getDownloadUrl(),
- CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ METADATA_URI_FOR_PEER_DOWNLOAD);
assertEquals(
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(2), null).getDownloadUrl(),
- CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ METADATA_URI_FOR_PEER_DOWNLOAD);
assertEquals(
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(3), null).getDownloadUrl(),
defaultDownloadUrl);
@@ -1050,6 +1055,46 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(4), null).getDownloadUrl());
}
+ @Test
+ public void testDeleteTmpSegmentFiles() throws Exception {
+ // turn on knobs for async deletion of tmp files
+ ControllerConf config = new ControllerConf();
+ config.setDataDir(TEMP_DIR.toString());
+ config.setProperty(ENABLE_SPLIT_COMMIT, true);
+ config.setProperty(TMP_SEGMENT_RETENTION_IN_SECONDS, Integer.MIN_VALUE);
+ config.setProperty(ENABLE_TMP_SEGMENT_ASYNC_DELETION, true);
+
+ // simulate there's an orphan tmp file in localFS
+ PinotFSFactory.init(new PinotConfiguration());
+ File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ String segmentFileName =
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName);
+ File segmentFile = new File(tableDir, segmentFileName);
+ FileUtils.write(segmentFile, "temporary file contents",
Charset.defaultCharset());
+
+ SegmentZKMetadata segZKMeta = mock(SegmentZKMetadata.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ when(helixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
+ .thenReturn(new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build());
+ PinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager(
+ helixResourceManager, config);
+
+ long deletedTmpSegCount;
+ // case 1: the segmentMetadata download uri is identical to the uri of the
tmp segment. Should not delete
+ when(segZKMeta.getStatus()).thenReturn(Status.DONE);
+ when(segZKMeta.getDownloadUrl()).thenReturn(SCHEME + tableDir + "/" +
segmentFileName);
+ deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
+ assertTrue(segmentFile.exists());
+ assertEquals(0L, deletedTmpSegCount);
+
+ // case 2: download url is the peer-download placeholder, so no segment metadata references the tmp file and it is an
orphan. Delete the file
+
when(segZKMeta.getDownloadUrl()).thenReturn(METADATA_URI_FOR_PEER_DOWNLOAD);
+ deletedTmpSegCount = segmentManager.deleteTmpSegments(REALTIME_TABLE_NAME,
Collections.singletonList(segZKMeta));
+ assertFalse(segmentFile.exists());
+ assertEquals(1L, deletedTmpSegCount);
+ }
+
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 4f173521c9..949fc7a0b7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -74,7 +73,7 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
final String rawTableName =
TableNameBuilder.extractRawTableName(segmentName.getTableName());
Callable<URI> uploadTask = () -> {
URI destUri = new URI(StringUtil.join(File.separator,
_segmentStoreUriStr, segmentName.getTableName(),
- segmentName.getSegmentName() + UUID.randomUUID().toString()));
+
SegmentCompletionUtils.generateTmpSegmentFileName(segmentName.getSegmentName())));
long startTime = System.currentTimeMillis();
try {
PinotFS pinotFS = PinotFSFactory.create(new
URI(_segmentStoreUriStr).getScheme());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
similarity index 69%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
index 9feb6cdbc6..61f270c3bf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentCompletionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.util;
+package org.apache.pinot.core.data.manager.realtime;
import java.util.UUID;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,24 @@ public class SegmentCompletionUtils {
* @param segmentName segment name
* @return
*/
- public static String getSegmentNamePrefix(String segmentName) {
+ public static String getTmpSegmentNamePrefix(String segmentName) {
return segmentName + TMP;
}
- public static String generateSegmentFileName(String segmentNameStr) {
- return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID().toString();
+ public static String generateTmpSegmentFileName(String segmentNameStr) {
+ return getTmpSegmentNamePrefix(segmentNameStr) + UUID.randomUUID();
+ }
+
+ public static boolean isTmpFile(String uri) {
+ String[] splits = StringUtils.splitByWholeSeparator(uri, TMP);
+ if (splits.length < 2) {
+ return false;
+ }
+ try {
+ UUID.fromString(splits[splits.length - 1]);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
similarity index 60%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
index 06f9cd43c0..270f763dc3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtilsTest.java
@@ -16,27 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.api;
+package org.apache.pinot.core.data.manager.realtime;
-import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
public class SegmentCompletionUtilsTest {
@Test
public void testGenerateSegmentFilePrefix() {
String segmentName = "segment";
- assertEquals(SegmentCompletionUtils.getSegmentNamePrefix(segmentName),
"segment.tmp.");
+ assertEquals(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName),
"segment.tmp.");
}
@Test
public void testGenerateSegmentLocation() {
String segmentName = "segment";
- String segmentNamePrefix =
SegmentCompletionUtils.getSegmentNamePrefix(segmentName);
-
assertTrue(SegmentCompletionUtils.generateSegmentFileName(segmentName).startsWith(segmentNamePrefix));
+ String segmentNamePrefix =
SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName);
+
assertTrue(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName).startsWith(segmentNamePrefix));
+ }
+
+ @Test
+ public void testIsTmpFile() {
+
assertTrue(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.550e8400-e29b-41d4-a716-446655440000"));
+ assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp."));
+
assertFalse(SegmentCompletionUtils.isTmpFile(".tmp.550e8400-e29b-41d4-a716-446655440000"));
+ assertFalse(SegmentCompletionUtils.isTmpFile("hdfs://foo.tmp.55"));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]