This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6b43aef cleanup tar.gz segment files on job exit (#6385)
6b43aef is described below
commit 6b43aef4c9d4bd558c29ca3b68552f91bcaff693
Author: Karthik Amarnath <[email protected]>
AuthorDate: Tue Dec 29 12:03:43 2020 -0800
cleanup tar.gz segment files on job exit (#6385)
* Clean up tar.gz files after the upload completes.
* cleanup tar.gz segment files on job exit.
---
.../ingestion/batch/common/SegmentPushUtils.java | 44 +++++++++++++++-------
.../batch/spec/SegmentGenerationJobSpec.java | 13 +++++++
2 files changed, 44 insertions(+), 13 deletions(-)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 71ccfbe..4c3b41b 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
@@ -77,7 +78,8 @@ public class SegmentPushUtils implements Serializable {
return new URI(scheme, fileURI.getUserInfo(), host, port,
fileURI.getPath(), fileURI.getQuery(),
fileURI.getFragment());
} catch (URISyntaxException e) {
- LOGGER.warn("Unable to generate push uri based from dir URI: {} and
file URI: {}, directly return file URI.", dirURI, fileURI);
+ LOGGER.warn("Unable to generate push uri based from dir URI: {} and
file URI: {}, directly return file URI.",
+ dirURI, fileURI);
return fileURI;
}
}
@@ -87,6 +89,7 @@ public class SegmentPushUtils implements Serializable {
public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS
fileSystem, List<String> tarFilePaths)
throws RetriableOperationException, AttemptsExceededException {
String tableName = spec.getTableSpec().getTableName();
+ boolean cleanUpOutputDir = spec.isCleanUpOutputDir();
LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
Arrays.toString(tarFilePaths.subList(0, Math.min(5,
tarFilePaths.size())).toArray()),
Arrays.toString(spec.getPinotClusterSpecs()), tableName);
@@ -134,6 +137,10 @@ public class SegmentPushUtils implements Serializable {
segmentName, controllerURI, e);
throw e;
}
+ } finally {
+ if (cleanUpOutputDir) {
+ fileSystem.delete(tarFileURI, true);
+ }
}
});
}
@@ -147,6 +154,8 @@ public class SegmentPushUtils implements Serializable {
Arrays.toString(segmentUris.subList(0, Math.min(5,
segmentUris.size())).toArray()),
Arrays.toString(spec.getPinotClusterSpecs()));
for (String segmentUri : segmentUris) {
+ URI segmentURI = URI.create(segmentUri);
+ PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
URI controllerURI;
try {
@@ -183,6 +192,10 @@ public class SegmentPushUtils implements Serializable {
tableName, segmentUri, controllerURI, e);
throw e;
}
+ } finally {
+ if (spec.isCleanUpOutputDir()) {
+ outputDirFS.delete(segmentURI, true);
+ }
}
});
}
@@ -202,11 +215,11 @@ public class SegmentPushUtils implements Serializable {
* @param segmentUriToTarPathMap contains the map of segment DownloadURI to
segment tar file path
* @throws Exception
*/
- public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec,
PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec,
PinotFS fileSystem,
+ Map<String, String> segmentUriToTarPathMap)
throws Exception {
String tableName = spec.getTableSpec().getTableName();
- LOGGER.info("Start pushing segment metadata: {} to locations: {} for table
{}",
- segmentUriToTarPathMap,
+ LOGGER.info("Start pushing segment metadata: {} to locations: {} for table
{}", segmentUriToTarPathMap,
Arrays.toString(spec.getPinotClusterSpecs()), tableName);
for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
@@ -233,15 +246,17 @@ public class SegmentPushUtils implements Serializable {
}
RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs,
5).attempt(() -> {
try {
- List<Header> headers = ImmutableList.of(
- new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
segmentUriPath),
- new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ List<Header> headers = ImmutableList
+ .of(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
segmentUriPath),
+ new
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
// Add table name as a request parameter
NameValuePair tableNameValuePair =
new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
tableName);
List<NameValuePair> parameters =
Arrays.asList(tableNameValuePair);
- SimpleHttpResponse response =
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
- segmentName, segmentMetadataFile, headers, parameters,
FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+ SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
+
.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
segmentName,
+ segmentMetadataFile, headers, parameters,
FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
LOGGER.info("Response for pushing table {} segment {} to
location {} - {}: {}", tableName, segmentName,
controllerURI, response.getStatusCode(),
response.getResponse());
return true;
@@ -268,12 +283,13 @@ public class SegmentPushUtils implements Serializable {
}
}
- public static Map<String, String> getSegmentUriToTarPathMap(URI
outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+ public static Map<String, String> getSegmentUriToTarPathMap(URI
outputDirURI, String uriPrefix, String uriSuffix,
+ String[] files) {
Map<String, String> segmentUriToTarPathMap = new HashMap<>();
for (String file : files) {
URI uri = URI.create(file);
if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
- URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI,
uri, uriPrefix,uriSuffix);
+ URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI,
uri, uriPrefix, uriSuffix);
segmentUriToTarPathMap.put(updatedURI.toString(), file);
}
}
@@ -293,7 +309,8 @@ public class SegmentPushUtils implements Serializable {
private static File generateSegmentMetadataFile(PinotFS fileSystem, URI
tarFileURI)
throws Exception {
String uuid = UUID.randomUUID().toString();
- File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid
+ TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tarFile =
+ new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentMetadataDir = new File(FileUtils.getTempDirectory(),
"segmentMetadataDir-" + uuid);
try {
fileSystem.copyToLocalFile(tarFileURI, tarFile);
@@ -312,7 +329,8 @@ public class SegmentPushUtils implements Serializable {
TarGzCompressionUtils.untarOneFile(tarFile,
V1Constants.SEGMENT_CREATION_META,
new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META));
- File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(),
"segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(),
+ "segmentMetadata-" + uuid +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
if (segmentMetadataTarFile.exists()) {
FileUtils.forceDelete(segmentMetadataTarFile);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index ab199bc..740beaf 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -111,6 +111,11 @@ public class SegmentGenerationJobSpec implements
Serializable {
*/
private PushJobSpec _pushJobSpec;
+ /**
+ * Should clean up output segment on job completion.
+ */
+ private boolean _cleanUpOutputDir;
+
public ExecutionFrameworkSpec getExecutionFrameworkSpec() {
return _executionFrameworkSpec;
}
@@ -247,6 +252,14 @@ public class SegmentGenerationJobSpec implements
Serializable {
public void setSegmentCreationJobParallelism(int
segmentCreationJobParallelism) {
_segmentCreationJobParallelism = segmentCreationJobParallelism;
}
+
+ public void setCleanUpOutputDir(boolean cleanUpOutputDir) {
+ _cleanUpOutputDir = cleanUpOutputDir;
+ }
+
+ public boolean isCleanUpOutputDir() {
+ return _cleanUpOutputDir;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]