Repository: kylin

Updated Branches:
  refs/heads/KYLIN-1677 [created] 73894cdda
KYLIN-1677 Distribute source data by certain columns when creating flat table

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/73894cdd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/73894cdd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/73894cdd

Branch: refs/heads/KYLIN-1677
Commit: 73894cdda83de817526a7557237dec7b664e097a
Parents: 71cf7c8
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue May 17 18:29:59 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue May 17 18:30:18 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/model/CubeDesc.java   |  14 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |  26 ++-
 .../kylin/job/constant/ExecutableConstants.java |   2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |   4 +-
 .../source/hive/CreateFlatHiveTableStep.java    | 115 ++++++++++++
 .../apache/kylin/source/hive/HiveMRInput.java   | 182 ++++---------------
 6 files changed, 187 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 7b6e4f7..0b06ccb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -1028,6 +1029,19 @@ public class CubeDesc extends RootPersistentEntity {
         return result;
     }
 
+    /**
+     * Get a column which can be used in distributing the source table
+     * @return
+     */
+    public TblColRef getDistributedByColumn() {
+        Set<TblColRef> shardBy = getShardByColumns();
+        if (shardBy != null && shardBy.size() > 0) {
+            return shardBy.iterator().next();
+        }
+
+        return null;
+    }
+
     public static CubeDesc getCopyOf(CubeDesc cubeDesc) {
         CubeDesc newCubeDesc = new CubeDesc();
         newCubeDesc.setName(cubeDesc.getName());


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae8110..d625ad7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,11 +141,17 @@ public class JoinedFlatTable {
         }
         appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
         appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
         return sql.toString();
     }
 
-    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
-        return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n";
+    public static String generateCountDataStatement(IJoinedFlatTableDesc intermediateTableDesc, final String outputDir) {
+        final Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+        final StringBuilder sql = new StringBuilder();
+        final String factTbl = intermediateTableDesc.getDataModel().getFactTable();
+        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        return sql.toString();
     }
 
     private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
@@ -211,6 +217,22 @@
         }
     }
 
+    private static void appendDistributeStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+            return;//TODO: for now only cube segments support distribution
+        }
+        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+        TblColRef distDcol = desc.getCubeDesc().getDistributedByColumn();
+
+        if (distDcol != null) {
+            String tblAlias = tableAliasMap.get(distDcol.getTable());
+            sql.append(" distribute by ").append(tblAlias).append(".").append(distDcol.getName());
+        } else {
+            sql.append(" distribute by rand()");
+        }
+    }
+
     private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
         if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
             return;//TODO: for now only cube segments support filter and partition
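For illustration only, the HiveQL these two generators now emit looks roughly like the sketch below; the table, column and directory names are invented for the example (not taken from this commit), and the WHERE clause is whatever appendWhereStatement() already produced for the segment. When the cube defines a shard-by column the flat-table insert statement gains a trailing distribute by clause, otherwise it falls back to rand():

    -- flat-table insert, assuming FACT.SELLER_ID as the shard-by column
    INSERT OVERWRITE TABLE kylin_intermediate_example_cube_segment
    SELECT ... FROM EXAMPLE_FACT FACT ... WHERE ... distribute by FACT.SELLER_ID
    -- with no shard-by column configured the clause becomes: distribute by rand()

    -- row-count statement produced by generateCountDataStatement()
    INSERT OVERWRITE DIRECTORY '/kylin/<job_working_dir>/row_count'
    SELECT count(*) from EXAMPLE_FACT FACT
    WHERE ...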
http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 50e8238..6084e7b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -37,7 +37,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
-    public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Intermediate Flat Hive Table";
+    public static final String STEP_NAME_COUNT_HIVE_TABLE = "Count Source Table";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index d9425bd..697d392 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -71,7 +71,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
     public void testGenDropTableDDL() {
         String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
         System.out.println(ddl);
-        assertEquals(107, ddl.length());
+        assertEquals(101, ddl.length());
     }
 
     @Test
@@ -80,7 +80,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
         System.out.println(sqls);
 
         int length = sqls.length();
-        assertEquals(1155, length);
+        assertEquals(1437, length);
     }
 }


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
new file mode 100644
index 0000000..e9b9994
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -0,0 +1,115 @@
+package org.apache.kylin.source.hive;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ */
+public class CreateFlatHiveTableStep extends AbstractExecutable {
+    private final BufferedLogger stepLogger = new BufferedLogger(logger);
+
+    private long readRowCountFromFile(Path file) throws IOException {
+        FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
+        InputStream in = fs.open(file);
+        try {
+            String content = IOUtils.toString(in);
+            return Long.valueOf(content.trim()); // strip the '\n' character
+
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    private int determineNumReducer(KylinConfig config) throws IOException {
+        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+        long rowCount = readRowCountFromFile(rowCountFile);
+        int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+        int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+        numReducers = Math.max(1, numReducers);
+
+        stepLogger.log("total input rows = " + rowCount);
+        stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+        stepLogger.log("reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+        return numReducers;
+    }
+
+    private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        hiveCmdBuilder.addStatement(getInitStatement());
+        hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+        hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
+        hiveCmdBuilder.addStatement(getCreateTableStatement());
+        final String cmd = hiveCmdBuilder.toString();
+
stepLogger.log("Create and distribute table, cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); + } + } + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + try { + + int numReducers = determineNumReducer(config); + createFlatHiveTable(config, numReducers); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setCreateTableStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getCreateTableStatement() { + return getParam("HiveRedistributeData"); + } + + public void setRowCountOutputDir(String rowCountOutputDir) { + setParam("rowCountOutputDir", rowCountOutputDir); + } + + public String getRowCountOutputDir() { + return getParam("rowCountOutputDir"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 248a57b..68b8325 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,11 +19,9 @@ package org.apache.kylin.source.hive; import java.io.IOException; -import java.io.InputStream; import java.util.Set; import com.google.common.collect.Sets; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,10 +29,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BufferedLogger; -import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DimensionDesc; @@ -117,36 +111,36 @@ public class HiveMRInput implements IMRInput { public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId())); + final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; + try { + FileSystem fs = 
http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 248a57b..68b8325 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,11 +19,9 @@ package org.apache.kylin.source.hive;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,10 +29,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
@@ -117,36 +111,36 @@ public class HiveMRInput implements IMRInput {
 
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
 
-            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+            final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+            try {
+                FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+                fs.mkdirs(new Path(rowCountOutputDir));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to create HDFS dir " + rowCountOutputDir, e);
+            }
+            jobFlow.addTask(createCountHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), rowCountOutputDir));
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
+//            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
             AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
             if(task != null) {
                 jobFlow.addTask(task);
             }
-            jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName));
         }
 
-        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+        public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
+            final ShellExecutable step = new ShellExecutable();
 
-            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            String insertDataHqls;
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             try {
-                insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+                hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
             } catch (IOException e) {
-                throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+                throw new RuntimeException("Failed to generate hive set statements for createCountHiveTableStep", e);
             }
-
-            ShellExecutable step = new ShellExecutable();
-
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(useDatabaseHql);
-            hiveCmdBuilder.addStatement(dropTableHql);
-            hiveCmdBuilder.addStatement(createTableHql);
-            hiveCmdBuilder.addStatement(insertDataHqls);
+            hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
+            hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
 
             step.setCmd(hiveCmdBuilder.build());
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE);
 
             return step;
         }
 
@@ -193,24 +187,30 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, String rowCountOutputDir) {
             StringBuilder hiveInitBuf = new StringBuilder();
-            hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
             try {
                 hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
             } catch (IOException e) {
                 throw new RuntimeException("Failed to generate hive set statements for RedistributeFlatHiveTableStep", e);
             }
 
-            String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String insertDataHqls;
+            try {
+                insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+            }
 
-            RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
             step.setInitStatement(hiveInitBuf.toString());
-            step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir));
             step.setRowCountOutputDir(rowCountOutputDir);
-            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeDataStatement(flatTableDesc));
+            step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
 
             return step;
         }
 
@@ -234,126 +234,6 @@
         }
     }
 
-    public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
-        private final BufferedLogger stepLogger = new BufferedLogger(logger);
-
-        private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(getInitStatement());
-            hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
-            hiveCmdBuilder.addStatement(getSelectRowCountStatement());
-            final String cmd = hiveCmdBuilder.build();
-
-            stepLogger.log("Compute row count of flat hive table, cmd: ");
-            stepLogger.log(cmd);
-
-            Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger);
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to compute row count of flat hive table");
-            }
-        }
-
-        private long readRowCountFromFile(Path file) throws IOException {
-            FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
-            InputStream in = fs.open(file);
-            try {
-                String content = IOUtils.toString(in);
-                return Long.valueOf(content.trim()); // strip the '\n' character
-
-            } finally {
-                IOUtils.closeQuietly(in);
-            }
-        }
-
-        private int determineNumReducer(KylinConfig config) throws IOException {
-            computeRowCount(config.getCliCommandExecutor());
-
-            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-            long rowCount = readRowCountFromFile(rowCountFile);
-            int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-            numReducers = Math.max(1, numReducers);
-
-            stepLogger.log("total input rows = " + rowCount);
-            stepLogger.log("expected input rows per mapper = " + mapperInputRows);
-            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
-            return numReducers;
-        }
-
-        private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(getInitStatement());
-            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
-            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
-            hiveCmdBuilder.addStatement(getRedistributeDataStatement());
-            final String cmd = hiveCmdBuilder.toString();
-
-            stepLogger.log("Redistribute table, cmd: ");
-            stepLogger.log(cmd);
-
-            Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to redistribute flat hive table");
-            }
-        }
-
-        private KylinConfig getCubeSpecificConfig() {
-            String cubeName = CubingExecutableUtil.getCubeName(getParams());
-            CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = manager.getCube(cubeName);
-            return cube.getConfig();
-        }
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            KylinConfig config = getCubeSpecificConfig();
-
-            try {
-                int numReducers = determineNumReducer(config);
-                redistributeTable(config, numReducers);
-                return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
-
-            } catch (Exception e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
-            }
-        }
-
-        public void setInitStatement(String sql) {
-            setParam("HiveInit", sql);
-        }
-
-        public String getInitStatement() {
-            return getParam("HiveInit");
-        }
-
-        public void setSelectRowCountStatement(String sql) {
-            setParam("HiveSelectRowCount", sql);
-        }
-
-        public String getSelectRowCountStatement() {
-            return getParam("HiveSelectRowCount");
-        }
-
-        public void setRedistributeDataStatement(String sql) {
-            setParam("HiveRedistributeData", sql);
-        }
-
-        public String getRedistributeDataStatement() {
-            return getParam("HiveRedistributeData");
-        }
-
-        public void setRowCountOutputDir(String rowCountOutputDir) {
-            setParam("rowCountOutputDir", rowCountOutputDir);
-        }
-
-        public String getRowCountOutputDir() {
-            return getParam("rowCountOutputDir");
-        }
-    }
-
     public static class GarbageCollectionStep extends AbstractExecutable {
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
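Taken together, the rewired phase-1 flow creates <job_working_dir>/row_count on HDFS, runs the "Count Source Table" shell step against the source fact table (the removed RedistributeFlatHiveTableStep counted the already-built flat table instead), then runs the "Create Intermediate Flat Hive Table" step with a reducer count derived from that row count, followed by the optional lookup-view materialization. A rough sketch of the two Hive scripts, with placeholders instead of real names and the usual generateHiveSetStatements(conf) output omitted:

    -- Count Source Table
    set hive.exec.compress.output=false;
    INSERT OVERWRITE DIRECTORY '<job_working_dir>/row_count' SELECT count(*) from <FACT_TABLE> <ALIAS> WHERE ...;

    -- Create Intermediate Flat Hive Table
    set mapreduce.job.reduces=<N derived from the row count>;
    set hive.merge.mapredfiles=false;
    USE <hive database for intermediate table>;
    DROP TABLE IF EXISTS <intermediate table>;
    CREATE EXTERNAL TABLE <intermediate table> (...) ... LOCATION '<job_working_dir>';
    INSERT OVERWRITE TABLE <intermediate table> SELECT ... WHERE ... distribute by <shard-by column, or rand()>;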