Repository: kylin Updated Branches: refs/heads/master b8acf14f5 -> 76a53da74
KYLIN-2165 cleanup old codes Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/76a53da7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/76a53da7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/76a53da7 Branch: refs/heads/master Commit: 76a53da74536f4b3414bd7f5ef3950b9b489b3e8 Parents: b8acf14 Author: shaofengshi <[email protected]> Authored: Mon Nov 7 18:20:28 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Mon Nov 7 18:21:08 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/job/JoinedFlatTable.java | 10 +-- .../apache/kylin/job/JoinedFlatTableTest.java | 2 +- .../kylin/rest/controller/CubeController.java | 2 +- .../source/hive/CreateFlatHiveTableStep.java | 72 +------------------- .../apache/kylin/source/hive/HiveMRInput.java | 41 ++--------- .../kylin/source/hive/HiveCmdBuilderTest.java | 6 +- 6 files changed, 18 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 6c08bc9..b26f50d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -98,14 +98,14 @@ public class JoinedFlatTable { return ddl.toString(); } - public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig, boolean redistribute) { + public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) { StringBuilder sql = new StringBuilder(); sql.append(generateHiveSetStatements(engineConfig)); - sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc, redistribute) + ";").append("\n"); + sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n"); return sql.toString(); } - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean redistribute) { + public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder sql = new StringBuilder(); sql.append("SELECT" + "\n"); for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { @@ -117,10 +117,6 @@ public class JoinedFlatTable { } appendJoinStatement(flatDesc, sql); appendWhereStatement(flatDesc, sql); - if (redistribute == true) { - TblColRef distCol = flatDesc.getDistributedBy(); - appendDistributeStatement(sql, distCol); - } return sql.toString(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java index 1fe47f8..0faf22a 100644 --- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java @@ -77,7 +77,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenerateInsertSql() throws IOException { - String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()), true); + String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); System.out.println(sqls); int length = sqls.length(); http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 79739c2..5ca7cb5 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -145,7 +145,7 @@ public class CubeController extends BasicController { public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); - String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc, false); + String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc); GeneralResponse repsonse = new GeneralResponse(); repsonse.setProperty("sql", sql); http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index bcb9a38..025fd94 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -18,18 +18,12 @@ package org.apache.kylin.source.hive; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BufferedLogger; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -45,42 +39,9 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class); private final BufferedLogger stepLogger = new BufferedLogger(logger); - private long readRowCountFromFile() throws IOException { - Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); - - FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration()); - InputStream in = fs.open(rowCountFile); - try { - String content = IOUtils.toString(in, Charset.defaultCharset()); - return Long.valueOf(content.trim()); // strip the '\n' character - - } finally { - IOUtils.closeQuietly(in); - } - } - - private int determineNumReducer(KylinConfig config, long rowCount) throws IOException { - int mapperInputRows = config.getHadoopJobMapperInputRows(); - - int numReducers = Math.round(rowCount / ((float) mapperInputRows)); - numReducers = Math.max(numReducers, config.getHadoopJobMinReducerNumber()); - numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber()); - - stepLogger.log("total input rows = " + rowCount); - stepLogger.log("expected input rows per mapper = " + mapperInputRows); - stepLogger.log("reducers for RedistributeFlatHiveTableStep = " + numReducers); - - return numReducers; - } - - private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException { + private void createFlatHiveTable(KylinConfig config) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement(getInitStatement()); - boolean useRedistribute = getUseRedistribute(); - if (useRedistribute == true) { - hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); - hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge - } hiveCmdBuilder.addStatement(getCreateTableStatement()); final String cmd = hiveCmdBuilder.toString(); @@ -104,21 +65,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); try { - - boolean useRedistribute = getUseRedistribute(); - - int numReducers = 0; - if (useRedistribute == true) { - long rowCount = readRowCountFromFile(); - if (!config.isEmptySegmentAllowed() && rowCount == 0) { - stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); - return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); - } - - numReducers = determineNumReducer(config, rowCount); - } - - createFlatHiveTable(config, numReducers); + createFlatHiveTable(config); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); } catch (Exception e) { @@ -135,14 +82,6 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { return getParam("HiveInit"); } - public void setUseRedistribute(boolean useRedistribute) { - setParam("useRedistribute", String.valueOf(useRedistribute)); - } - - public boolean getUseRedistribute() { - return Boolean.valueOf(getParam("useRedistribute")); - } - public void setCreateTableStatement(String sql) { setParam("HiveRedistributeData", sql); } @@ -151,11 +90,4 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { return getParam("HiveRedistributeData"); } - public void setRowCountOutputDir(String rowCountOutputDir) { - setParam("rowCountOutputDir", rowCountOutputDir); - } - - public String getRowCountOutputDir() { - return getParam("rowCountOutputDir"); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index e0853b0..67ceffc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -128,22 +128,11 @@ public class HiveMRInput implements IMRInput { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig(); - String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod(); - if ("1".equals(createFlatTableMethod)) { - // create flat table first, then count and redistribute - jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, "")); - if (kylinConfig.isHiveRedistributeEnabled() == true) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); - } - } else if ("2".equals(createFlatTableMethod)) { - // count from source table first, and then redistribute, suitable for partitioned table - final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; - jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir)); - jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir)); - } else { - throw new IllegalArgumentException("Unknown value for kylin.hive.create.flat.table.method: " + createFlatTableMethod); + // create flat table first, then count and redistribute + jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); + if (kylinConfig.isHiveRedistributeEnabled() == true) { + jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); } - AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); if (task != null) { jobFlow.addTask(task); @@ -166,22 +155,6 @@ public class HiveMRInput implements IMRInput { return step; } - public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) { - final ShellExecutable step = new ShellExecutable(); - - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder); - hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); - hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); - hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir)); - - step.setCmd(hiveCmdBuilder.build()); - step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE); - - return step; - } - public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -223,7 +196,7 @@ public class HiveMRInput implements IMRInput { return step; } - public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) { + public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); @@ -231,12 +204,10 @@ public class HiveMRInput implements IMRInput { final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n"; final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); - String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf); CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); - step.setUseRedistribute(redistribute); step.setInitStatement(hiveInitBuf.toString()); - step.setRowCountOutputDir(rowCountOutputDir); step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java index 6aba1ef..da0c082 100644 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java +++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java @@ -68,9 +68,13 @@ public class HiveCmdBuilderTest { hiveCmdBuilder.addStatement("SHOW\n TABLES;"); String cmd = hiveCmdBuilder.build(); - assertTrue(cmd.startsWith("beeline -u jdbc_url -f") && cmd.contains(";rm -f")); + assertTrue(cmd.startsWith("beeline -u jdbc_url -f")); String hqlFile = cmd.substring(cmd.lastIndexOf("-f ") + 3).trim(); + if (hqlFile.endsWith(";")) { + hqlFile = hqlFile.substring(0, hqlFile.length() - 1); + } + String hqlStatement = FileUtils.readFileToString(new File(hqlFile), Charset.defaultCharset()); assertEquals("USE default;" + lineSeparator + "DROP TABLE test;" + lineSeparator + "SHOW\n TABLES;" + lineSeparator, hqlStatement);
