Repository: kylin
Updated Branches:
  refs/heads/KYLIN-1677 [created] 73894cdda


KYLIN-1677 Distribute source data by certain columns when creating flat table

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/73894cdd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/73894cdd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/73894cdd

Branch: refs/heads/KYLIN-1677
Commit: 73894cdda83de817526a7557237dec7b664e097a
Parents: 71cf7c8
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue May 17 18:29:59 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue May 17 18:30:18 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/model/CubeDesc.java   |  14 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |  26 ++-
 .../kylin/job/constant/ExecutableConstants.java |   2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |   4 +-
 .../source/hive/CreateFlatHiveTableStep.java    | 115 ++++++++++++
 .../apache/kylin/source/hive/HiveMRInput.java   | 182 ++++---------------
 6 files changed, 187 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 7b6e4f7..0b06ccb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -1028,6 +1029,19 @@ public class CubeDesc extends RootPersistentEntity {
         return result;
     }
 
+    /**
+     * Get a column which can be used in distributing the source table
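+     * @return the first shard-by column (in the set's iteration order), or null if the cube has no shard-by columns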
+     */
+    public TblColRef getDistributedByColumn() {
+        Set<TblColRef> shardBy = getShardByColumns();
+        if (shardBy != null && shardBy.size() > 0) {
+            return shardBy.iterator().next();
+        }
+
+        return null;
+    }
+
     public static CubeDesc getCopyOf(CubeDesc cubeDesc) {
         CubeDesc newCubeDesc = new CubeDesc();
         newCubeDesc.setName(cubeDesc.getName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java 
b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae8110..d625ad7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,11 +141,17 @@ public class JoinedFlatTable {
         }
         appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
         appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
         return sql.toString();
     }
 
-    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc 
intermediateTableDesc, String outputDir) {
-        return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) 
FROM " + intermediateTableDesc.getTableName() + ";\n";
+    public static String generateCountDataStatement(IJoinedFlatTableDesc 
intermediateTableDesc, final String outputDir) {
+        final Map<String, String> tableAliasMap = 
buildTableAliasMap(intermediateTableDesc.getDataModel());
+        final StringBuilder sql = new StringBuilder();
+        final String factTbl = 
intermediateTableDesc.getDataModel().getFactTable();
+        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT 
count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        return sql.toString();
     }
 
     private static Map<String, String> buildTableAliasMap(DataModelDesc 
dataModelDesc) {
@@ -211,6 +217,22 @@ public class JoinedFlatTable {
         }
     }
 
+    private static void appendDistributeStatement(IJoinedFlatTableDesc 
intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+            return;//TODO: for now only cube segments support distribution
+        }
+        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) 
intermediateTableDesc;
+
+        TblColRef distDcol = desc.getCubeDesc().getDistributedByColumn();
+
+        if (distDcol != null) {
+            String tblAlias = tableAliasMap.get(distDcol.getTable());
+            sql.append(" distribute by 
").append(tblAlias).append(".").append(distDcol.getName());
+        } else {
+            sql.append(" distribute by rand()");
+        }
+    }
+
     private static void appendWhereStatement(IJoinedFlatTableDesc 
intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
         if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
             return;//TODO: for now only cube segments support filter and 
partition

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 50e8238..6084e7b 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -37,7 +37,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension 
Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create 
Intermediate Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = 
"Materialize Hive View in Lookup Tables";
-    public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = 
"Redistribute Intermediate Flat Hive Table";
+    public static final String STEP_NAME_COUNT_HIVE_TABLE = "Count Source 
Table";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact 
Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base 
Cuboid Data";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java 
b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index d9425bd..697d392 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -71,7 +71,7 @@ public class JoinedFlatTableTest extends 
LocalFileMetadataTestCase {
     public void testGenDropTableDDL() {
         String ddl = 
JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
         System.out.println(ddl);
-        assertEquals(107, ddl.length());
+        assertEquals(101, ddl.length());
     }
 
     @Test
@@ -80,7 +80,7 @@ public class JoinedFlatTableTest extends 
LocalFileMetadataTestCase {
         System.out.println(sqls);
 
         int length = sqls.length();
-        assertEquals(1155, length);
+        assertEquals(1437, length);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
new file mode 100644
index 0000000..e9b9994
--- /dev/null
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -0,0 +1,115 @@
+package org.apache.kylin.source.hive;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+
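+/**
+ * Executable step that reads the row count from the row-count output directory, derives the number of reducers from it, and runs the Hive statements that create the flat table.
+ */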
+public class CreateFlatHiveTableStep extends AbstractExecutable {
+    private final BufferedLogger stepLogger = new BufferedLogger(logger);
+
+    private long readRowCountFromFile(Path file) throws IOException {
+        FileSystem fs = FileSystem.get(file.toUri(), 
HadoopUtil.getCurrentConfiguration());
+        InputStream in = fs.open(file);
+        try {
+            String content = IOUtils.toString(in);
+            return Long.valueOf(content.trim()); // strip the '\n' character
+
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    private int determineNumReducer(KylinConfig config) throws IOException {
+        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+        long rowCount = readRowCountFromFile(rowCountFile);
+        int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+        int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+        numReducers = Math.max(1, numReducers);
+
+        stepLogger.log("total input rows = " + rowCount);
+        stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+        stepLogger.log("reducers for RedistributeFlatHiveTableStep = " + 
numReducers);
+
+        return numReducers;
+    }
+
+    private void createFlatHiveTable(KylinConfig config, int numReducers) 
throws IOException {
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        hiveCmdBuilder.addStatement(getInitStatement());
+        hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers 
+ ";\n");
+        hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); 
//disable merge
+        hiveCmdBuilder.addStatement(getCreateTableStatement());
+        final String cmd = hiveCmdBuilder.toString();
+
+        stepLogger.log("Create and distribute table, cmd: ");
+        stepLogger.log(cmd);
+
+        Pair<Integer, String> response = 
config.getCliCommandExecutor().execute(cmd, stepLogger);
+        if (response.getFirst() != 0) {
+            throw new RuntimeException("Failed to create flat hive table, 
error code " + response.getFirst());
+        }
+    }
+
+    private KylinConfig getCubeSpecificConfig() {
+        String cubeName = CubingExecutableUtil.getCubeName(getParams());
+        CubeManager manager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = manager.getCube(cubeName);
+        return cube.getConfig();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+        KylinConfig config = getCubeSpecificConfig();
+        try {
+
+            int numReducers = determineNumReducer(config);
+            createFlatHiveTable(config, numReducers);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, 
stepLogger.getBufferedLog());
+
+        } catch (Exception e) {
+            logger.error("job:" + getId() + " execute finished with 
exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, 
stepLogger.getBufferedLog());
+        }
+    }
+
+    public void setInitStatement(String sql) {
+        setParam("HiveInit", sql);
+    }
+
+    public String getInitStatement() {
+        return getParam("HiveInit");
+    }
+
+    public void setCreateTableStatement(String sql) {
+        setParam("HiveRedistributeData", sql);
+    }
+
+    public String getCreateTableStatement() {
+        return getParam("HiveRedistributeData");
+    }
+
+    public void setRowCountOutputDir(String rowCountOutputDir) {
+        setParam("rowCountOutputDir", rowCountOutputDir);
+    }
+
+    public String getRowCountOutputDir() {
+        return getParam("rowCountOutputDir");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 248a57b..68b8325 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,11 +19,9 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,10 +29,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
@@ -117,36 +111,36 @@ public class HiveMRInput implements IMRInput {
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
             final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
 
-            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, 
jobFlow.getId()));
+            final String rowCountOutputDir = 
JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+            try {
+                FileSystem fs = 
FileSystem.get(HadoopUtil.getCurrentConfiguration());
+                fs.mkdirs(new Path(rowCountOutputDir));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to create HDFS dir " + 
rowCountOutputDir, e);
+            }
+            jobFlow.addTask(createCountHiveTableStep(conf, flatHiveTableDesc, 
jobFlow.getId(), rowCountOutputDir));
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, 
jobFlow.getId(), cubeName, rowCountOutputDir));
+//            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, 
jobFlow.getId()));
             AbstractExecutable task = 
createLookupHiveViewMaterializationStep(jobFlow.getId());
             if(task != null) {
                 jobFlow.addTask(task);
             }
-            jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, 
flatHiveTableDesc, jobFlow.getId(), cubeName));
         }
 
-        public static AbstractExecutable 
createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc 
flatTableDesc, String jobId) {
+        public static AbstractExecutable 
createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc 
flatTableDesc, String jobId, String rowCountOutputDir) {
+            final ShellExecutable step = new ShellExecutable();
 
-            final String useDatabaseHql = "USE " + 
conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
-            final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatTableDesc);
-            final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatTableDesc, 
JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            String insertDataHqls;
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             try {
-                insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+                
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
             } catch (IOException e) {
-                throw new RuntimeException("Failed to generate insert data SQL 
for intermediate table.", e);
+                throw new RuntimeException("Failed to generate hive set 
statements for createCountHiveTableStep", e);
             }
-
-            ShellExecutable step = new ShellExecutable();
-
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(useDatabaseHql);
-            hiveCmdBuilder.addStatement(dropTableHql);
-            hiveCmdBuilder.addStatement(createTableHql);
-            hiveCmdBuilder.addStatement(insertDataHqls);
+            hiveCmdBuilder.addStatement("set 
hive.exec.compress.output=false;\n");
+            
hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc,
 rowCountOutputDir));
 
             step.setCmd(hiveCmdBuilder.build());
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE);
 
             return step;
         }
@@ -193,24 +187,30 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable 
createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc 
flatTableDesc, String jobId, String cubeName) {
+        public static AbstractExecutable 
createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc 
flatTableDesc, String jobId, String cubeName, String rowCountOutputDir) {
             StringBuilder hiveInitBuf = new StringBuilder();
-            hiveInitBuf.append("USE 
").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
             try {
                 
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
             } catch (IOException e) {
                 throw new RuntimeException("Failed to generate hive set 
statements for RedistributeFlatHiveTableStep", e);
             }
 
-            String rowCountOutputDir = 
JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+            final String useDatabaseHql = "USE " + 
conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
+            final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatTableDesc, 
JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String insertDataHqls;
+            try {
+                insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate insert data SQL 
for intermediate table.", e);
+            }
 
-            RedistributeFlatHiveTableStep step = new 
RedistributeFlatHiveTableStep();
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
             step.setInitStatement(hiveInitBuf.toString());
-            
step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc,
 rowCountOutputDir));
             step.setRowCountOutputDir(rowCountOutputDir);
-            
step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeDataStatement(flatTableDesc));
+            step.setCreateTableStatement(useDatabaseHql + dropTableHql + 
createTableHql + insertDataHqls);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            
step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
             return step;
         }
 
@@ -234,126 +234,6 @@ public class HiveMRInput implements IMRInput {
         }
     }
 
-    public static class RedistributeFlatHiveTableStep extends 
AbstractExecutable {
-        private final BufferedLogger stepLogger = new BufferedLogger(logger);
-
-        private void computeRowCount(CliCommandExecutor cmdExecutor) throws 
IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(getInitStatement());
-            hiveCmdBuilder.addStatement("set 
hive.exec.compress.output=false;\n");
-            hiveCmdBuilder.addStatement(getSelectRowCountStatement());
-            final String cmd = hiveCmdBuilder.build();
-
-            stepLogger.log("Compute row count of flat hive table, cmd: ");
-            stepLogger.log(cmd);
-
-            Pair<Integer, String> response = cmdExecutor.execute(cmd, 
stepLogger);
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to compute row count of 
flat hive table");
-            }
-        }
-
-        private long readRowCountFromFile(Path file) throws IOException {
-            FileSystem fs = FileSystem.get(file.toUri(), 
HadoopUtil.getCurrentConfiguration());
-            InputStream in = fs.open(file);
-            try {
-                String content = IOUtils.toString(in);
-                return Long.valueOf(content.trim()); // strip the '\n' 
character
-
-            } finally {
-                IOUtils.closeQuietly(in);
-            }
-        }
-
-        private int determineNumReducer(KylinConfig config) throws IOException 
{
-            computeRowCount(config.getCliCommandExecutor());
-
-            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-            long rowCount = readRowCountFromFile(rowCountFile);
-            int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-            numReducers = Math.max(1, numReducers);
-
-            stepLogger.log("total input rows = " + rowCount);
-            stepLogger.log("expected input rows per mapper = " + 
mapperInputRows);
-            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " 
+ numReducers);
-
-            return numReducers;
-        }
-
-        private void redistributeTable(KylinConfig config, int numReducers) 
throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(getInitStatement());
-            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + 
numReducers + ";\n");
-            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
-            hiveCmdBuilder.addStatement(getRedistributeDataStatement());
-            final String cmd = hiveCmdBuilder.toString();
-
-            stepLogger.log("Redistribute table, cmd: ");
-            stepLogger.log(cmd);
-
-            Pair<Integer, String> response = 
config.getCliCommandExecutor().execute(cmd, stepLogger);
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to redistribute flat hive 
table");
-            }
-        }
-
-        private KylinConfig getCubeSpecificConfig() {
-            String cubeName = CubingExecutableUtil.getCubeName(getParams());
-            CubeManager manager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = manager.getCube(cubeName);
-            return cube.getConfig();
-        }
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-            KylinConfig config = getCubeSpecificConfig();
-
-            try {
-                int numReducers = determineNumReducer(config);
-                redistributeTable(config, numReducers);
-                return new ExecuteResult(ExecuteResult.State.SUCCEED, 
stepLogger.getBufferedLog());
-
-            } catch (Exception e) {
-                logger.error("job:" + getId() + " execute finished with 
exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, 
stepLogger.getBufferedLog());
-            }
-        }
-
-        public void setInitStatement(String sql) {
-            setParam("HiveInit", sql);
-        }
-
-        public String getInitStatement() {
-            return getParam("HiveInit");
-        }
-
-        public void setSelectRowCountStatement(String sql) {
-            setParam("HiveSelectRowCount", sql);
-        }
-
-        public String getSelectRowCountStatement() {
-            return getParam("HiveSelectRowCount");
-        }
-
-        public void setRedistributeDataStatement(String sql) {
-            setParam("HiveRedistributeData", sql);
-        }
-
-        public String getRedistributeDataStatement() {
-            return getParam("HiveRedistributeData");
-        }
-
-        public void setRowCountOutputDir(String rowCountOutputDir) {
-            setParam("rowCountOutputDir", rowCountOutputDir);
-        }
-
-        public String getRowCountOutputDir() {
-            return getParam("rowCountOutputDir");
-        }
-    }
-
     public static class GarbageCollectionStep extends AbstractExecutable {
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {

Reply via email to