KYLIN-2269 Reduce MR memory usage for global dict Signed-off-by: Hongbin Ma <mahong...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f50c0c87 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f50c0c87 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f50c0c87 Branch: refs/heads/master-hbase1.x Commit: f50c0c87373e4abacd7106d527df4b885f0a88ea Parents: 878107e Author: kangkaisen <kangkai...@live.com> Authored: Tue Dec 6 19:26:09 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Dec 19 17:52:41 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 ++++ .../org/apache/kylin/cube/model/CubeDesc.java | 22 ++++++++++++++++++++ .../cube/model/CubeJoinedFlatTableDesc.java | 15 ++++++++----- .../cube/model/CubeJoinedFlatTableEnrich.java | 7 ++++++- .../org/apache/kylin/job/JoinedFlatTable.java | 13 ++++++++++-- .../metadata/model/IJoinedFlatTableDesc.java | 11 ++++++---- 6 files changed, 60 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5153562..01d1d36 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -492,6 +492,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.source.hive.beeline-params", ""); } + public String getFlatHiveTableClusterByDictColumn() { + return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column"); + } + @Deprecated public String getCreateFlatHiveTableMethod() { return getOptional("kylin.source.hive.create-flat-table-method", "1"); http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index f6b68af..3b8d034 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -1090,6 +1090,28 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { return null; } + + /** Get a column which can be used to cluster the source table. + * To reduce memory footprint in base cuboid for global dict */ + // TODO handle more than one ultra high cardinality columns use global dict in one cube + TblColRef getClusteredByColumn() { + if (getDistributedByColumn() != null) { + return null; + } + + if (dictionaries == null) { + return null; + } + + String clusterByColumn = config.getFlatHiveTableClusterByDictColumn(); + for (DictionaryDesc dictDesc : dictionaries) { + if (dictDesc.getColumnRef().getName().equalsIgnoreCase(clusterByColumn)) { + return dictDesc.getColumnRef(); + } + } + + return null; + } public String getDictionaryBuilderClass(TblColRef col) { if (dictionaries == null) http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index f37f86e..94e1a7c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -49,11 +49,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { public CubeJoinedFlatTableDesc(CubeDesc cubeDesc) { this(cubeDesc, null); } - + public CubeJoinedFlatTableDesc(CubeSegment cubeSegment) { this(cubeSegment.getCubeDesc(), cubeSegment); } - + private CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment /* can be null */) { this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; @@ -68,7 +68,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { return "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getUuid().replaceAll("-", "_"); } } - + protected final void initAddColumn(TblColRef col) { if (columnIndexMap.containsKey(col)) return; @@ -77,10 +77,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { columnIndexMap.put(col, columnIndex); columnList.add(col); columnCount = columnIndexMap.size(); - + Preconditions.checkState(columnIndexMap.size() == columnList.size()); } - + // check what columns from hive tables are required, and index them protected void initParseCubeDesc() { for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived(false)) { @@ -165,4 +165,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { return cubeSegment; } + @Override + public TblColRef getClusterBy() { + return cubeDesc.getClusteredByColumn(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 979af76..a1312b5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -42,7 +42,7 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { // != works due to object cache if (cubeDesc.getModel() != flatDesc.getDataModel()) throw new IllegalArgumentException(); - + this.cubeDesc = cubeDesc; this.flatDesc = flatDesc; parseCubeDesc(); @@ -132,4 +132,9 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { return flatDesc.getSegment(); } + @Override + public TblColRef getClusterBy() { + return flatDesc.getClusterBy(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 9fa0961..9ed563f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -171,6 +171,10 @@ public class JoinedFlatTable { } } + private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) { + sql.append(" CLUSTER BY ").append(colName(clusterCol)).append(";\n"); + } + private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) { boolean hasCondition = false; StringBuilder whereBuilder = new StringBuilder(); @@ -219,8 +223,13 @@ public class JoinedFlatTable { StringBuilder sql = new StringBuilder(); sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName); - TblColRef distDcol = flatDesc.getDistributedBy(); - appendDistributeStatement(sql, distDcol); + TblColRef clusterCol = flatDesc.getClusterBy(); + if (clusterCol != null) { + appendClusterStatement(sql, clusterCol); + } else { + appendDistributeStatement(sql, flatDesc.getDistributedBy()); + } + return sql.toString(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index ffa2680..b545e50 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -27,16 +27,19 @@ public interface IJoinedFlatTableDesc { String getTableName(); DataModelDesc getDataModel(); - + List<TblColRef> getAllColumns(); - + int getColumnIndex(TblColRef colRef); long getSourceOffsetStart(); - + long getSourceOffsetEnd(); - + TblColRef getDistributedBy(); + TblColRef getClusterBy(); + ISegment getSegment(); + }