This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this
push:
new beabeaa KYLIN-4754 The Cleanup tool cannot clean the parquet file
of the deleted cube and project
beabeaa is described below
commit beabeaa2ec9b81c06a60d5ca62e639de0b660358
Author: yaqian.zhang <[email protected]>
AuthorDate: Tue Sep 15 14:05:18 2020 +0800
KYLIN-4754 The Cleanup tool cannot clean the parquet file of the
deleted cube and project
---
.../apache/kylin/rest/job/StorageCleanupJob.java | 52 +++++++++++++++++--
.../kylin/rest/job/StorageCleanupJobTest.java | 58 ++++++++++++++++++----
2 files changed, 95 insertions(+), 15 deletions(-)
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index c23ad9a..cd7f9c5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -98,8 +98,29 @@ public class StorageCleanupJob extends AbstractApplication {
}
}
- //clean up no used segments
+ //clean up deleted projects and cubes
List<CubeInstance> cubes = cubeManager.listAllCubes();
+ Path metadataPath = new Path(config.getHdfsWorkingDirectory());
+ if (fs.exists(metadataPath)) {
+ FileStatus[] projectStatus = fs.listStatus(metadataPath);
+ if (projectStatus != null) {
+ for (FileStatus status : projectStatus) {
+ String projectName = status.getPath().getName();
+ if (!projects.contains(projectName)) {
+ if (delete) {
+ logger.info("Deleting HDFS path " +
status.getPath());
+ fs.delete(status.getPath(), true);
+ } else {
+ logger.info("Dry run, pending delete HDFS path " +
status.getPath());
+ }
+ } else {
+ cleanupDeletedCubes(projectName,
cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
+ }
+ }
+ }
+ }
+
+ //clean up no used segments
for (CubeInstance cube : cubes) {
List<String> segments = cube.getSegments().stream().map(segment ->
{
return segment.getName() + "_" +
segment.getStorageLocationIdentifier();
@@ -109,9 +130,9 @@ public class StorageCleanupJob extends AbstractApplication {
//list all segment directory
Path cubePath = new Path(config.getHdfsWorkingDirectory(project) +
"/parquet/" + cube.getName());
if (fs.exists(cubePath)) {
- FileStatus[] fStatus = fs.listStatus(cubePath);
- if (fStatus != null) {
- for (FileStatus status : fStatus) {
+ FileStatus[] segmentStatus = fs.listStatus(cubePath);
+ if (segmentStatus != null) {
+ for (FileStatus status : segmentStatus) {
String segment = status.getPath().getName();
if (!segments.contains(segment)) {
if (delete) {
@@ -128,4 +149,27 @@ public class StorageCleanupJob extends AbstractApplication
{
}
}
}
+
+ private void cleanupDeletedCubes(String project, List<String> cubes)
throws Exception {
+ //clean up deleted cubes
+ Path parquetPath = new Path(config.getHdfsWorkingDirectory(project) +
"/parquet");
+ if (fs.exists(parquetPath)) {
+ FileStatus[] cubeStatus = fs.listStatus(parquetPath);
+ if (cubeStatus != null) {
+ for (FileStatus status : cubeStatus) {
+ if (status.getPath() != null) {
+ String cubeName = status.getPath().getName();
+ if (!cubes.contains(cubeName)) {
+ if (delete) {
+ logger.info("Deleting HDFS path " +
status.getPath());
+ fs.delete(status.getPath(), true);
+ } else {
+ logger.info("Dry run, pending delete HDFS path
" + status.getPath());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index eb5641d..4836fb3 100644
---
a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++
b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -67,11 +67,17 @@ public class StorageCleanupJobTest {
job.execute(new String[] { "--delete", "true" });
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
- verify(mockFs, times(3)).delete(pathCaptor.capture(), eq(true));
+ verify(mockFs, times(5)).delete(pathCaptor.capture(), eq(true));
ArrayList<Path> expected = Lists.newArrayList(
// Verify clean job temp directory
new Path(basePath + "/default/job_tmp"),
+ //Verify clean dropped cube
+ new Path(basePath + "/default/parquet/dropped_cube"),
+
+ //Verify clean deleted project
+ new Path(basePath + "/deleted_project"),
+
// Verify clean none used segments
new Path(basePath +
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"),
new Path(basePath +
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN")
@@ -81,27 +87,57 @@ public class StorageCleanupJobTest {
private void prepareHDFSFiles(Path basePath, FileSystem mockFs) throws
IOException {
- FileStatus[] statuses = new FileStatus[2];
- FileStatus f1 = mock(FileStatus.class);
- FileStatus f2 = mock(FileStatus.class);
+ FileStatus[] segmentStatuses = new FileStatus[2];
+ FileStatus segment1 = mock(FileStatus.class);
+ FileStatus segment2 = mock(FileStatus.class);
- // Remove job temp directory
+ FileStatus[] cubeStatuses = new FileStatus[3];
+ FileStatus cube1 = mock(FileStatus.class);
+ FileStatus cube2 = mock(FileStatus.class);
+ FileStatus cube3 = mock(FileStatus.class);
+ FileStatus[] projectStatuses = new FileStatus[2];
+ FileStatus project1 = mock(FileStatus.class);
+ FileStatus project2 = mock(FileStatus.class);
+
+ // Remove job temp directory
Path jobTmpPath = new Path(basePath + "/default/job_tmp");
when(mockFs.exists(jobTmpPath)).thenReturn(true);
when(mockFs.delete(jobTmpPath, true)).thenReturn(true);
// remove every segment working dir from deletion list, so this
exclude.
- when(f1.getPath()).thenReturn(new Path(basePath +
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"));
- when(f2.getPath()).thenReturn(new Path(basePath +
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN"));
- statuses[0] = f1;
- statuses[1] = f2;
+ when(segment1.getPath()).thenReturn(new Path(basePath +
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"));
+ when(segment2.getPath()).thenReturn(new Path(basePath +
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN"));
+ segmentStatuses[0] = segment1;
+ segmentStatuses[1] = segment2;
Path cubePath1 = new Path(basePath +
"/default/parquet/ci_left_join_cube");
Path cubePath2 = new Path(basePath +
"/default/parquet/ci_inner_join_cube");
+ Path cubePath3 = new Path(basePath + "/default/parquet/dropped_cube");
when(mockFs.exists(cubePath1)).thenReturn(true);
when(mockFs.exists(cubePath2)).thenReturn(false);
-
- when(mockFs.listStatus(new Path(basePath +
"/default/parquet/ci_left_join_cube"))).thenReturn(statuses);
+ when(mockFs.exists(cubePath3)).thenReturn(true);
+
+ when(cube1.getPath()).thenReturn(cubePath1);
+ when(cube2.getPath()).thenReturn(cubePath2);
+ when(cube3.getPath()).thenReturn(cubePath3);
+ cubeStatuses[0] = cube1;
+ cubeStatuses[1] = cube2;
+ cubeStatuses[2] = cube3;
+
+ when(project1.getPath()).thenReturn(new Path(basePath + "/default"));
+ when(project2.getPath()).thenReturn(new Path(basePath +
"/deleted_project"));
+ projectStatuses[0] = project1;
+ projectStatuses[1] = project2;
+
+ Path defaultProjectParquetPath = new Path(basePath +
"/default/parquet");
+ Path deletedProjectParquetPath = new Path(basePath +
"/deleted_project/parquet");
+ when(mockFs.exists(defaultProjectParquetPath)).thenReturn(true);
+ when(mockFs.exists(deletedProjectParquetPath)).thenReturn(true);
+
+ when(mockFs.exists(basePath)).thenReturn(true);
+ when(mockFs.listStatus(new Path(basePath +
"/default/parquet/ci_left_join_cube"))).thenReturn(segmentStatuses);
+
when(mockFs.listStatus(defaultProjectParquetPath)).thenReturn(cubeStatuses);
+ when(mockFs.listStatus(basePath)).thenReturn(projectStatuses);
}
}