This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 62d5b03 KYLIN-3839 Storage clean up after refreshing and deleting segment
62d5b03 is described below
commit 62d5b0336b65013522e258499f640416f7e5ca82
Author: chao long <[email protected]>
AuthorDate: Mon Mar 18 13:34:01 2019 +0800
KYLIN-3839 Storage clean up after refreshing and deleting segment
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../src/main/resources/kylin-defaults.properties | 3 +
.../java/org/apache/kylin/cube/CubeInstance.java | 12 ++-
.../org/apache/kylin/rest/service/CubeService.java | 40 ++++++++-
.../kylin/storage/hbase/steps/HBaseJobSteps.java | 98 +++++++++++++---------
.../hbase/steps/HBaseSparkOutputTransition.java | 2 +-
.../kylin/storage/hbase/util/StorageCleanUtil.java | 93 ++++++++++++++++++++
7 files changed, 208 insertions(+), 44 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 82f79eb..c240e7e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1209,6 +1209,10 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.storage.hbase.replication-scope", "0"));
}
+ public boolean cleanStorageAfterDelOperation() {
+ return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
+ }
+
// ============================================================================
// ENGINE.MR
// ============================================================================
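The new getter keeps physical cleanup opt-in (the property defaults to false). A minimal sketch of the intended gating, assuming a Kylin server classpath; dropSegmentStorage() is a hypothetical placeholder, not part of this commit:

    // skip physical cleanup unless kylin.storage.clean-after-delete-operation=true
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.cleanStorageAfterDelOperation()) {
        dropSegmentStorage(); // hypothetical helper; the real call sites are in CubeService below
    }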
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index c9b0d59..e0cb1f0 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -149,6 +149,9 @@ kylin.storage.partition.max-scan-bytes=3221225472
# You can set it to a smaller value. 0 means use default.
# kylin.storage.hbase.coprocessor-timeout-seconds=0
+# Clean up the underlying storage after a delete operation
+# Set it to true if you also want to delete the real storage, e.g. the HTable of a deleted segment
+kylin.storage.clean-after-delete-operation=false
### JOB ###
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 4599cf6..ad99377 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -185,10 +185,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return segments.getMergingSegments(mergedSegment);
}
+ public CubeSegment getOriginalSegmentToRefresh(CubeSegment refreshedSegment) {
+ return getOriginalSegment(refreshedSegment);
+ }
+
public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
+ return getOriginalSegment(optimizedSegment);
+ }
+
+ private CubeSegment getOriginalSegment(CubeSegment toSegment) {
for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
- if (!optimizedSegment.equals(segment) //
- && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+ if (!toSegment.equals(segment) //
+ && toSegment.getSegRange().equals(segment.getSegRange())) {
return segment;
}
}
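Both public lookups now share one contract: return the READY segment whose range exactly equals the given segment's, or null when none matches (e.g., a first-time build). A sketch under that assumption (kylin-core-cube on the classpath; findOriginal is an illustrative name):

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeSegment;

    static CubeSegment findOriginal(CubeInstance cube, CubeSegment refreshed) {
        // the READY segment with the identical SegRange, or null if none exists
        return cube.getOriginalSegmentToRefresh(refreshed);
    }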
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a9fbb97..2a5ce26 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,11 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -43,6 +45,7 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.common.PatternedLogger;
@@ -81,6 +84,8 @@ import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.ValidateUtil;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.StorageCleanUtil;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -348,8 +353,12 @@ public class CubeService extends BasicService implements InitializingBean {
}
}
+ List<CubeSegment> toRemoveSegs = cube.getSegments();
+
int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
getCubeManager().dropCube(cube.getName(), cubeNum == 1); // only delete cube desc when no other cube is using it
+
+ cleanSegmentStorage(toRemoveSegs);
}
/**
@@ -380,7 +389,6 @@ public class CubeService extends BasicService implements InitializingBean {
this.releaseAllSegments(cube);
return cube;
-
}
/**
@@ -550,7 +558,30 @@ public class CubeService extends BasicService implements InitializingBean {
logger.warn(String.format(Locale.ROOT, msg.getDELETE_SEGMENT_CAUSE_GAPS(), cube.getName(), segmentName));
}
- return CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+ CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+
+ cleanSegmentStorage(Collections.singletonList(toDelete));
+
+ return cubeInstance;
+ }
+
+ // clean up segment data in HBase and HDFS
+ private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
+ if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
+ return;
+ }
+
+ if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
+ List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+ List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+ for (CubeSegment seg : toRemoveSegs) {
+ toDropHTables.add(seg.getStorageLocationIdentifier());
+ toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID()));
+ }
+
+ StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
+ StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
+ }
}
public boolean isOrphonSegment(CubeInstance cube, String segId) {
@@ -586,7 +617,12 @@ public class CubeService extends BasicService implements InitializingBean {
private void releaseAllSegments(CubeInstance cube) throws IOException {
releaseAllJobs(cube);
+ List<CubeSegment> toRemoveSegs = cube.getSegments();
+
+ // remove from metadata
getCubeManager().clearSegments(cube);
+
+ cleanSegmentStorage(toRemoveSegs);
}
public void updateOnNewSegmentReady(String cubeName) {
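Each removed segment maps to two physical artifacts, resolved exactly as in cleanSegmentStorage() above; a sketch with an assumed CubeSegment seg:

    // the HBase table backing the segment
    String htable = seg.getStorageLocationIdentifier();
    // the HDFS working dir of the job that built the segment
    String jobDir = JobBuilderSupport.getJobWorkingDir(
            seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID());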
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4d61d9b..205abe2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.steps;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.kylin.common.util.StringUtil;
@@ -115,20 +116,6 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
return bulkLoadStep;
}
- public MergeGCStep createMergeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- result.setOldHTables(getMergingHTables());
- return result;
- }
-
- public MergeGCStep createOptimizeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(getOptimizeHTables());
- return result;
- }
-
public List<CubeSegment> getOptimizeSegments() {
CubeInstance cube = (CubeInstance) seg.getRealization();
List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
@@ -153,19 +140,25 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
public List<String> getMergingHTables() {
final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
- .getMergingSegments((CubeSegment) seg);
+ .getMergingSegments(seg);
Preconditions.checkState(mergingSegments.size() > 1,
"there should be more than 2 segments to merge, target segment " + seg);
- final List<String> mergingHTables = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingHTables.add(merging.getStorageLocationIdentifier());
- }
- return mergingHTables;
+ return getOldHTables(mergingSegments);
+ }
+
+ public List<String> getRefreshingHTables() {
+ final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+ return getOldHTables(Collections.singletonList(refreshingSegment));
+ }
+
+ public List<String> getRefreshingHDFSPaths() {
+ final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+ return getOldHDFSPaths(Collections.singletonList(refreshingSegment));
}
public List<String> getMergingHDFSPaths() {
final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
- .getMergingSegments((CubeSegment) seg);
+ .getMergingSegments(seg);
Preconditions.checkState(mergingSegments.size() > 1,
"there should be more than 2 segments to merge, target segment " + seg);
final List<String> mergingHDFSPaths = Lists.newArrayList();
@@ -203,10 +196,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.add(getOptimizationRootPath(jobId));
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
jobFlow.addTask(step);
}
@@ -214,15 +204,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();
- jobFlow.addTask(createOptimizeGCStep());
+ MergeGCStep hBaseGCStep = createHBaseGCStep(getOptimizeHTables());
+ jobFlow.addTask(hBaseGCStep);
List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.addAll(getOptimizeHDFSPaths());
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
jobFlow.addTask(step);
}
@@ -230,16 +218,14 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();
- jobFlow.addTask(createMergeGCStep());
+ MergeGCStep hBaseGCStep = createHBaseGCStep(getMergingHTables());
+ jobFlow.addTask(hBaseGCStep);
List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.addAll(getMergingHDFSPaths());
toDeletePaths.add(getHFilePath(jobId));
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
jobFlow.addTask(step);
}
@@ -252,12 +238,46 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
toDeletePaths.add(getHFilePath(jobId));
toDeletePaths.add(getShrunkenDictionaryPath(jobId));
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ CubeSegment oldSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+
+ // refresh segment
+ if (oldSegment != null) {
+ // delete the old segment's HDFS job working dir
+ toDeletePaths.addAll(getRefreshingHDFSPaths());
+
+ // drop old htables
+ MergeGCStep hBaseGCStep = createHBaseGCStep(getRefreshingHTables());
+ jobFlow.addTask(hBaseGCStep);
+ }
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
jobFlow.addTask(step);
}
+ /**
+  * Create an 'HBase garbage collection' step that drops the given HTables.
+  * @param toDropHTables names of the HTables to drop
+  * @return the configured MergeGCStep
+  */
+ public MergeGCStep createHBaseGCStep(List<String> toDropHTables) {
+ MergeGCStep hBaseGCStep = new MergeGCStep();
+ hBaseGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
+ hBaseGCStep.setOldHTables(toDropHTables);
+ return hBaseGCStep;
+ }
+
+ /**
+  * Create an 'HDFS garbage collection' step that deletes the given paths from HDFS.
+  * @param toDeletePaths HDFS paths to delete
+  * @param jobId id of the owning job
+  * @return the configured HDFSPathGarbageCollectionStep
+  */
+ public HDFSPathGarbageCollectionStep createHDFSPathGCStep(List<String> toDeletePaths, String jobId) {
+ HDFSPathGarbageCollectionStep hdfsGCStep = new HDFSPathGarbageCollectionStep();
+ hdfsGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+ hdfsGCStep.setDeletePaths(toDeletePaths);
+ hdfsGCStep.setJobId(jobId);
+ return hdfsGCStep;
+ }
+
}
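The two factory helpers make every GC flow (merge, optimize, refresh, cubing) assemble the same pair of steps. A condensed sketch of the wiring, assuming a concrete HBaseJobSteps instance 'steps' and a DefaultChainedExecutable 'jobFlow' from the merge flow:

    // drop the obsolete HTables, then delete the obsolete HDFS dirs
    jobFlow.addTask(steps.createHBaseGCStep(steps.getMergingHTables()));
    jobFlow.addTask(steps.createHDFSPathGCStep(steps.getMergingHDFSPaths(), jobFlow.getId()));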
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index e6c3ee8..43babfd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -60,7 +60,7 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- // nothing to do
+ steps.addCubingGarbageCollectionSteps(jobFlow);
}
};
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
new file mode 100644
index 0000000..a1259b8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class StorageCleanUtil {
+ private static final Logger logger = LoggerFactory.getLogger(StorageCleanUtil.class);
+
+ /**
+ * This method closes the given hbaseAdmin when the work is finished.
+ */
+ public static void dropHTables(final HBaseAdmin hbaseAdmin, List<String> hTables) {
+ runSingleThreadTaskQuietly(() -> {
+ try {
+ for (String htable : hTables) {
+ logger.info("Deleting HBase table {}", htable);
+
+ if (hbaseAdmin.tableExists(htable)) {
+ if (hbaseAdmin.isTableEnabled(htable)) {
+ hbaseAdmin.disableTable(htable);
+ }
+
+ hbaseAdmin.deleteTable(htable);
+ logger.info("Deleted HBase table {}", htable);
+ } else {
+ logger.info("HBase table {} does not exist.", htable);
+ }
+ }
+ } catch (Exception e) {
+ // ignore the failure; the periodic storage cleanup job will delete it later
+ logger.error("Deleting HBase table failed", e);
+ } finally {
+ IOUtils.closeQuietly(hbaseAdmin);
+ }
+ });
+ }
+
+ public static void deleteHDFSPath(final FileSystem fileSystem, List<String> hdfsPaths) {
+ runSingleThreadTaskQuietly(() -> {
+ try {
+ for (String hdfsPath : hdfsPaths) {
+ logger.info("Deleting HDFS path {}", hdfsPath);
+ Path path = new Path(hdfsPath);
+
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, true);
+ logger.info("Deleted HDFS path {}", hdfsPath);
+ }
+ }
+ } catch (Exception e) {
+ // ignore the failure; the periodic storage cleanup job will delete it later
+ logger.error("Deleting HDFS path failed", e);
+ }
+ });
+ }
+
+ private static void runSingleThreadTaskQuietly(Runnable task) {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ try {
+ executorService.execute(task);
+ } catch (Exception e) {
+ logger.error("Failed to run task", e);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+}
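A usage sketch mirroring the CubeService call site above; the table name and path are illustrative only. Both methods run the work asynchronously on a single-thread executor and swallow failures, leaving the periodic storage cleanup job as the backstop; dropHTables also closes the admin itself:

    import java.util.Arrays;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.storage.hbase.HBaseConnection;
    import org.apache.kylin.storage.hbase.util.StorageCleanUtil;

    void cleanupExample() throws java.io.IOException {
        // illustrative values; real callers derive them from CubeSegment
        StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()),
                Arrays.asList("KYLIN_EXAMPLE_HTABLE"));
        StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(),
                Arrays.asList("/kylin/kylin_metadata/kylin-exampleJobId"));
    }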