Merge commit '727920b4d3642aaa3657d90b7f3dce7dcdd68fe2'

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b50030da
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b50030da
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b50030da

Branch: refs/heads/master
Commit: b50030da4cd64d424a82e9d2b752c00917e8e681
Parents: 068baf6 727920b
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Aug 31 15:42:16 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Aug 31 21:33:14 2017 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   2 +-
 atopcalcite/pom.xml                             |   2 +-
 core-common/pom.xml                             |   2 +-
 .../org/apache/kylin/common/KylinConfig.java    |  57 ++-
 .../apache/kylin/common/KylinConfigBase.java    |   1 +
 .../org/apache/kylin/common/KylinConfigExt.java |   2 +-
 .../org/apache/kylin/common/KylinVersion.java   |   4 +-
 .../org/apache/kylin/common/util/ClassUtil.java |  13 +-
 core-cube/pom.xml                               |   2 +-
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   4 +
 .../org/apache/kylin/gridtable/GTRecord.java    |   8 +
 .../kylin/gridtable/GTSampleCodeSystem.java     |   4 +
 .../apache/kylin/gridtable/IGTCodeSystem.java   |   3 +
 core-dictionary/pom.xml                         |   2 +-
 core-job/pom.xml                                |   2 +-
 .../job/execution/DefaultChainedExecutable.java |  13 +-
 .../kylin/job/execution/ExecutableManager.java  |  12 +-
 core-metadata/pom.xml                           |   2 +-
 .../measure/bitmap/BitmapCounterFactory.java    |   2 +
 .../kylin/measure/bitmap/BitmapSerializer.java  |  38 +-
 .../measure/bitmap/RoaringBitmapCounter.java    |  10 +
 .../bitmap/RoaringBitmapCounterFactory.java     |   5 +
 .../measure/percentile/PercentileCounter.java   |  28 +-
 .../metadata/datatype/DataTypeSerializer.java   |  13 +
 .../percentile/PercentileCounterTest.java       |  46 ---
 core-storage/pom.xml                            |   2 +-
 .../gtrecord/GTCubeStorageQueryBase.java        |  53 ++-
 .../gtrecord/SequentialCubeTupleIterator.java   |  17 +-
 engine-mr/pom.xml                               |   2 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 128 +++---
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../kylin/engine/mr/common/HadoopCmdOutput.java |  19 +-
 .../engine/mr/common/JobInfoConverter.java      |   7 +-
 .../engine/mr/common/JobRelatedMetaUtil.java    |  72 ++++
 engine-spark/pom.xml                            |   2 +-
 .../engine/spark/KylinKryoRegistrator.java      |  11 +-
 .../spark/SparkBatchCubingJobBuilder2.java      |   5 +
 .../kylin/engine/spark/SparkCubingByLayer.java  | 392 +++++++++++--------
 .../kylin/engine/spark/SparkExecutable.java     |  58 ++-
 .../spark/util/PercentileCounterSerializer.java |  55 +++
 .../spark/util/PercentileSerializerTest.java    |  67 ++++
 .../acl/0928468a-9fab-4185-9a14-6f2e7c74823f    |  27 ++
 .../acl/2fbca32a-a33e-4b69-83dd-0bb8b1f8c53b    |  27 ++
 .../acl/2fbca32a-a33e-4b69-83dd-0bb8b1f8c91b    |  24 ++
 jdbc/pom.xml                                    |   2 +-
 kylin-it/pom.xml                                |  18 +-
 .../job/ITDistributedSchedulerBaseTest.java     |   2 +
 .../jdbc/ITJdbcSourceTableLoaderTest.java       | 114 ++++++
 .../source/jdbc/ITJdbcTableReaderTest.java      | 109 ++++++
 .../test/resources/query/sql_limit/query05.sql  |  21 +
 pom.xml                                         |   2 +-
 query/pom.xml                                   |   2 +-
 .../query/adhoc/PushDownRunnerJdbcImpl.java     |   2 -
 .../kylin/query/relnode/OLAPAggregateRel.java   |   4 +-
 .../kylin/query/relnode/OLAPWindowRel.java      |   2 -
 .../kylin/query/routing/RealizationChooser.java |   7 +-
 server-base/pom.xml                             |   2 +-
 .../kylin/rest/controller/CubeController.java   |  49 ++-
 .../rest/controller2/CubeControllerV2.java      |  19 +-
 .../apache/kylin/rest/service/AclService.java   |   4 +-
 .../rest/service/AclTableMigrationTool.java     |  13 +-
 .../apache/kylin/rest/service/JobService.java   |   4 +
 .../apache/kylin/rest/service/QueryService.java |   4 +-
 .../kylin/rest/util/Log4jConfigListener.java    |   1 +
 server/pom.xml                                  |   2 +-
 server/src/main/resources/kylinSecurity.xml     |   2 +
 .../kylin/rest/service/JobServiceTest.java      |  33 ++
 source-hive/pom.xml                             |   7 +-
 .../HiveColumnCardinalityUpdateJob.java         |   3 +-
 .../apache/kylin/source/jdbc/JdbcExplorer.java  |   6 +-
 source-kafka/pom.xml                            |   2 +-
 .../kylin/source/kafka/DateTimeParser.java      |  62 +++
 storage-hbase/pom.xml                           |   2 +-
 .../kylin/storage/hbase/HBaseConnection.java    |   2 +-
 .../kylin/storage/hbase/HBaseResourceStore.java |   7 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   1 +
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   2 +-
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  33 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  20 +-
 .../endpoint/generated/CubeVisitProtos.java     | 151 +++++--
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 .../storage/hbase/steps/CreateHTableJob.java    |   3 +-
 .../steps/HDFSPathGarbageCollectionStep.java    |   2 +-
 .../hbase/util/DeployCoprocessorCLI.java        |   5 +-
 tomcat-ext/pom.xml                              |   2 +-
 tool-assembly/pom.xml                           |   2 +-
 tool/pom.xml                                    |   2 +-
 webapp/app/css/AdminLTE.css                     |   6 +-
 webapp/app/js/controllers/access.js             |   1 -
 webapp/app/js/controllers/cubeAdvanceSetting.js |   8 +-
 webapp/app/js/controllers/cubeEdit.js           |  52 +++
 webapp/app/js/controllers/cubeMeasures.js       |  68 +++-
 .../js/controllers/modelConditionsSettings.js   |   9 +
 webapp/app/js/controllers/modelDataModel.js     |   3 +-
 webapp/app/js/controllers/sourceMeta.js         |   2 -
 webapp/app/js/model/cubeDescModel.js            |   6 -
 webapp/app/js/model/cubeListModel.js            |  10 +-
 webapp/app/js/model/jobListModel.js             |   4 -
 webapp/app/js/model/streamingListModel.js       |   6 -
 webapp/app/less/layout.less                     |   2 +-
 webapp/app/partials/admin/admin.html            |  16 +-
 .../cubeDesigner/advanced_settings.html         |  55 ++-
 webapp/app/partials/cubeDesigner/info.html      |   2 +-
 webapp/app/partials/cubeDesigner/measures.html  |  22 +-
 webapp/app/partials/cubes/cube_detail.html      |   4 +-
 webapp/app/partials/cubes/cube_json_edit.html   |   2 +-
 .../modelDesigner/conditions_settings.html      |   1 +
 .../app/partials/modelDesigner/data_model.html  |   4 +-
 .../app/partials/modelDesigner/model_info.html  |   2 +-
 .../app/partials/projects/project_create.html   |   2 +-
 .../app/partials/projects/project_detail.html   |   2 +-
 111 files changed, 1692 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/assembly/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --cc core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cc08056,1d5e0ec..dfb9a28
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@@ -59,27 -60,27 +60,47 @@@ public class KylinConfig extends KylinC
  
      // thread-local instances, will override SYS_ENV_INSTANCE
      private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
 +    
 +    static {
 +        /*
 +         * Make Calcite to work with Unicode.
 +         * 
 +         * Sets default char set for string literals in SQL and row types of
 +         * RelNode. This is more a label used to compare row type equality. 
For
 +         * both SQL string and row record, they are passed to Calcite in 
String
 +         * object and does not require additional codec.
 +         * 
 +         * Ref SaffronProperties.defaultCharset
 +         * Ref SqlUtil.translateCharacterSetName() 
 +         * Ref NlsString constructor()
 +         */
 +        // copied from org.apache.calcite.util.ConversionUtil.NATIVE_UTF16_CHARSET_NAME
 +        String NATIVE_UTF16_CHARSET_NAME = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ? "UTF-16BE" : "UTF-16LE";
 +        System.setProperty("saffron.default.charset", NATIVE_UTF16_CHARSET_NAME);
 +        System.setProperty("saffron.default.nationalcharset", NATIVE_UTF16_CHARSET_NAME);
 +        System.setProperty("saffron.default.collation.name", NATIVE_UTF16_CHARSET_NAME + "$en_US");
 +    }
  
+     static {
+         /*
+          * Make Calcite to work with Unicode.
+          * 
+          * Sets default char set for string literals in SQL and row types of
+          * RelNode. This is more a label used to compare row type equality. 
For
+          * both SQL string and row record, they are passed to Calcite in 
String
+          * object and does not require additional codec.
+          * 
+          * Ref SaffronProperties.defaultCharset
+          * Ref SqlUtil.translateCharacterSetName() 
+          * Ref NlsString constructor()
+          */
+         // copied from org.apache.calcite.util.ConversionUtil.NATIVE_UTF16_CHARSET_NAME
+         String NATIVE_UTF16_CHARSET_NAME = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ? "UTF-16BE" : "UTF-16LE";
+         System.setProperty("saffron.default.charset", NATIVE_UTF16_CHARSET_NAME);
+         System.setProperty("saffron.default.nationalcharset", NATIVE_UTF16_CHARSET_NAME);
+         System.setProperty("saffron.default.collation.name", NATIVE_UTF16_CHARSET_NAME + "$en_US");
+     }
+ 
      public static KylinConfig getInstanceFromEnv() {
          synchronized (KylinConfig.class) {
              KylinConfig config = THREAD_ENV_INSTANCE.get();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
----------------------------------------------------------------------
diff --cc core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
index c6913fc,ad23421..e52feab
--- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
@@@ -95,7 -89,7 +95,7 @@@ public class KylinVersion implements Co
      /**
       * Require MANUAL updating kylin version per ANY upgrading.
       */
-     private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.1.0.20403");
 -    private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.2.0");
++    private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.2.0.20500");
  
      private static final KylinVersion VERSION_200 = new KylinVersion("2.0.0");
  
@@@ -211,4 -199,4 +211,4 @@@
          }
      }
  
--}
++}

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --cc core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index c3a160a,f65e4b5..00e355f
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@@ -295,20 -296,12 +295,28 @@@ public class GTRecord implements Compar
          }
      }
  
 +    public void loadColumnsFromColumnBlocks(ImmutableBitSet[] 
selectedColumnBlocks, ImmutableBitSet selectedCols,
 +            ByteBuffer buf) {
 +        int pos = buf.position();
 +        for (ImmutableBitSet selectedColBlock : selectedColumnBlocks) {
 +            for (int i = 0; i < selectedColBlock.trueBitCount(); i++) {
 +                int c = selectedColBlock.trueBitAt(i);
 +                int len = info.codeSystem.codeLength(c, buf);
 +                if (selectedCols.get(c)) {
 +                    cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
 +                }
 +                pos += len;
 +                buf.position(pos);
 +            }
 +        }
 +    }
 +
+     /** change pointers to point to data in given buffer, this
+      *  method allows to defined specific column to load */
+     public void loadColumns(int selectedCol, ByteBuffer buf) {
+         int pos = buf.position();
+         int len = info.codeSystem.codeLength(selectedCol, buf);
+         cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len);
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --cc core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index b40aa5d,7b7b9ca..a3af511
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@@ -95,11 -95,10 +96,11 @@@ public abstract class GTCubeStorageQuer
              return ITupleIterator.EMPTY_TUPLE_ITERATOR;
  
          return new SequentialCubeTupleIterator(scanners, request.getCuboid(), 
request.getDimensions(),
-                 request.getMetrics(), returnTupleInfo, request.getContext(), 
sqlDigest);
+                 request.getGroups(), request.getMetrics(), returnTupleInfo, 
request.getContext(), sqlDigest);
      }
  
 -    protected GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext 
context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
 +    protected GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext 
context, SQLDigest sqlDigest,
 +            TupleInfo returnTupleInfo) {
          context.setStorageQuery(this);
  
          //cope with queries with no aggregations

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index a378a1d,fc8fb4e..54f77c0
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@@ -68,9 -66,6 +66,7 @@@ import org.apache.kylin.cube.CubeSegmen
  import org.apache.kylin.job.JobInstance;
  import org.apache.kylin.job.exception.JobException;
  import org.apache.kylin.metadata.model.TableDesc;
- import org.apache.kylin.metadata.model.TableRef;
 +import org.apache.kylin.metadata.project.ProjectManager;
- import org.apache.kylin.source.SourceFactory;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -78,23 -73,22 +74,43 @@@
  public abstract class AbstractHadoopJob extends Configured implements Tool {
      private static final Logger logger = 
LoggerFactory.getLogger(AbstractHadoopJob.class);
  
-     protected static final Option OPTION_PROJECT = 
OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg().isRequired(true).withDescription("Project
 name.").create(BatchConstants.ARG_PROJECT);
--    protected static final Option OPTION_JOB_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job
 name. For example, 
Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
--    protected static final Option OPTION_CUBE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube
 name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
--    protected static final Option OPTION_CUBING_JOB_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID
 of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
++    protected static final Option OPTION_PROJECT = 
OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg()
++            .isRequired(true).withDescription("Project 
name.").create(BatchConstants.ARG_PROJECT);
++    protected static final Option OPTION_JOB_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg()
++            .isRequired(true).withDescription("Job name. For example, 
Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)")
++            .create(BatchConstants.ARG_JOB_NAME);
++    protected static final Option OPTION_CUBE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
++            .isRequired(true).withDescription("Cube name. For exmaple, 
flat_item_cube")
++            .create(BatchConstants.ARG_CUBE_NAME);
++    protected static final Option OPTION_CUBING_JOB_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID)
++            .hasArg().isRequired(false).withDescription("ID of cubing job 
executable")
++            .create(BatchConstants.ARG_CUBING_JOB_ID);
      //    @Deprecated
--    protected static final Option OPTION_SEGMENT_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube
 segment name").create(BatchConstants.ARG_SEGMENT_NAME);
--    protected static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube
 segment id").create(BatchConstants.ARG_SEGMENT_ID);
--    protected static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input
 path").create(BatchConstants.ARG_INPUT);
--    protected static final Option OPTION_INPUT_FORMAT = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input
 format").create(BatchConstants.ARG_INPUT_FORMAT);
--    protected static final Option OPTION_OUTPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output
 path").create(BatchConstants.ARG_OUTPUT);
--    protected static final Option OPTION_NCUBOID_LEVEL = 
OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid
 build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
--    protected static final Option OPTION_PARTITION_FILE_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition
 file path.").create(BatchConstants.ARG_PARTITION);
--    protected static final Option OPTION_HTABLE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable
 name").create(BatchConstants.ARG_HTABLE_NAME);
--
--    protected static final Option OPTION_STATISTICS_ENABLED = 
OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics
 enabled").create(BatchConstants.ARG_STATS_ENABLED);
--    protected static final Option OPTION_STATISTICS_OUTPUT = 
OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics
 output").create(BatchConstants.ARG_STATS_OUTPUT);
--    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = 
OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics
 sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
++    protected static final Option OPTION_SEGMENT_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME)
++            .hasArg().isRequired(true).withDescription("Cube segment 
name").create(BatchConstants.ARG_SEGMENT_NAME);
++    protected static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg()
++            .isRequired(true).withDescription("Cube segment 
id").create(BatchConstants.ARG_SEGMENT_ID);
++    protected static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
++            .isRequired(true).withDescription("Input 
path").create(BatchConstants.ARG_INPUT);
++    protected static final Option OPTION_INPUT_FORMAT = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT)
++            .hasArg().isRequired(false).withDescription("Input 
format").create(BatchConstants.ARG_INPUT_FORMAT);
++    protected static final Option OPTION_OUTPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
++            .isRequired(true).withDescription("Output 
path").create(BatchConstants.ARG_OUTPUT);
++    protected static final Option OPTION_NCUBOID_LEVEL = 
OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg()
++            .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 
2, 3...").create(BatchConstants.ARG_LEVEL);
++    protected static final Option OPTION_PARTITION_FILE_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
++            .hasArg().isRequired(true).withDescription("Partition file 
path.").create(BatchConstants.ARG_PARTITION);
++    protected static final Option OPTION_HTABLE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME)
++            .hasArg().isRequired(true).withDescription("HTable 
name").create(BatchConstants.ARG_HTABLE_NAME);
++
++    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder
++            
.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false)
++            .withDescription("Statistics 
enabled").create(BatchConstants.ARG_STATS_ENABLED);
++    protected static final Option OPTION_STATISTICS_OUTPUT = 
OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT)
++            .hasArg().isRequired(false).withDescription("Statistics 
output").create(BatchConstants.ARG_STATS_OUTPUT);
++    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = 
OptionBuilder
++            
.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false)
++            .withDescription("Statistics sampling 
percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
  
      private static final String MAP_REDUCE_CLASSPATH = 
"mapreduce.application.classpath";
  
@@@ -152,7 -146,7 +168,9 @@@
          } else {
              job.waitForCompletion(true);
              retVal = job.isSuccessful() ? 0 : 1;
--            logger.debug("Job '" + job.getJobName() + "' finished " + 
(job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + 
formatTime((System.nanoTime() - start) / 1000000L));
++            logger.debug("Job '" + job.getJobName() + "' finished "
++                    + (job.isSuccessful() ? "successfully in " : "with 
failures.  Time taken ")
++                    + formatTime((System.nanoTime() - start) / 1000000L));
          }
          return retVal;
      }
@@@ -173,7 -167,7 +191,8 @@@
          Configuration jobConf = job.getConfiguration();
          String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
          if (classpath == null || classpath.length() == 0) {
--            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job 
configuration, will run 'mapred classpath' to get the default value.");
++            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH
++                    + " in job configuration, will run 'mapred classpath' to 
get the default value.");
              classpath = getDefaultMapRedClasspath();
              logger.info("The default mapred classpath is: " + classpath);
          }
@@@ -206,11 -200,11 +225,13 @@@
                  StringUtil.appendWithSeparator(kylinDependency, 
hiveExecJarPath);
                  logger.debug("hive-exec jar file: " + hiveExecJarPath);
  
--                String hiveHCatJarPath = 
ClassUtil.findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat"));
++                String hiveHCatJarPath = ClassUtil
++                        
.findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat"));
                  StringUtil.appendWithSeparator(kylinDependency, 
hiveHCatJarPath);
                  logger.debug("hive-catalog jar file: " + hiveHCatJarPath);
  
--                String hiveMetaStoreJarPath = 
ClassUtil.findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table"));
++                String hiveMetaStoreJarPath = ClassUtil
++                        
.findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table"));
                  StringUtil.appendWithSeparator(kylinDependency, 
hiveMetaStoreJarPath);
                  logger.debug("hive-metastore jar file: " + 
hiveMetaStoreJarPath);
              } catch (ClassNotFoundException e) {
@@@ -226,7 -220,7 +247,8 @@@
          } else {
              logger.debug("No Kafka dependency jar set in the environment, 
will find them from classpath:");
              try {
--                String kafkaClientJarPath = 
ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
++                String kafkaClientJarPath = ClassUtil
++                        
.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
                  StringUtil.appendWithSeparator(kylinDependency, 
kafkaClientJarPath);
                  logger.debug("kafka jar file: " + kafkaClientJarPath);
  
@@@ -449,7 -443,7 +471,8 @@@
      }
  
      protected void attachCubeMetadata(CubeInstance cube, Configuration conf) 
throws IOException {
-         dumpKylinPropsAndMetadata(cube.getProject(), 
collectCubeMetadata(cube), cube.getConfig(), conf);
 -        
dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), 
cube.getConfig(), conf);
++        dumpKylinPropsAndMetadata(cube.getProject(), 
JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(),
++                conf);
      }
  
      protected void attachCubeMetadataWithDict(CubeInstance cube, 
Configuration conf) throws IOException {
@@@ -463,28 -457,12 +486,13 @@@
  
      protected void attachSegmentMetadataWithDict(CubeSegment segment, 
Configuration conf) throws IOException {
          Set<String> dumpList = new LinkedHashSet<>();
-         dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+         
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
          dumpList.addAll(segment.getDictionaryPaths());
 -        dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
 +        dumpKylinPropsAndMetadata(segment.getProject(), dumpList, 
segment.getConfig(), conf);
      }
  
-     private Set<String> collectCubeMetadata(CubeInstance cube) {
-         // cube, model_desc, cube_desc, table
-         Set<String> dumpList = new LinkedHashSet<>();
-         dumpList.add(cube.getResourcePath());
-         dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-         dumpList.add(cube.getDescriptor().getResourcePath());
- 
-         for (TableRef tableRef : 
cube.getDescriptor().getModel().getAllTables()) {
-             TableDesc table = tableRef.getTableDesc();
-             dumpList.add(table.getResourcePath());
-             dumpList.addAll(SourceFactory.getMRDependentResources(table));
-         }
-         
-         return dumpList;
-     }
- 
-     protected void dumpKylinPropsAndMetadata(String prj, Set<String> 
dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
 -    protected void dumpKylinPropsAndMetadata(Set<String> dumpList, 
KylinConfig kylinConfig, Configuration conf) throws IOException {
++    protected void dumpKylinPropsAndMetadata(String prj, Set<String> 
dumpList, KylinConfig kylinConfig,
++            Configuration conf) throws IOException {
          File tmp = File.createTempFile("kylin_job_meta", "");
          FileUtils.forceDelete(tmp); // we need a directory, so delete the 
file first
  
@@@ -494,13 -472,9 +502,13 @@@
          // write kylin.properties
          File kylinPropsFile = new File(metaDir, "kylin.properties");
          kylinConfig.writeProperties(kylinPropsFile);
-         
+ 
 +        if (prj != null) {
 +            
dumpList.add(ProjectManager.getInstance(kylinConfig).getProject(prj).getResourcePath());
 +        }
 +
          // write resources
-         dumpResources(kylinConfig, metaDir, dumpList);
+         JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
  
          // hadoop distributed cache
          String hdfsMetaDir = 
OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
@@@ -557,7 -514,7 +548,8 @@@
          HadoopUtil.deletePath(conf, path);
      }
  
--    public static double getTotalMapInputMB(Job job) throws 
ClassNotFoundException, IOException, InterruptedException, JobException {
++    public static double getTotalMapInputMB(Job job)
++            throws ClassNotFoundException, IOException, InterruptedException, 
JobException {
          if (job == null) {
              throw new JobException("Job is null");
          }
@@@ -574,11 -531,11 +566,13 @@@
          return totalMapInputMB;
      }
  
--    protected double getTotalMapInputMB() throws ClassNotFoundException, 
IOException, InterruptedException, JobException {
++    protected double getTotalMapInputMB()
++            throws ClassNotFoundException, IOException, InterruptedException, 
JobException {
          return getTotalMapInputMB(job);
      }
  
--    protected int getMapInputSplitCount() throws ClassNotFoundException, 
JobException, IOException, InterruptedException {
++    protected int getMapInputSplitCount()
++            throws ClassNotFoundException, JobException, IOException, 
InterruptedException {
          if (job == null) {
              throw new JobException("Job is null");
          }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index 0000000,46b1d3c..c34245b
mode 000000,100644..100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@@ -1,0 -1,71 +1,72 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * 
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.engine.mr.common;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.persistence.RawResource;
+ import org.apache.kylin.common.persistence.ResourceStore;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.metadata.model.TableDesc;
+ import org.apache.kylin.metadata.model.TableRef;
+ import org.apache.kylin.source.SourceFactory;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.LinkedHashSet;
+ import java.util.Set;
+ 
+ public class JobRelatedMetaUtil {
+     private static final Logger logger = 
LoggerFactory.getLogger(JobRelatedMetaUtil.class);
+ 
+     public static Set<String> collectCubeMetadata(CubeInstance cube) {
+         // cube, model_desc, cube_desc, table
+         Set<String> dumpList = new LinkedHashSet<>();
+         dumpList.add(cube.getResourcePath());
+         dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+         dumpList.add(cube.getDescriptor().getResourcePath());
++        dumpList.add(cube.getProjectInstance().getResourcePath());
+ 
+         for (TableRef tableRef : 
cube.getDescriptor().getModel().getAllTables()) {
+             TableDesc table = tableRef.getTableDesc();
+             dumpList.add(table.getResourcePath());
+             dumpList.addAll(SourceFactory.getMRDependentResources(table));
+         }
+ 
+         return dumpList;
+     }
+ 
+     public static void dumpResources(KylinConfig kylinConfig, File metaDir, 
Set<String> dumpList) throws IOException {
+         long startTime = System.currentTimeMillis();
+ 
+         ResourceStore from = ResourceStore.getStore(kylinConfig);
+         KylinConfig localConfig = 
KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+         ResourceStore to = ResourceStore.getStore(localConfig);
+         for (String path : dumpList) {
+             RawResource res = from.getResource(path);
+             if (res == null)
+                 throw new IllegalStateException("No resource found at -- " + 
path);
+             to.putResource(path, res.inputStream, res.timestamp);
+             res.inputStream.close();
+         }
+ 
+         logger.debug("Dump resources to {} took {} ms", metaDir, 
System.currentTimeMillis() - startTime);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --cc 
engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index ac56075,2991b82..e287739
mode 100644,100755..100755
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --cc 
engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 544e072,a03e238..dab5fb7
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@@ -66,8 -69,7 +70,7 @@@ import org.apache.spark.api.java.functi
  import org.apache.spark.api.java.function.Function2;
  import org.apache.spark.api.java.function.PairFlatMapFunction;
  import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.broadcast.Broadcast;
 -import org.apache.spark.sql.DataFrame;
 +import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.hive.HiveContext;
  import org.apache.spark.storage.StorageLevel;
@@@ -186,62 -170,24 +171,24 @@@ public class SparkCubingByLayer extend
              allNormalMeasure = allNormalMeasure && needAggr[i];
          }
          logger.info("All measure are normal (agg on all cuboids) ? : " + 
allNormalMeasure);
- 
          StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
  
-         // encode with dimension encoding, transform to <ByteArray, Object[]> 
RDD
-         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = 
intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, 
Object[]>() {
-             volatile transient boolean initialized = false;
-             BaseCuboidBuilder baseCuboidBuilder = null;
- 
-             @Override
-             public Tuple2<ByteArray, Object[]> call(Row row) throws Exception 
{
-                 if (initialized == false) {
-                     synchronized (SparkCubingByLayer.class) {
-                         if (initialized == false) {
-                             prepare();
-                             long baseCuboidId = 
Cuboid.getBaseCuboidId(cubeDesc);
-                             Cuboid baseCuboid = Cuboid.findById(cubeDesc, 
baseCuboidId);
-                             baseCuboidBuilder = new 
BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, 
AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), 
MeasureIngester.create(cubeDesc.getMeasures()), 
cubeSegment.buildDictionaryMap());
-                             initialized = true;
-                         }
-                     }
-                 }
- 
-                 String[] rowArray = rowToArray(row);
-                 baseCuboidBuilder.resetAggrs();
-                 byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
-                 Object[] result = 
baseCuboidBuilder.buildValueObjects(rowArray);
-                 return new Tuple2<>(new ByteArray(rowKey), result);
-             }
- 
-             private String[] rowToArray(Row row) {
-                 String[] result = new String[row.size()];
-                 for (int i = 0; i < row.size(); i++) {
-                     final Object o = row.get(i);
-                     if (o != null) {
-                         result[i] = o.toString();
-                     } else {
-                         result[i] = null;
-                     }
-                 }
-                 return result;
-             }
+         HiveContext sqlContext = new HiveContext(sc.sc());
 -        final DataFrame intermediateTable = sqlContext.table(hiveTable);
++        final Dataset intermediateTable = sqlContext.table(hiveTable);
  
-         });
+         // encode with dimension encoding, transform to <ByteArray, Object[]> 
RDD
+         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = 
intermediateTable.javaRDD()
+                 .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, 
metaUrl));
  
-         logger.info("encodedBaseRDD partition number: " + 
encodedBaseRDD.getNumPartitions());
          Long totalCount = 0L;
-         if (kylinConfig.isSparkSanityCheckEnabled()) {
+         if (envConfig.isSparkSanityCheckEnabled()) {
              totalCount = encodedBaseRDD.count();
-             logger.info("encodedBaseRDD row count: " + 
encodedBaseRDD.count());
          }
  
-         final MeasureAggregators measureAggregators = new 
MeasureAggregators(cubeDesc.getMeasures());
-         final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new 
BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), 
measureAggregators);
+         final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new 
BaseCuboidReducerFunction2(cubeName, metaUrl);
          BaseCuboidReducerFunction2 reducerFunction2 = 
baseCuboidReducerFunction;
          if (allNormalMeasure == false) {
-             reducerFunction2 = new CuboidReducerFunction2(measureNum, 
vCubeDesc.getValue(), measureAggregators, needAggr);
+             reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, 
needAggr);
          }
  
          final int totalLevels = cubeDesc.getBuildLevel();
@@@ -336,26 -389,43 +390,43 @@@
  
      private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> 
EMTPY_ITERATOR = new ArrayList(0);
  
-     class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, 
Object[]>, ByteArray, Object[]> {
- 
-         CubeSegment cubeSegment;
-         CubeDesc cubeDesc;
-         NDCuboidBuilder ndCuboidBuilder;
-         RowKeySplitter rowKeySplitter;
-         transient boolean initialized = false;
+     static class CuboidFlatMap implements 
PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+ 
+         private String cubeName;
+         private String segmentId;
+         private String metaUrl;
+         private CubeSegment cubeSegment;
+         private CubeDesc cubeDesc;
+         private CuboidScheduler cuboidScheduler;
+         private NDCuboidBuilder ndCuboidBuilder;
+         private RowKeySplitter rowKeySplitter;
+         private volatile transient boolean initialized = false;
+ 
+         public CuboidFlatMap(String cubeName, String segmentId, String 
metaUrl) {
+             this.cubeName = cubeName;
+             this.segmentId = segmentId;
+             this.metaUrl = metaUrl;
+         }
  
-         CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, 
NDCuboidBuilder ndCuboidBuilder) {
-             this.cubeSegment = cubeSegment;
-             this.cubeDesc = cubeDesc;
-             this.ndCuboidBuilder = ndCuboidBuilder;
+         public void init() {
+             KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+             CubeInstance cubeInstance = 
CubeManager.getInstance(kConfig).getCube(cubeName);
+             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+             this.cubeDesc = cubeInstance.getDescriptor();
 -            this.cuboidScheduler = new CuboidScheduler(cubeDesc);
++            this.cuboidScheduler = cubeDesc.getCuboidScheduler();
+             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new 
RowKeyEncoderProvider(cubeSegment));
              this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
          }
  
          @Override
 -        public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, 
Object[]> tuple2) throws Exception {
 +        public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, 
Object[]> tuple2) throws Exception {
              if (initialized == false) {
-                 prepare();
-                 initialized = true;
+                 synchronized (SparkCubingByLayer.class) {
+                     if (initialized == false) {
+                         init();
+                         initialized = true;
+                     }
+                 }
              }
  
              byte[] key = tuple2._1().array();

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --cc 
kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
index 1960e32,1960e32..483d8f7
--- 
a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@@ -52,6 -52,6 +52,8 @@@ public class ITDistributedSchedulerBase
          Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task2.getId()).getState());
          Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task3.getId()).getState());
          Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(job.getId()).getState());
++        
++        Thread.sleep(5000);
  
          Assert.assertEquals(null, getServerName(segmentId1));
      }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --cc 
kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
index 0000000,977c0f4..1bf3bfe
mode 000000,100644..100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java
@@@ -1,0 -1,112 +1,114 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * 
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.source.jdbc;
+ 
+ import static org.junit.Assert.assertTrue;
+ 
+ import java.sql.Connection;
+ import java.sql.DriverManager;
+ import java.sql.SQLException;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+ import org.apache.kylin.common.util.Pair;
+ import org.apache.kylin.metadata.MetadataManager;
+ import org.apache.kylin.metadata.model.ISourceAware;
+ import org.apache.kylin.metadata.model.TableDesc;
+ import org.apache.kylin.metadata.model.TableExtDesc;
++import org.apache.kylin.metadata.project.ProjectInstance;
+ import org.apache.kylin.query.H2Database;
+ import org.apache.kylin.source.ISource;
+ import org.apache.kylin.source.ISourceMetadataExplorer;
+ import org.apache.kylin.source.SourceFactory;
+ import org.apache.kylin.source.datagen.ModelDataGenerator;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase 
implements ISourceAware {
+ 
+     protected KylinConfig config = null;
+     protected static Connection h2Connection = null;
+ 
+     @Before
+     public void setup() throws Exception {
+ 
+         super.createTestMetadata();
+ 
+         System.setProperty("kylin.source.jdbc.connection-url", 
"jdbc:h2:mem:db" + "_jdbc_source_table_loader");
+         System.setProperty("kylin.source.jdbc.driver", "org.h2.Driver");
+         System.setProperty("kylin.source.jdbc.user", "sa");
+         System.setProperty("kylin.source.jdbc.pass", "");
+ 
+         config = KylinConfig.getInstanceFromEnv();
+ 
+         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + 
"_jdbc_source_table_loader", "sa", "");
+ 
 -        H2Database h2DB = new H2Database(h2Connection, config);
++        String project = ProjectInstance.DEFAULT_PROJECT_NAME;
++        H2Database h2DB = new H2Database(h2Connection, config, project);
+ 
+         MetadataManager mgr = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+         ModelDataGenerator gen = new 
ModelDataGenerator(mgr.getDataModelDesc("ci_left_join_model"), 10000);
+         gen.generate();
+ 
+         h2DB.loadAllTables();
+ 
+     }
+ 
+     @After
+     public void after() throws Exception {
+ 
+         super.cleanupTestMetadata();
+ 
+         if (h2Connection != null) {
+             try {
+                 h2Connection.close();
+             } catch (SQLException e) {
+                 e.printStackTrace();
+             }
+         }
+ 
+         System.clearProperty("kylin.source.jdbc.connection-url");
+         System.clearProperty("kylin.source.jdbc.driver");
+         System.clearProperty("kylin.source.jdbc.user");
+         System.clearProperty("kylin.source.jdbc.pass");
+ 
+     }
+ 
+     @Test
+     public void test() throws Exception {
+ 
+         ISource source = SourceFactory.getSource(new 
ITJdbcSourceTableLoaderTest());
+         ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
+         Pair<TableDesc, TableExtDesc> pair;
+ 
 -        pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT");
++        pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT", 
ProjectInstance.DEFAULT_PROJECT_NAME);
+         
assertTrue(pair.getFirst().getIdentity().equals("DEFAULT.TEST_KYLIN_FACT"));
+ 
 -        pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT");
++        pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT", 
ProjectInstance.DEFAULT_PROJECT_NAME);
+         assertTrue(pair.getFirst().getIdentity().equals("EDW.TEST_CAL_DT"));
+ 
+     }
+ 
+     @Override
+     public int getSourceType() {
+         return ISourceAware.ID_JDBC;
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
----------------------------------------------------------------------
diff --cc 
kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
index 0000000,22e0b14..41a35fe
mode 000000,100644..100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java
@@@ -1,0 -1,107 +1,109 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * 
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  * 
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.source.jdbc;
+ 
+ import java.sql.Connection;
+ import java.sql.DriverManager;
+ import java.sql.SQLException;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+ import org.apache.kylin.metadata.MetadataManager;
+ import org.apache.kylin.metadata.model.ISourceAware;
++import org.apache.kylin.metadata.project.ProjectInstance;
+ import org.apache.kylin.query.H2Database;
+ import org.apache.kylin.source.datagen.ModelDataGenerator;
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase 
implements ISourceAware {
+ 
+     protected KylinConfig config = null;
+     protected static Connection h2Connection = null;
+ 
+     @Before
+     public void setup() throws Exception {
+ 
+         super.createTestMetadata();
+ 
+         System.setProperty("kylin.source.jdbc.connection-url", 
"jdbc:h2:mem:db" + "_jdbc_table_reader");
+         System.setProperty("kylin.source.jdbc.driver", "org.h2.Driver");
+         System.setProperty("kylin.source.jdbc.user", "sa");
+         System.setProperty("kylin.source.jdbc.pass", "");
+ 
+         config = KylinConfig.getInstanceFromEnv();
+ 
+         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + 
"_jdbc_table_reader", "sa", "");
+ 
 -        H2Database h2DB = new H2Database(h2Connection, config);
++        String project = ProjectInstance.DEFAULT_PROJECT_NAME;
++        H2Database h2DB = new H2Database(h2Connection, config, project);
+ 
+         MetadataManager mgr = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+         ModelDataGenerator gen = new 
ModelDataGenerator(mgr.getDataModelDesc("ci_left_join_model"), 10000);
+         gen.generate();
+ 
+         h2DB.loadAllTables();
+ 
+     }
+ 
+     @After
+     public void after() throws Exception {
+ 
+         super.cleanupTestMetadata();
+ 
+         if (h2Connection != null) {
+             try {
+                 h2Connection.close();
+             } catch (SQLException e) {
+                 e.printStackTrace();
+             }
+         }
+ 
+         System.clearProperty("kylin.source.jdbc.connection-url");
+         System.clearProperty("kylin.source.jdbc.driver");
+         System.clearProperty("kylin.source.jdbc.user");
+         System.clearProperty("kylin.source.jdbc.pass");
+ 
+     }
+ 
+     @Test
+     public void test() throws Exception {
+ 
+         JdbcTableReader reader = new JdbcTableReader("default", 
"test_kylin_fact");
+         int rowNumber = 0;
+         while (reader.next()) {
+             String[] row = reader.getRow();
+             Assert.assertEquals(11, row.length);
+ 
+             rowNumber++;
+         }
+ 
+         reader.close();
+         Assert.assertEquals(10000, rowNumber);
+ 
+     }
+ 
+     @Override
+     public int getSourceType() {
+         return ISourceAware.ID_JDBC;
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
----------------------------------------------------------------------
diff --cc 
query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
index cd0b2ca,3431c45..cfb0cbd
--- a/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
@@@ -63,11 -63,12 +63,12 @@@ public class RealizationChooser 
          Map<DataModelDesc, Set<IRealization>> modelMap = 
makeOrderedModelMap(context);
  
          if (modelMap.size() == 0) {
 -            throw new NoRealizationFoundException("No model found for" + 
toErrorMsg(context));
 +            throw new NoRealizationFoundException("No model found for " + 
toErrorMsg(context));
          }
  
-         for (DataModelDesc model : modelMap.keySet()) {
-             Map<String, String> aliasMap = matches(model, context);
+         for (Map.Entry<DataModelDesc, Set<IRealization>> entry : 
modelMap.entrySet()) {
+             final DataModelDesc model = entry.getKey();
+             final Map<String, String> aliasMap = matches(model, context);
              if (aliasMap != null) {
                  fixModel(context, model, aliasMap);
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --cc 
server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a370292,a2cf0fb..4244cf3
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@@ -99,7 -100,7 +99,11 @@@ public class CubeController extends Bas
  
      @RequestMapping(value = "", method = { RequestMethod.GET }, produces = { 
"application/json" })
      @ResponseBody
--    public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", 
required = false) String cubeName, @RequestParam(value = "modelName", required 
= false) String modelName, @RequestParam(value = "projectName", required = 
false) String projectName, @RequestParam(value = "limit", required = false) 
Integer limit, @RequestParam(value = "offset", required = false) Integer 
offset) {
++    public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", 
required = false) String cubeName,
++            @RequestParam(value = "modelName", required = false) String 
modelName,
++            @RequestParam(value = "projectName", required = false) String 
projectName,
++            @RequestParam(value = "limit", required = false) Integer limit,
++            @RequestParam(value = "offset", required = false) Integer offset) 
{
          List<CubeInstance> cubes;
          cubes = cubeService.listAllCubes(cubeName, projectName, modelName, 
true);
  
@@@ -148,7 -149,7 +152,8 @@@
       * @throws UnknownHostException
       * @throws IOException
       */
--    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { 
RequestMethod.GET }, produces = { "application/json" })
++    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { 
RequestMethod.GET }, produces = {
++            "application/json" })
      @ResponseBody
      public GeneralResponse getSql(@PathVariable String cubeName, 
@PathVariable String segmentName) {
          CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@@ -168,7 -169,7 +173,8 @@@
       * @param notifyList
       * @throws IOException
       */
--    @RequestMapping(value = "/{cubeName}/notify_list", method = { 
RequestMethod.PUT }, produces = { "application/json" })
++    @RequestMapping(value = "/{cubeName}/notify_list", method = { 
RequestMethod.PUT }, produces = {
++            "application/json" })
      @ResponseBody
      public void updateNotifyList(@PathVariable String cubeName, @RequestBody 
List<String> notifyList) {
          CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@@ -208,9 -209,9 +214,11 @@@
       *
       * @throws IOException
       */
--    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", 
method = { RequestMethod.PUT }, produces = { "application/json" })
++    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", 
method = {
++            RequestMethod.PUT }, produces = { "application/json" })
      @ResponseBody
--    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, 
@PathVariable String segmentName, @RequestParam(value = "lookupTable") String 
lookupTable) {
++    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, 
@PathVariable String segmentName,
++            @RequestParam(value = "lookupTable") String lookupTable) {
          try {
              final CubeManager cubeMgr = cubeService.getCubeManager();
              final CubeInstance cube = cubeMgr.getCube(cubeName);
@@@ -226,7 -227,7 +234,8 @@@
       *
       * @throws IOException
       */
--    @RequestMapping(value = "/{cubeName}/segs/{segmentName}", method = { 
RequestMethod.DELETE }, produces = { "application/json" })
++    @RequestMapping(value = "/{cubeName}/segs/{segmentName}", method = { 
RequestMethod.DELETE }, produces = {
++            "application/json" })
      @ResponseBody
      public CubeInstance deleteSegment(@PathVariable String cubeName, 
@PathVariable String segmentName) {
          CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@@ -261,7 -262,7 +270,8 @@@
      @RequestMapping(value = "/{cubeName}/rebuild", method = { 
RequestMethod.PUT }, produces = { "application/json" })
      @ResponseBody
      public JobInstance rebuild(@PathVariable String cubeName, @RequestBody 
JobBuildRequest req) {
--        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 
0, 0, null, null, req.getBuildType(), req.isForce() || 
req.isForceMergeEmptySegment());
++        return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 
0, 0, null, null, req.getBuildType(),
++                req.isForce() || req.isForceMergeEmptySegment());
      }
  
      /** Build/Rebuild a cube segment by source offset */
@@@ -287,11 -288,11 +297,14 @@@
      @RequestMapping(value = "/{cubeName}/rebuild2", method = { 
RequestMethod.PUT }, produces = { "application/json" })
      @ResponseBody
      public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody 
JobBuildRequest2 req) {
--        return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), 
req.getSourceOffsetEnd(), req.getSourcePartitionOffsetStart(), 
req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce());
++        return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), 
req.getSourceOffsetEnd(),
++                req.getSourcePartitionOffsetStart(), 
req.getSourcePartitionOffsetEnd(), req.getBuildType(),
++                req.isForce());
      }
  
      private JobInstance buildInternal(String cubeName, long startTime, long 
endTime, //
--            long startOffset, long endOffset, Map<Integer, Long> 
sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, String 
buildType, boolean force) {
++            long startOffset, long endOffset, Map<Integer, Long> 
sourcePartitionOffsetStart,
++            Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, 
boolean force) {
          try {
              String submitter = 
SecurityContextHolder.getContext().getAuthentication().getName();
              CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
@@@ -300,7 -301,7 +313,8 @@@
                  throw new InternalErrorException("Cannot find cube " + 
cubeName);
              }
              return jobService.submitJob(cube, startTime, endTime, 
startOffset, endOffset, //
--                    sourcePartitionOffsetStart, sourcePartitionOffsetEnd, 
CubeBuildTypeEnum.valueOf(buildType), force, submitter);
++                    sourcePartitionOffsetStart, sourcePartitionOffsetEnd, 
CubeBuildTypeEnum.valueOf(buildType), force,
++                    submitter);
          } catch (Throwable e) {
              logger.error(e.getLocalizedMessage(), e);
              throw new InternalErrorException(e.getLocalizedMessage(), e);
@@@ -453,7 -454,7 +467,8 @@@
  
          try {
              desc.setUuid(UUID.randomUUID().toString());
--            String projectName = (null == cubeRequest.getProject()) ? 
ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject();
++            String projectName = (null == cubeRequest.getProject()) ? 
ProjectInstance.DEFAULT_PROJECT_NAME
++                    : cubeRequest.getProject();
              ProjectInstance project = 
cubeService.getProjectManager().getProject(projectName);
              if (project == null) {
                  throw new BadRequestException("Project " + projectName + " 
doesn't exist");
@@@ -485,7 -486,7 +500,8 @@@
              return cubeRequest;
          }
  
--        String projectName = (null == cubeRequest.getProject()) ? 
ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject();
++        String projectName = (null == cubeRequest.getProject()) ? 
ProjectInstance.DEFAULT_PROJECT_NAME
++                : cubeRequest.getProject();
          try {
              CubeInstance cube = 
cubeService.getCubeManager().getCube(cubeRequest.getCubeName());
  
@@@ -497,13 -498,13 +513,15 @@@
  
              //cube renaming is not allowed
              if 
(!cube.getDescriptor().getName().equalsIgnoreCase(desc.getName())) {
--                String error = "Cube Desc renaming is not allowed: 
desc.getName(): " + desc.getName() + ", cubeRequest.getCubeName(): " + 
cubeRequest.getCubeName();
++                String error = "Cube Desc renaming is not allowed: 
desc.getName(): " + desc.getName()
++                        + ", cubeRequest.getCubeName(): " + 
cubeRequest.getCubeName();
                  updateRequest(cubeRequest, false, error);
                  return cubeRequest;
              }
  
              if (cube.getSegments().size() != 0 && 
!cube.getDescriptor().consistentWith(desc)) {
--                String error = "CubeDesc " + desc.getName() + " is 
inconsistent with existing. Try purge that cube first or avoid updating key 
cube desc fields.";
++                String error = "CubeDesc " + desc.getName()
++                        + " is inconsistent with existing. Try purge that 
cube first or avoid updating key cube desc fields.";
                  updateRequest(cubeRequest, false, error);
                  return cubeRequest;
              }
@@@ -653,7 -654,7 +671,8 @@@
       * @param cubeName
       * @return
       */
--    @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { 
RequestMethod.PUT }, produces = { "application/json" })
++    @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { 
RequestMethod.PUT }, produces = {
++            "application/json" })
      @ResponseBody
      public GeneralResponse initStartOffsets(@PathVariable String cubeName) {
          checkCubeName(cubeName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
----------------------------------------------------------------------
diff --cc 
server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
index cdc09b9,aba2cf9..f8d5f83
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
@@@ -101,7 -102,7 +101,8 @@@ public class CubeControllerV2 extends B
              @RequestParam(value = "modelName", required = false) String 
modelName,
              @RequestParam(value = "projectName", required = false) String 
projectName,
              @RequestParam(value = "pageOffset", required = false, 
defaultValue = "0") Integer pageOffset,
--            @RequestParam(value = "pageSize", required = false, defaultValue 
= "10") Integer pageSize) throws IOException {
++            @RequestParam(value = "pageSize", required = false, defaultValue 
= "10") Integer pageSize)
++            throws IOException {
  
          HashMap<String, Object> data = new HashMap<String, Object>();
          List<CubeInstanceResponse> response = new 
ArrayList<CubeInstanceResponse>();
@@@ -109,13 -110,9 +110,13 @@@
  
          // official cubes
          for (CubeInstance cube : cubes) {
 -            response.add(createCubeInstanceResponse(cube));
 +            try {
 +                response.add(createCubeInstanceResponse(cube));
 +            } catch (Exception e) {
 +                logger.error("Error creating cube instance response, 
skipping.", e);
 +            }
          }
--        
++
          // draft cubes
          for (Draft d : cubeService.listCubeDrafts(cubeName, modelName, 
projectName, exactMatch)) {
              CubeDesc c = (CubeDesc) d.getEntity();
@@@ -144,7 -141,7 +145,7 @@@
  
          return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, data, "");
      }
--    
++
      private boolean contains(List<CubeInstanceResponse> response, String 
name) {
          for (CubeInstanceResponse r : response) {
              if (r.getName().equals(name))
@@@ -156,38 -153,33 +157,38 @@@
      private CubeInstanceResponse createCubeInstanceResponseFromDraft(Draft d) 
{
          CubeDesc desc = (CubeDesc) d.getEntity();
          Preconditions.checkState(desc.isDraft());
--        
++
          CubeInstance mock = new CubeInstance();
          mock.setName(desc.getName());
          mock.setDescName(desc.getName());
          mock.setStatus(RealizationStatusEnum.DISABLED);
--        
++
          CubeInstanceResponse r = new CubeInstanceResponse(mock);
--        
++
          r.setModel(desc.getModelName());
          r.setProject(d.getProject());
          r.setDraft(true);
--        
++
          return r;
      }
  
      private CubeInstanceResponse createCubeInstanceResponse(CubeInstance 
cube) {
          Preconditions.checkState(!cube.getDescriptor().isDraft());
--        
++
          CubeInstanceResponse r = new CubeInstanceResponse(cube);
  
          r.setModel(cube.getDescriptor().getModelName());
          r.setPartitionDateStart(cube.getDescriptor().getPartitionDateStart());
 -        
r.setPartitionDateColumn(cube.getModel().getPartitionDesc().getPartitionDateColumn());
 -        r.setIs_streaming(
 -                
cube.getModel().getRootFactTable().getTableDesc().getSourceType() == 
ISourceAware.ID_STREAMING);
 +        // cuz model doesn't have a state the label a model is broken,
 +        // so in some case the model can not be loaded due to some check 
failed,
 +        // but the cube in this model can still be loaded.
 +        if (cube.getModel() != null) {
 +            
r.setPartitionDateColumn(cube.getModel().getPartitionDesc().getPartitionDateColumn());
 +            r.setIs_streaming(
 +                    
cube.getModel().getRootFactTable().getTableDesc().getSourceType() == 
ISourceAware.ID_STREAMING);
 +        }
          r.setProject(projectService.getProjectOfCube(cube.getName()));
--        
++
          return r;
      }
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
----------------------------------------------------------------------
diff --cc 
server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
index fe91f52,e8b675e..f302307
--- 
a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
@@@ -223,11 -223,13 +223,14 @@@ public class AclTableMigrationTool 
      private Map<String, AceInfo> getAllAceInfo(Result result) throws 
IOException {
          Map<String, AceInfo> allAceInfoMap = new HashMap<>();
          NavigableMap<byte[], byte[]> familyMap = 
result.getFamilyMap(Bytes.toBytes(AclConstant.ACL_ACES_FAMILY));
-         for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
-             String sid = new String(entry.getKey());
-             AceInfo aceInfo = aceSerializer.deserialize(entry.getValue());
-             if (null != aceInfo) {
-                 allAceInfoMap.put(sid, aceInfo);
 -        if(familyMap != null && !familyMap.isEmpty()) {
++
++        if (familyMap != null && !familyMap.isEmpty()) {
+             for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
+                 String sid = new String(entry.getKey());
+                 AceInfo aceInfo = aceSerializer.deserialize(entry.getValue());
+                 if (null != aceInfo) {
+                     allAceInfoMap.put(sid, aceInfo);
+                 }
              }
          }
          return allAceInfoMap;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --cc 
server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 5ac595f,673b11b..51ec902
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@@ -849,9 -805,9 +849,10 @@@ public class QueryService extends Basic
              List<List<String>> results, List<SelectedColumnMeta> columnMetas) 
throws SQLException {
  
          CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true);
 +
+         PreparedStatement preparedStatement = null;
          try {
-             conn.prepareStatement(correctedSql);
+             preparedStatement = conn.prepareStatement(correctedSql);
              throw new IllegalStateException("Should have thrown 
OnlyPrepareEarlyAbortException");
          } catch (Exception e) {
              Throwable rootCause = ExceptionUtils.getRootCause(e);
@@@ -883,12 -839,13 +884,13 @@@
              }
          } finally {
              CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false);
+             DBUtils.closeQuietly(preparedStatement);
          }
  
 -        return getSqlResponse(isPushDown, results, columnMetas);
 +        return buildSqlResponse(isPushDown, results, columnMetas);
      }
  
 -    private SQLResponse getSqlResponse(Boolean isPushDown, List<List<String>> 
results,
 +    private SQLResponse buildSqlResponse(Boolean isPushDown, 
List<List<String>> results,
              List<SelectedColumnMeta> columnMetas) {
  
          boolean isPartialResult = false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
----------------------------------------------------------------------
diff --cc server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index b47f05f,a509f88..44a30f1
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@@ -19,12 -19,20 +19,22 @@@
  package org.apache.kylin.rest.service;
  
  import java.io.IOException;
 +import java.sql.SQLException;
+ import java.util.Collections;
+ import java.util.List;
  
+ import org.apache.kylin.engine.mr.CubingJob;
  import org.apache.kylin.job.constant.JobTimeFilterEnum;
+ import org.apache.kylin.job.exception.ExecuteException;
  import org.apache.kylin.job.exception.JobException;
+ import org.apache.kylin.job.execution.AbstractExecutable;
+ import org.apache.kylin.job.execution.ExecutableContext;
+ import org.apache.kylin.job.execution.ExecutableManager;
+ import org.apache.kylin.job.execution.ExecutableState;
+ import org.apache.kylin.job.execution.ExecuteResult;
+ import org.apache.kylin.job.execution.Output;
  import org.apache.kylin.metadata.project.ProjectInstance;
 +import org.apache.kylin.query.QueryConnection;
  import org.junit.Assert;
  import org.junit.Test;
  import org.springframework.beans.factory.annotation.Autowired;
@@@ -52,4 -60,32 +62,27 @@@ public class JobServiceTest extends Ser
          Assert.assertNull(jobService.getJobInstance("job_not_exist"));
          Assert.assertNotNull(jobService.searchJobs(null, null, null, 0, 0, 
JobTimeFilterEnum.ALL));
      }
+ 
+     @Test
+     public void testExceptionOnLostJobOutput() {
+         ExecutableManager manager = 
ExecutableManager.getInstance(jobService.getConfig());
+         AbstractExecutable executable = new TestJob();
+         manager.addJob(executable);
 -        List<CubingJob> jobs = jobService.innerSearchCubingJobs("cube",
 -                "jobName",
 -                Collections.<ExecutableState>emptySet(),
 -                0,
 -                Long.MAX_VALUE,
 -                Collections.<String, Output>emptyMap(),
 -                true,
 -                "project");
++        List<CubingJob> jobs = jobService.innerSearchCubingJobs("cube", 
"jobName",
++                Collections.<ExecutableState> emptySet(), 0, Long.MAX_VALUE, 
Collections.<String, Output> emptyMap(),
++                true, "project");
+         Assert.assertEquals(0, jobs.size());
+     }
+ 
+     public static class TestJob extends CubingJob {
+ 
 -        public TestJob(){
++        public TestJob() {
+             super();
+         }
+ 
+         @Override
+         protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+             return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-kafka/pom.xml
----------------------------------------------------------------------

Reply via email to