This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 3fe29d0 KYLIN-3471 Minor, remove duplicate code 3fe29d0 is described below commit 3fe29d081fffd5b700a3dec2345462cd4e753037 Author: chao long <wayn...@qq.com> AuthorDate: Wed Aug 8 15:09:23 2018 +0800 KYLIN-3471 Minor, remove duplicate code --- .../kylin/engine/mr/common/AbstractHadoopJob.java | 29 +----------------- .../kylin/engine/mr/common/JobRelatedMetaUtil.java | 29 ++++++++++++++++++ .../apache/kylin/engine/spark/SparkExecutable.java | 34 ++-------------------- 3 files changed, 32 insertions(+), 60 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 180b56a..329dd56 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -27,7 +27,6 @@ import static org.apache.hadoop.util.StringUtils.formatTime; import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; @@ -63,7 +62,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.StorageURL; -import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; @@ -561,32 +559,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { dumpList.addAll(segment.getDictionaryPaths()); dumpList.add(segment.getStatisticsResourcePath()); } - dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), metaUrl); - } - - private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl) - throws IOException { - File tmp = File.createTempFile("kylin_job_meta", ""); - FileUtils.forceDelete(tmp); // we need a directory, so delete the file first - - File metaDir = new File(tmp, "meta"); - metaDir.mkdirs(); - - // dump metadata - JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList); - - // write kylin.properties - Properties props = kylinConfig.exportToProperties(); - props.setProperty("kylin.metadata.url", metadataUrl); - - File kylinPropsFile = new File(metaDir, "kylin.properties"); - try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) { - props.store(os, kylinPropsFile.getAbsolutePath()); - } - - KylinConfig dstConfig = KylinConfig.createKylinConfig(props); - //upload metadata - ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), metaUrl); } protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java index 2cd1841..64469a0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java @@ -18,9 +18,12 @@ package org.apache.kylin.engine.mr.common; +import org.apache.commons.io.FileUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; @@ -29,8 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.LinkedHashSet; +import java.util.Properties; import java.util.Set; public class JobRelatedMetaUtil { @@ -69,4 +74,28 @@ public class JobRelatedMetaUtil { logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime); } + + public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl) + throws IOException { + File tmp = File.createTempFile("kylin_job_meta", ""); + FileUtils.forceDelete(tmp); // we need a directory, so delete the file first + + File metaDir = new File(tmp, "meta"); + metaDir.mkdirs(); + + // dump metadata + dumpResources(kylinConfig, metaDir, dumpList); + + // write kylin.properties + Properties props = kylinConfig.exportToProperties(); + props.setProperty("kylin.metadata.url", metadataUrl); + File kylinPropsFile = new File(metaDir, "kylin.properties"); + try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) { + props.store(os, kylinPropsFile.getAbsolutePath()); + } + + KylinConfig dstConfig = KylinConfig.createKylinConfig(props); + //upload metadata + ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + } } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index d8eba71..637382c 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -17,28 +17,23 @@ */ package org.apache.kylin.engine.spark; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.util.Shell; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; @@ -381,7 +376,7 @@ public class SparkExecutable extends AbstractExecutable { dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance())); dumpList.addAll(segment.getDictionaryPaths()); dumpList.add(segment.getStatisticsResourcePath()); - dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig()); + JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt())); } private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException { @@ -395,32 +390,7 @@ public class SparkExecutable extends AbstractExecutable { dumpList.add(segment.getStatisticsResourcePath()); } } - dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig()); - } - - private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) - throws IOException { - File tmp = File.createTempFile("kylin_job_meta", ""); - FileUtils.forceDelete(tmp); // we need a directory, so delete the file first - - File metaDir = new File(tmp, "meta"); - metaDir.mkdirs(); - - // dump metadata - JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList); - - // write kylin.properties - Properties props = kylinConfig.exportToProperties(); - String metadataUrl = this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()); - props.setProperty("kylin.metadata.url", metadataUrl); - File kylinPropsFile = new File(metaDir, "kylin.properties"); - try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) { - props.store(os, kylinPropsFile.getAbsolutePath()); - } - - KylinConfig dstConfig = KylinConfig.createKylinConfig(props); - //upload metadata - ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt())); } private void readCounters(final Map<String, String> info) {