This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cfdcb9f1efecd39ad709db7257cac6bc90a98bbd
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Tue Aug 25 23:25:39 2020 +0800

    KYLIN-4699 Delete job_tmp path after build/merge completes successfully
---
 .../kylin/job/execution/AbstractExecutable.java    |  2 +
 .../engine/spark/metadata/cube/PathManager.java    | 44 ++++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkCubingStep.java   |  9 +++++
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  | 15 +++++---
 .../engine/spark/LocalWithSparkSessionTest.java    | 22 ++++++++++-
 .../org/apache/kylin/rest/service/CubeService.java | 11 ++----
 6 files changed, 88 insertions(+), 15 deletions(-)

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 908e154..33d42f3 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -198,6 +198,8 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
                 } catch (Throwable e) {
                     logger.error("error running Executable: {}", 
this.toString());
                     catchedException = e;
+                } finally {
+                    cleanup();
                 }
                 retry++;
                 realException = catchedException != null ? catchedException
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 5353523..0484bfc 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -19,10 +19,15 @@
 package org.apache.kylin.engine.spark.metadata.cube;
 
 import java.io.File;
+import java.io.IOException;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,4 +48,43 @@ public final class PathManager {
         String hdfsWorkDir = 
cube.getConfig().getHdfsWorkingDirectory(cube.getProject());
         return hdfsWorkDir + "parquet" + File.separator + cube.getName() + 
File.separator + segName + "_" + identifier;
     }
+
+    /**
+     * Delete segment path
+     */
+    public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, 
CubeSegment segment) throws IOException {
+        if (cube == null || segment == null) {
+            return false;
+        }
+        String path = getSegmentParquetStoragePath(cube, segment.getName(),
+                segment.getStorageLocationIdentifier());
+        logger.info("Deleting segment parquet path {}", path);
+        HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new 
Path(path));
+        return true;
+    }
+
+    /**
+     * Delete job temp path
+     */
+    public static boolean deleteJobTempPath(KylinConfig kylinConfig, String 
project, String jobId) {
+        if (StringUtils.isEmpty(jobId) || StringUtils.isEmpty(project)) {
+            return false;
+        }
+        Path jobTmpPath = new Path(kylinConfig.getJobTmpDir(project));
+        try {
+            Path[] toDeletedPath =
+                    
HadoopUtil.getFilteredPath(jobTmpPath.getFileSystem(HadoopUtil.getCurrentConfiguration()),
+                            jobTmpPath, jobId);
+            if (toDeletedPath != null && toDeletedPath.length > 0) {
+                for (Path deletedPath : toDeletedPath) {
+                    logger.info("Deleting job tmp path {}", 
deletedPath.toString());
+                    
HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), deletedPath);
+                }
+            }
+        } catch (IOException e) {
+            logger.error("Can not delete job tmp path: {}", jobTmpPath);
+            return false;
+        }
+        return true;
+    }
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index bdc68c6..a235290 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -24,11 +24,13 @@ import com.google.common.collect.Sets;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -119,4 +121,11 @@ public class NSparkCubingStep extends NSparkExecutable {
         joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, 
String.valueOf(cube.getSizeKB()));
         return joblogInfo;
     }
+
+    @Override
+    public void cleanup() throws ExecuteException {
+        // delete job tmp dir
+        PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
+                getParam(MetadataConstants.P_JOB_ID));
+    }
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index e0a6704..b3d6a0c 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -20,11 +20,8 @@ package org.apache.kylin.engine.spark.job;
 
 import java.io.IOException;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -54,11 +51,10 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep 
extends NSparkExecutable {
 
         CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
         Segments<CubeSegment> mergingSegments = 
cube.getMergingSegments(mergedSegment);
+        // delete segments which were merged
         for (CubeSegment segment : mergingSegments) {
-            String path = PathManager.getSegmentParquetStoragePath(cube, 
segment.getName(),
-                    segment.getStorageLocationIdentifier());
             try {
-                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), 
new Path(path));
+                PathManager.deleteSegmentParquetStoragePath(cube, segment);
             } catch (IOException e) {
                 throw new ExecuteException("Can not delete segment: " + 
segment.getName() + ", in cube: " + cube.getName());
             }
@@ -76,4 +72,11 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep 
extends NSparkExecutable {
         AfterMergeOrRefreshResourceMerger merger = new 
AfterMergeOrRefreshResourceMerger(buildConfig);
         merger.merge(cubeId, mergedSegmentId, resourceStore, 
getParam(MetadataConstants.P_JOB_TYPE));
     }
+
+    @Override
+    public void cleanup() throws ExecuteException {
+        // delete job tmp dir
+        PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
+                getParam(MetadataConstants.P_JOB_ID));
+    }
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2480a6e..53d84e2 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -31,6 +31,7 @@ import org.apache.kylin.common.util.TempMetadataBuilder;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.spark.builder.CreateFlatTable;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.engine.spark.job.NSparkCubingStep;
@@ -45,6 +46,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
@@ -184,7 +186,11 @@ public class LocalWithSparkSessionTest extends 
LocalFileMetadataTestCase impleme
         // launch the job
         execMgr.addJob(job);
 
-        return wait(job);
+        ExecutableState result = wait(job);
+
+        checkJobTmpPathDeleted(config, job);
+
+        return result;
     }
 
     protected ExecutableState mergeSegments(String cubeName, long start, long 
end, boolean force) throws Exception {
@@ -202,6 +208,7 @@ public class LocalWithSparkSessionTest extends 
LocalFileMetadataTestCase impleme
                     segment.getStorageLocationIdentifier());
             Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new 
Path(HadoopUtil.makeURI(path))));
         }
+        checkJobTmpPathDeleted(config, mergeJob);
         return result;
     }
 
@@ -306,4 +313,17 @@ public class LocalWithSparkSessionTest extends 
LocalFileMetadataTestCase impleme
     public String getProject() {
         return "default";
     }
+
+    protected void checkJobTmpPathDeleted(KylinConfig config, CubingJob job) {
+        String project = job.getParam(MetadataConstants.P_PROJECT_NAME);
+        String jobId = job.getParam(MetadataConstants.P_JOB_ID);
+        Path jobTmpPath = new Path(config.getJobTmpDir(project));
+        try {
+            Path[] jobTmpPathArray =
+                    
HadoopUtil.getFilteredPath(jobTmpPath.getFileSystem(HadoopUtil.getCurrentConfiguration()),
+                            jobTmpPath, jobId);
+            Assert.assertTrue(jobTmpPathArray.length == 0);
+        } catch (IOException e) {
+        }
+    }
 }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 2aac66e..b50bdc0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,12 +29,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -630,18 +628,15 @@ public class CubeService extends BasicService implements 
InitializingBean {
         return cubeInstance;
     }
 
-    // clean segment data in hbase and hdfs
+    // clean segment data in hdfs
     private void cleanSegmentStorage(CubeInstance cube, List<CubeSegment> 
toRemoveSegs) throws IOException {
         if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) 
{
             return;
         }
 
         if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
-            for (CubeSegment seg : toRemoveSegs) {
-                String path = PathManager.getSegmentParquetStoragePath(cube, 
seg.getName(),
-                        seg.getStorageLocationIdentifier());
-                logger.info("Deleting segment HDFS path {}", path);
-                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), 
new Path(path));
+            for (CubeSegment segment : toRemoveSegs) {
+                PathManager.deleteSegmentParquetStoragePath(cube, segment);
             }
         }
     }

Reply via email to