This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 73591c613e9e4b537b24a85d59f8cc6dd15d8fd1
Author: Cheng Wang <cheng.w...@kyligence.io>
AuthorDate: Tue Apr 25 18:45:57 2017 +0800

    KYLIN-2565, upgrade to Hadoop3.0
---
 .../mr/common/DefaultSslProtocolSocketFactory.java | 150 ------
 .../kylin/engine/mr/common/HadoopStatusGetter.java | 280 ++++++++++
 .../apache/kylin/engine/spark/SparkCountDemo.java  |  80 +++
 .../org/apache/kylin/engine/spark/SparkCubing.java | 591 +++++++++++++++++++++
 pom.xml                                            |  19 +-
 server-base/pom.xml                                |   5 +
 .../org/apache/kylin/rest/security/MockHTable.java | 112 ++--
 .../kylin/storage/hbase/HBaseConnection.java       |   5 +
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java    |  15 +-
 .../v2/coprocessor/endpoint/CubeVisitService.java  |   4 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  12 +
 .../storage/hbase/steps/HBaseCuboidWriter.java     | 133 +++++
 .../kylin/storage/hbase/util/CubeMigrationCLI.java |   2 +-
 .../storage/hbase/util/DeployCoprocessorCLI.java   |   3 +-
 .../storage/hbase/util/ExtendCubeToHybridCLI.java  |   2 +-
 .../hbase/util/GridTableHBaseBenchmark.java        |   2 +-
 .../kylin/storage/hbase/util/PingHBaseCLI.java     |   3 +-
 .../storage/hbase/steps/CubeHFileMapperTest.java   |  22 +-
 .../kylin/storage/hbase/steps/TestHbaseClient.java |  14 +-
 .../org/apache/kylin/tool/CubeMigrationCLI.java    |  14 +-
 .../apache/kylin/tool/CubeMigrationCheckCLI.java   |  17 +-
 .../apache/kylin/tool/ExtendCubeToHybridCLI.java   |   2 +-
 .../org/apache/kylin/tool/StorageCleanupJob.java   |   1 +
 23 files changed, 1240 insertions(+), 248 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index d66e4eb..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.common;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- * 
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
-    /** Log object for this class. */
-    private static Logger logger = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
-    private SSLContext sslcontext = null;
-
-    /**
-     * Constructor for DefaultSslProtocolSocketFactory.
-     */
-    public DefaultSslProtocolSocketFactory() {
-        super();
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
-     */
-    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
-    }
-
-    /**
-     * Attempts to get a new socket connection to the given host within the
-     * given time limit.
-     * 
-     * <p>
-     * To circumvent the limitations of older JREs that do not support connect
-     * timeout a controller thread is executed. The controller thread attempts
-     * to create a new socket within the given limit of time. If socket
-     * constructor does not return until the timeout expires, the controller
-     * terminates and throws an {@link ConnectTimeoutException}
-     * </p>
-     * 
-     * @param host
-     *            the host name/IP
-     * @param port
-     *            the port on the host
-     * @param localAddress
-     *            the local host name/IP to bind the socket to
-     * @param localPort
-     *            the port on the local machine
-     * @param params
-     *            {@link HttpConnectionParams Http connection parameters}
-     * 
-     * @return Socket a new socket
-     * 
-     * @throws IOException
-     *             if an I/O error occurs while creating the socket
-     * @throws UnknownHostException
-     *             if the IP address of the host cannot be determined
-     * @throws ConnectTimeoutException
-     *             DOCUMENT ME!
-     * @throws IllegalArgumentException
-     *             DOCUMENT ME!
-     */
-    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
-        if (params == null) {
-            throw new IllegalArgumentException("Parameters may not be null");
-        }
-
-        int timeout = params.getConnectionTimeout();
-
-        if (timeout == 0) {
-            return createSocket(host, port, localAddress, localPort);
-        } else {
-            // To be eventually deprecated when migrated to Java 1.4 or above
-            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
-        }
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
-     */
-    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port);
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
-     */
-    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
-    }
-
-    public boolean equals(Object obj) {
-        return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
-    }
-
-    public int hashCode() {
-        return DefaultX509TrustManager.class.hashCode();
-    }
-
-    private static SSLContext createEasySSLContext() {
-        try {
-            SSLContext context = SSLContext.getInstance("TLS");
-            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
-            return context;
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new HttpClientError(e.toString());
-        }
-    }
-
-    private SSLContext getSSLContext() {
-        if (this.sslcontext == null) {
-            this.sslcontext = createEasySSLContext();
-        }
-
-        return this.sslcontext;
-    }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
new file mode 100644
index 0000000..0245c1c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.nio.charset.Charset;
+import java.security.KeyManagementException;
+import java.security.Principal;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthSchemeRegistry;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ */
+public class HadoopStatusGetter {
+
+    private final String mrJobId;
+    private final String yarnUrl;
+
+    protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusGetter.class);
+
+    public HadoopStatusGetter(String yarnUrl, String mrJobId) {
+        this.yarnUrl = yarnUrl;
+        this.mrJobId = mrJobId;
+    }
+
+    public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberosAuth) throws IOException {
+        String applicationId = mrJobId.replace("job", "application");
+        String url = yarnUrl.replace("${job_id}", applicationId);
+        String response = useKerberosAuth ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url);
+        logger.debug("Hadoop job " + mrJobId + " status : " + response);
+        JsonNode root = new ObjectMapper().readTree(response);
+        RMAppState state = RMAppState.valueOf(root.findValue("state").textValue());
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").textValue());
+        return Pair.of(state, finalStatus);
+    }
+
+    private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf";
+
+    private String getHttpResponseWithKerberosAuth(String url) throws IOException {
+        String krb5ConfigPath = System.getProperty("java.security.krb5.conf");
+        if (krb5ConfigPath == null) {
+            krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION;
+        }
+        boolean skipPortAtKerberosDatabaseLookup = true;
+        System.setProperty("java.security.krb5.conf", krb5ConfigPath);
+        System.setProperty("sun.security.krb5.debug", "true");
+        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+
+        DefaultHttpClient client = new DefaultHttpClient();
+        AuthSchemeRegistry authSchemeRegistry = new AuthSchemeRegistry();
+        authSchemeRegistry.register(AuthPolicy.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup));
+        client.setAuthSchemes(authSchemeRegistry);
+
+        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        Credentials useJaasCreds = new Credentials() {
+            public String getPassword() {
+                return null;
+            }
+
+            public Principal getUserPrincipal() {
+                return null;
+            }
+        };
+        credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds);
+        client.setCredentialsProvider(credentialsProvider);
+
+        String response = null;
+        while (response == null) {
+            if (url.startsWith("https://";)) {
+                registerEasyHttps(client);
+            }
+            if (url.contains("anonymous=true") == false) {
+                url += url.contains("?") ? "&" : "?";
+                url += "anonymous=true";
+            }
+            HttpGet httpget = new HttpGet(url);
+            httpget.addHeader("accept", "application/json");
+            try {
+                HttpResponse httpResponse = client.execute(httpget);
+                String redirect = null;
+                org.apache.http.Header h = httpResponse.getFirstHeader("Location");
+                if (h != null) {
+                    redirect = h.getValue();
+                    if (isValidURL(redirect) == false) {
+                        logger.info("Get invalid redirect url, skip it: " + redirect);
+                        Thread.sleep(1000L);
+                        continue;
+                    }
+                } else {
+                    h = httpResponse.getFirstHeader("Refresh");
+                    if (h != null) {
+                        String s = h.getValue();
+                        int cut = s.indexOf("url=");
+                        if (cut >= 0) {
+                            redirect = s.substring(cut + 4);
+
+                            if (isValidURL(redirect) == false) {
+                                logger.info("Get invalid redirect url, skip it: " + redirect);
+                                Thread.sleep(1000L);
+                                continue;
+                            }
+                        }
+                    }
+                }
+
+                if (redirect == null) {
+                    response = IOUtils.toString(httpResponse.getEntity().getContent(), Charset.defaultCharset());
+                    logger.debug("Job " + mrJobId + " get status check result.\n");
+                } else {
+                    url = redirect;
+                    logger.debug("Job " + mrJobId + " check redirect url " + 
url + ".\n");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error(e.getMessage());
+            } finally {
+                httpget.releaseConnection();
+            }
+        }
+
+        return response;
+    }
+
+    private String getHttpResponse(String url) throws IOException {
+        HttpClient client = new DefaultHttpClient();
+
+        String response = null;
+        while (response == null) { // follow redirects via 'refresh'
+            if (url.startsWith("https://";)) {
+                registerEasyHttps(client);
+            }
+            if (url.contains("anonymous=true") == false) {
+                url += url.contains("?") ? "&" : "?";
+                url += "anonymous=true";
+            }
+
+            HttpGet get = new HttpGet(url);
+            get.addHeader("accept", "application/json");
+
+            try {
+                HttpResponse res = client.execute(get);
+
+                String redirect = null;
+                Header h = res.getFirstHeader("Location");
+                if (h != null) {
+                    redirect = h.getValue();
+                    if (isValidURL(redirect) == false) {
+                        logger.info("Get invalid redirect url, skip it: " + redirect);
+                        Thread.sleep(1000L);
+                        continue;
+                    }
+                } else {
+                    h = res.getFirstHeader("Refresh");
+                    if (h != null) {
+                        String s = h.getValue();
+                        int cut = s.indexOf("url=");
+                        if (cut >= 0) {
+                            redirect = s.substring(cut + 4);
+
+                            if (isValidURL(redirect) == false) {
+                                logger.info("Get invalid redirect url, skip it: " + redirect);
+                                Thread.sleep(1000L);
+                                continue;
+                            }
+                        }
+                    }
+                }
+
+                if (redirect == null) {
+                    response = res.getStatusLine().toString();
+                    logger.debug("Job " + mrJobId + " get status check 
result.\n");
+                } else {
+                    url = redirect;
+                    logger.debug("Job " + mrJobId + " check redirect url " + 
url + ".\n");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error(e.getMessage());
+            } finally {
+                get.releaseConnection();
+            }
+        }
+
+        return response;
+    }
+
+    private static void registerEasyHttps(HttpClient client) {
+        SSLContext sslContext;
+        try {
+            sslContext = SSLContext.getInstance("SSL");
+
+            // set up a TrustManager that trusts everything
+            try {
+                sslContext.init(null, new TrustManager[] { new DefaultX509TrustManager(null) {
+                    public X509Certificate[] getAcceptedIssuers() {
+                        logger.debug("getAcceptedIssuers");
+                        return null;
+                    }
+
+                    public void checkClientTrusted(X509Certificate[] certs, String authType) {
+                        logger.debug("checkClientTrusted");
+                    }
+
+                    public void checkServerTrusted(X509Certificate[] certs, String authType) {
+                        logger.debug("checkServerTrusted");
+                    }
+                } }, new SecureRandom());
+            } catch (KeyManagementException e) {
+            }
+            SSLSocketFactory ssf = new SSLSocketFactory(sslContext, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+            ClientConnectionManager ccm = client.getConnectionManager();
+            SchemeRegistry sr = ccm.getSchemeRegistry();
+            sr.register(new Scheme("https", 443, ssf));
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private static boolean isValidURL(String value) {
+        if (StringUtils.isNotEmpty(value)) {
+            java.net.URL url;
+            try {
+                url = new java.net.URL(value);
+            } catch (MalformedURLException var5) {
+                return false;
+            }
+
+            return StringUtils.isNotEmpty(url.getProtocol()) && StringUtils.isNotEmpty(url.getHost());
+        }
+
+        return false;
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
new file mode 100644
index 0000000..a079a57
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCountDemo extends AbstractApplication {
+
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+
+    private Options options;
+
+    public SparkCountDemo() {
+        options = new Options();
+        //        options.addOption(OPTION_INPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
+
+            @Override
+            public Tuple2<String, Integer> call(String s) throws Exception {
+                return new Tuple2<String, Integer>(s, s.length());
+            }
+        }).sortByKey();
+        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
+
+        System.out.println("line number:" + logData.count());
+
+        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
+                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class);
+
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
new file mode 100644
index 0000000..a87d66b
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
+import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
+import org.apache.kylin.engine.spark.util.IteratorUtils;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.HiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.primitives.UnsignedBytes;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCubing extends AbstractApplication {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
+
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
+
+    private Options options;
+
+    public SparkCubing() {
+        options = new Options();
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_CONF_PATH);
+        options.addOption(OPTION_COPROCESSOR);
+
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
+        File metaDir = new File(folder);
+        if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
+            System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+            logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
+            kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
+            return kylinConfig;
+        } else {
+            return KylinConfig.getInstanceFromEnv();
+        }
+    }
+
+    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
+        ClassUtil.addClasspath(confPath);
+        final File[] files = new File(confPath).listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                if (pathname.getAbsolutePath().endsWith(".xml")) {
+                    return true;
+                }
+                if (pathname.getAbsolutePath().endsWith(".properties")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        if (files == null) {
+            return;
+        }
+        for (File file : files) {
+            sc.addFile(file.getAbsolutePath());
+        }
+    }
+
+    private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+        final String[] columns = intermediateTable.columns();
+        final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
+        final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
+        final long start = System.currentTimeMillis();
+        final RowKeyDesc rowKey = cubeDesc.getRowkey();
+        for (int i = 0; i < baseCuboidColumn.size(); i++) {
+            TblColRef col = baseCuboidColumn.get(i);
+            if (!rowKey.isUseDictionary(col)) {
+                continue;
+            }
+            final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
+            tblColRefMap.put(rowKeyColumnIndex, col);
+        }
+
+        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
+        for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
+            final String column = columns[entry.getKey()];
+            final TblColRef tblColRef = entry.getValue();
+            final DataFrame frame = intermediateTable.select(column).distinct();
+
+            final Row[] rows = frame.collect();
+            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
+                @Override
+                public Iterator<String> iterator() {
+                    return new Iterator<String>() {
+                        int i = 0;
+
+                        @Override
+                        public boolean hasNext() {
+                            return i < rows.length;
+                        }
+
+                        @Override
+                        public String next() {
+                            if (hasNext()) {
+                                final Row row = rows[i++];
+                                final Object o = row.get(0);
+                                return o != null ? o.toString() : null;
+                            } else {
+                                throw new NoSuchElementException();
+                            }
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            })));
+        }
+        final long end = System.currentTimeMillis();
+        CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
+        try {
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+            cubeBuilder.setToUpdateSegs(seg);
+            cubeManager.updateCube(cubeBuilder);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+    }
+
+    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+        final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
+        for (Long id : allCuboidIds) {
+            zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+        }
+
+        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+        final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
+        final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+
+        for (Long cuboidId : allCuboidIds) {
+            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < nRowKey; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+        for (int i = 0; i < nRowKey; ++i) {
+            row_hashcodes[i] = new ByteArray();
+        }
+
+        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
+
+            final HashFunction hashFunction = Hashing.murmur3_128();
+
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
+                for (int i = 0; i < nRowKey; i++) {
+                    Hasher hc = hashFunction.newHasher();
+                    String colValue = v2.get(rowKeyColumnIndexes[i]);
+                    if (colValue != null) {
+                        row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+                    } else {
+                        row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+                    }
+                }
+
+                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+                    Hasher hc = hashFunction.newHasher();
+                    HLLCounter counter = v1.get(entry.getKey());
+                    final Integer[] cuboidBitSet = entry.getValue();
+                    for (int position = 0; position < cuboidBitSet.length; position++) {
+                        hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+                    }
+                    counter.add(hc.hash().asBytes());
+                }
+                return v1;
+            }
+        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
+            @Override
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
+                Preconditions.checkArgument(v1.size() == v2.size());
+                Preconditions.checkArgument(v1.size() > 0);
+                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+                    final HLLCounter counter1 = entry.getValue();
+                    final HLLCounter counter2 = v2.get(entry.getKey());
+                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+                }
+                return v1;
+            }
+
+        });
+        return samplingResult;
+    }
+
+    /** return hfile location */
+    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
+        final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
+        final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+        for (TblColRef tblColRef : baseCuboidColumn) {
+            columnLengthMap.put(tblColRef, dimEncMap.get(tblColRef).getLengthOfEncoding());
+        }
+        final Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeDesc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<String> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        System.err.println("Dictionary for " + col + " was not 
found.");
+                        continue;
+                    }
+                    dictionaryMap.put(col, dict);
+                    System.out.println("col:" + col + " dictionary size:" + 
dict.getSize());
+                }
+            }
+        }
+
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            FunctionDesc func = measureDesc.getFunction();
+            List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+            for (TblColRef col : colRefs) {
+                dictionaryMap.put(col, cubeSegment.getDictionary(col));
+            }
+        }
+
+        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+
+            @Override
+            public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+                long t = System.currentTimeMillis();
+                prepare();
+
+                final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+
+                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+                System.out.println("load properties finished");
+                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
+                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
+                Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+                try {
+                    while (listIterator.hasNext()) {
+                        for (List<String> row : listIterator.next()) {
+                            blockingQueue.put(row);
+                        }
+                    }
+                    blockingQueue.put(Collections.<String> emptyList());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                System.out.println("build partition cost: " + 
(System.currentTimeMillis() - t) + "ms");
+                return sparkCuboidWriter.getResult().iterator();
+            }
+        });
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
+        Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
+        Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
+        String url = conf.get("fs.defaultFS") + path.toString();
+        System.out.println("use " + url + " as hfile");
+        List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
+        final int measureSize = measuresDescs.size();
+        final String[] dataTypes = new String[measureSize];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType();
+        }
+        final MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
+        writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
+        return url;
+    }
+
+    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
+        javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
+            @Override
+            public int numPartitions() {
+                return splitKeys.length + 1;
+            }
+
+            @Override
+            public int getPartition(Object key) {
+                Preconditions.checkArgument(key instanceof byte[]);
+                for (int i = 0, n = splitKeys.length; i < n; ++i) {
+                    if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) {
+                        return i;
+                    }
+                }
+                return splitKeys.length;
+            }
+        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+            @Override
+            public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+                Iterable<Tuple2<byte[], byte[]>> iterable = new Iterable<Tuple2<byte[], byte[]>>() {
+                    final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
+                    final Object[] input = new Object[measureSize];
+                    final Object[] result = new Object[measureSize];
+
+                    @Override
+                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
+                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
+                            @Override
+                            public byte[] call(Iterable<byte[]> v1) throws Exception {
+                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+                                if (list.size() == 1) {
+                                    return list.get(0);
+                                }
+                                aggs.reset();
+                                for (byte[] v : list) {
+                                    codec.decode(ByteBuffer.wrap(v), input);
+                                    aggs.aggregate(input);
+                                }
+                                aggs.collectStates(result);
+                                ByteBuffer buffer = codec.encode(result);
+                                byte[] bytes = new byte[buffer.position()];
+                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+                                return bytes;
+                            }
+                        });
+                    }
+                };
+                return iterable.iterator();
+            }
+        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
+    }
+
+    public static void prepare() throws Exception {
+        final File file = new File(SparkFiles.get("kylin.properties"));
+        final String confPath = file.getParentFile().getAbsolutePath();
+        System.out.println("conf directory:" + confPath);
+        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+        ClassUtil.addClasspath(confPath);
+    }
+
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
+        final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
+        System.out.println("cube size estimation:" + cubeSizeMap);
+        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
+        CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
+        return splitKeys;
+    }
+
+    private Configuration getConfigurationForHFile(String hTableName) throws IOException {
+        final Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        Job job = Job.getInstance(conf);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        Connection connection = HBaseConnection.get();
+        Table table = connection.getTable(TableName.valueOf(hTableName));
+        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(hTableName)));
+        return conf;
+    }
+
+    private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+
+        FsShell shell = new FsShell(hbaseConf);
+        try {
+            shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
+        } catch (Exception e) {
+            logger.error("Couldnt change the file permissions ", e);
+            throw new IOException(e);
+        }
+
+        String[] newArgs = new String[2];
+        newArgs[0] = hfileLocation;
+        newArgs[1] = cubeSegment.getStorageLocationIdentifier();
+
+        int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs);
+        System.out.println("incremental load result:" + ret);
+
+        cubeSegment.setStatus(SegmentStatusEnum.READY);
+        try {
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+            cubeInstance.setStatus(RealizationStatusEnum.READY);
+            cubeSegment.setStatus(SegmentStatusEnum.READY);
+            cubeBuilder.setToUpdateSegs(cubeSegment);
+            CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        //memory conf
+        conf.set("spark.executor.memory", "6g");
+        conf.set("spark.storage.memoryFraction", "0.3");
+
+        //serialization conf
+        conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true");
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        HiveContext sqlContext = new HiveContext(sc.sc());
+        final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+        final String coprocessor = optionsHelper.getOptionValue(OPTION_COPROCESSOR);
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.overrideCoprocessorLocalJar(coprocessor);
+
+        setupClasspath(sc, confPath);
+        intermediateTable.cache();
+        writeDictionary(intermediateTable, cubeName, segmentId);
+        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
+            @Override
+            public List<String> call(Row v1) throws Exception {
+                ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
+                for (int i = 0; i < v1.size(); i++) {
+                    final Object o = v1.get(i);
+                    if (o != null) {
+                        result.add(o.toString());
+                    } else {
+                        result.add(null);
+                    }
+                }
+                return result;
+
+            }
+        });
+
+        final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
+        final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
+
+        final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
+        bulkLoadHFile(cubeName, segmentId, hfile);
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 76c2220..80ef543 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,21 +39,21 @@
 
     <properties>
         <!-- General Properties -->
-        <javaVersion>1.7</javaVersion>
+        <javaVersion>1.8</javaVersion>
         <maven-model.version>3.3.9</maven-model.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
         <!-- Hadoop versions -->
-        <hadoop2.version>2.7.1</hadoop2.version>
-        <yarn.version>2.7.1</yarn.version>
+        <hadoop2.version>3.0.0-alpha2</hadoop2.version>
+        <yarn.version>3.0.0-alpha2</yarn.version>
 
         <!-- Hive versions -->
-        <hive.version>1.2.1</hive.version>
-        <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+        <hive.version>2.1.0</hive.version>
+        <hive-hcatalog.version>2.1.0</hive-hcatalog.version>
 
         <!-- HBase versions -->
-        <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
+        <hbase-hadoop2.version>2.0.0-SNAPSHOT</hbase-hadoop2.version>
 
         <!-- Kafka versions -->
         <kafka.version>1.0.0</kafka.version>
@@ -68,6 +68,7 @@
         <!-- Scala versions -->
         <scala.version>2.11.0</scala.version>
 
+        <commons-configuration.version>1.6</commons-configuration.version>
         <!-- <reflections.version>0.9.10</reflections.version> -->
 
         <!-- Calcite Version -->
@@ -578,6 +579,12 @@
                 <version>${yarn.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>commons-configuration</groupId>
+                <artifactId>commons-configuration</artifactId>
+                <version>${commons-configuration.version}</version>
+            </dependency>
+
             <!-- Calcite dependencies -->
             <dependency>
                 <groupId>org.apache.calcite</groupId>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index efb348e..fb57c6d 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -194,6 +194,11 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <repositories>
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java 
b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
index 9eb9bb7..fd53b5b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -43,6 +43,8 @@ import java.util.TreeMap;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -51,7 +53,6 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -98,7 +100,7 @@ public class MockHTable implements Table {
 
     private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], 
NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], 
NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+    private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], 
NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
         return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
     }
 
@@ -163,8 +165,8 @@ public class MockHTable implements Table {
         throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
     }
 
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], 
NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, 
long timestampEnd, int maxVersions) {
-        List<KeyValue> ret = new ArrayList<KeyValue>();
+    private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], 
NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, 
long timestampEnd, int maxVersions) {
+        List<Cell> ret = new ArrayList<>();
         for (byte[] family : rowdata.keySet())
             for (byte[] qualifier : rowdata.get(family).keySet()) {
                 int versionsAdded = 0;
@@ -208,7 +210,6 @@ public class MockHTable implements Table {
     /**
      * {@inheritDoc}
      */
-    @Override
     public Object[] batch(List<? extends Row> actions) throws IOException, 
InterruptedException {
         Object[] results = new Object[actions.size()]; // same size.
         for (int i = 0; i < actions.size(); i++) {
@@ -242,11 +243,6 @@ public class MockHTable implements Table {
 
     }
 
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> actions, 
Batch.Callback<R> callback) throws IOException, InterruptedException {
-        return new Object[0];
-    }
-
     /**
      * {@inheritDoc}
      */
@@ -255,7 +251,7 @@ public class MockHTable implements Table {
         if (!data.containsKey(get.getRow()))
             return new Result();
         byte[] row = get.getRow();
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        List<Cell> kvs = new ArrayList<>();
         if (!get.hasFamilies()) {
             kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
         } else {
@@ -280,7 +276,7 @@ public class MockHTable implements Table {
             kvs = filter(filter, kvs);
         }
 
-        return new Result(kvs);
+        return Result.create(kvs);
     }
 
     /**
@@ -318,11 +314,11 @@ public class MockHTable implements Table {
                     break;
             }
 
-            List<KeyValue> kvs = null;
+            List<Cell> kvs = null;
             if (!scan.hasFamilies()) {
                 kvs = toKeyValue(row, data.get(row), 
scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), 
scan.getMaxVersions());
             } else {
-                kvs = new ArrayList<KeyValue>();
+                kvs = new ArrayList<>();
                 for (byte[] family : scan.getFamilyMap().keySet()) {
                     if (data.get(row).get(family) == null)
                         continue;
@@ -354,7 +350,7 @@ public class MockHTable implements Table {
                 }
             }
             if (!kvs.isEmpty()) {
-                ret.add(new Result(kvs));
+                ret.add(Result.create(kvs));
             }
         }
 
@@ -389,12 +385,14 @@ public class MockHTable implements Table {
             public void close() {
             }
 
+            @Override
             public boolean renewLease() {
-                throw new RuntimeException(this.getClass() + " does NOT 
implement this method.");
+                return false;
             }
 
+            @Override
             public ScanMetrics getScanMetrics() {
-                throw new RuntimeException(this.getClass() + " does NOT 
implement this method.");
+                return null;
             }
         };
     }
@@ -406,10 +404,10 @@ public class MockHTable implements Table {
      * @param kvs    List of a row's KeyValues
      * @return List of KeyValues that were not filtered.
      */
-    private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws 
IOException {
+    private List<Cell> filter(Filter filter, List<Cell> kvs) throws 
IOException {
         filter.reset();
 
-        List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size());
+        List<Cell> tmp = new ArrayList<>(kvs.size());
         tmp.addAll(kvs);
 
         /*
@@ -418,9 +416,9 @@ public class MockHTable implements Table {
          * See Figure 4-2 on p. 163.
          */
         boolean filteredOnRowKey = false;
-        List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size());
-        for (KeyValue kv : tmp) {
-            if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), 
kv.getRowLength())) {
+        List<Cell> nkvs = new ArrayList<>(tmp.size());
+        for (Cell kv : tmp) {
+            if (filter.filterRowKey(kv)) {
                 filteredOnRowKey = true;
                 break;
             }
@@ -483,16 +481,16 @@ public class MockHTable implements Table {
     public void put(Put put) throws IOException {
         byte[] row = put.getRow();
         NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 
rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], 
NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
-        for (byte[] family : put.getFamilyMap().keySet()) {
+        for (byte[] family : put.getFamilyCellMap().keySet()) {
             if (columnFamilies.contains(new String(family)) == false) {
                 throw new RuntimeException("Not Exists columnFamily : " + new 
String(family));
             }
             NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = 
forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, 
byte[]>>(Bytes.BYTES_COMPARATOR));
-            for (KeyValue kv : put.getFamilyMap().get(family)) {
-                
kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
-                byte[] qualifier = kv.getQualifier();
+            for (Cell kv : put.getFamilyCellMap().get(family)) {
+                CellUtil.updateLatestStamp(kv, System.currentTimeMillis());
+                byte[] qualifier = kv.getQualifierArray();
                 NavigableMap<Long, byte[]> qualifierData = 
forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
-                qualifierData.put(kv.getTimestamp(), kv.getValue());
+                qualifierData.put(kv.getTimestamp(), kv.getValueArray());
             }
         }
     }
@@ -540,22 +538,22 @@ public class MockHTable implements Table {
         byte[] row = delete.getRow();
         if (data.get(row) == null)
             return;
-        if (delete.getFamilyMap().size() == 0) {
+        if (delete.getFamilyCellMap().size() == 0) {
             data.remove(row);
             return;
         }
-        for (byte[] family : delete.getFamilyMap().keySet()) {
+        for (byte[] family : delete.getFamilyCellMap().keySet()) {
             if (data.get(row).get(family) == null)
                 continue;
-            if (delete.getFamilyMap().get(family).isEmpty()) {
+            if (delete.getFamilyCellMap().get(family).isEmpty()) {
                 data.get(row).remove(family);
                 continue;
             }
-            for (KeyValue kv : delete.getFamilyMap().get(family)) {
-                if (kv.isDelete()) {
-                    data.get(row).get(kv.getFamily()).clear();
+            for (Cell kv : delete.getFamilyCellMap().get(family)) {
+                if (CellUtil.isDelete(kv)) {
+                    data.get(row).get(kv.getFamilyArray()).clear();
                 } else {
-                    
data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
+                    
data.get(row).get(kv.getFamilyArray()).remove(kv.getQualifierArray());
                 }
             }
             if (data.get(row).get(family).isEmpty()) {
@@ -675,40 +673,48 @@ public class MockHTable implements Table {
 
     }
 
-    public void setOperationTimeout(int operationTimeout) {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+    /**
+     *
+     * The timeout getters and setters below are no-ops that return default values.
+     *
+     */
+    @Override
+    public void setOperationTimeout(int i) {
+
     }
 
+    @Override
     public int getOperationTimeout() {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+        return 0;
     }
 
-    /** @deprecated */
-    @Deprecated
+    @Override
     public int getRpcTimeout() {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+        return 0;
     }
 
-    /** @deprecated */
-    @Deprecated
-    public void setRpcTimeout(int rpcTimeout) {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
-    }
+    @Override
+    public void setRpcTimeout(int i) {
 
-    public int getWriteRpcTimeout() {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
     }
 
-    public void setWriteRpcTimeout(int writeRpcTimeout) {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+    @Override
+    public int getReadRpcTimeout() {
+        return 0;
     }
 
-    public int getReadRpcTimeout() {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+    @Override
+    public void setReadRpcTimeout(int i) {
+
     }
 
-    public void setReadRpcTimeout(int readRpcTimeout) {
-        throw new RuntimeException(this.getClass() + " does NOT implement this 
method.");
+    @Override
+    public int getWriteRpcTimeout() {
+        return 0;
     }
 
+    @Override
+    public void setWriteRpcTimeout(int i) {
+
+    }
 }
\ No newline at end of file
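
Note on the KeyValue-to-Cell migration above: Cell.getFamilyArray(), getQualifierArray() and getValueArray() return the whole backing byte array rather than just the family/qualifier/value slice, so code that previously relied on KeyValue.getQualifier()/getValue() (as in put() and delete() here) normally goes through CellUtil.cloneQualifier(), cloneValue() and cloneFamily() instead. A small sketch under that assumption; the class and method names are illustrative.

    import java.util.NavigableMap;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellCopySketch {
        // Indexes one Put's cells for a given family as qualifier -> (timestamp -> value),
        // copying the qualifier and value bytes out of each Cell's backing array.
        public static NavigableMap<byte[], NavigableMap<Long, byte[]>> index(Put put, byte[] family) {
            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (Cell cell : put.getFamilyCellMap().get(family)) {   // assumes the Put has cells for this family
                byte[] qualifier = CellUtil.cloneQualifier(cell);    // not cell.getQualifierArray()
                familyData.computeIfAbsent(qualifier, q -> new TreeMap<Long, byte[]>())
                        .put(cell.getTimestamp(), CellUtil.cloneValue(cell));
            }
            return familyData;
        }
    }
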
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 53e8a68..0f71797 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -239,6 +239,11 @@ public class HBaseConnection {
 
     // 
============================================================================
 
+    public static Connection get() {
+        String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
+        return get(url);
+    }
+
     // returned Connection can be shared by multiple threads and does not 
require close()
     @SuppressWarnings("resource")
     public static Connection get(StorageURL url) {
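
A brief usage sketch for the new zero-argument HBaseConnection.get(): the storage URL is taken from KylinConfig, and the returned Connection is the shared one that callers are not supposed to close. The surrounding class and method names below are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.kylin.storage.hbase.HBaseConnection;

    public class SharedConnectionSketch {
        // Opens a Table over the shared connection; close the Table when done, not the Connection.
        public static Table openTable(String hTableName) throws IOException {
            Connection conn = HBaseConnection.get();
            return conn.getTable(TableName.valueOf(hTableName));
        }
    }
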
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a8f4fd8..48dce1f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -18,11 +18,8 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -47,8 +44,10 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * for test use only
@@ -181,7 +180,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
             public List<Cell> next() {
                 List<Cell> result = allResultsIterator.next().listCells();
                 for (Cell cell : result) {
-                    scannedBytes += CellUtil.estimatedSizeOf(cell);
+                    scannedBytes += CellUtil.estimatedSerializedSizeOf(cell);
                 }
                 scannedRows++;
                 return result;
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index fd54e2b..89fe56d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
@@ -178,7 +178,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             List<Cell> result = delegate.next();
             rowCount++;
             for (Cell cell : result) {
-                rowBytes += CellUtil.estimatedSizeOf(cell);
+                rowBytes += CellUtil.estimatedSerializedSizeOf(cell);
             }
             return result;
         }
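
The CubeHBaseScanRPC and CubeVisitService hunks above are the same rename: CellUtil.estimatedSizeOf(Cell) is gone from newer HBase clients and CellUtil.estimatedSerializedSizeOf(Cell) is its replacement. A minimal sketch of the byte-accounting pattern, with illustrative names:

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;

    public class ScanSizeSketch {
        // Sums the estimated serialized size of the cells in one scanned row.
        public static long bytesOf(List<Cell> row) {
            long scannedBytes = 0;
            for (Cell cell : row) {
                scannedBytes += CellUtil.estimatedSerializedSizeOf(cell);
            }
            return scannedBytes;
        }
    }
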
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index ec2998b..8c3038f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -58,6 +63,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
     public int run(String[] args) throws Exception {
         Options options = new Options();
 
+        Connection connection = null;
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
@@ -93,6 +99,10 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             HTable htable = new HTable(configuration, 
getOptionValue(OPTION_HTABLE_NAME));
 
+            String hTableName = 
getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
+            connection = ConnectionFactory.createConnection(hbaseConf);
+            Table table = connection.getTable(TableName.valueOf(hTableName));
+            RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(hTableName));
             // Automatic config !
             HFileOutputFormat3.configureIncrementalLoad(job, htable);
             reconfigurePartitions(configuration, partitionFilePath);
@@ -113,6 +123,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
         } finally {
             if (job != null)
                 cleanupTempConfFile(job.getConfiguration());
+            if (null != connection)
+                connection.close();
         }
     }
 
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
new file mode 100644
index 0000000..afc2b4c
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.gridtable.GTRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class HBaseCuboidWriter implements ICuboidWriter {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HBaseCuboidWriter.class);
+
+    private static final int BATCH_PUT_THRESHOLD = 10000;
+
+    private final List<KeyValueCreator> keyValueCreators;
+    private final int nColumns;
+    private final Table hTable;
+    private final CubeDesc cubeDesc;
+    private final CubeSegment cubeSegment;
+    private final Object[] measureValues;
+
+    private List<Put> puts = Lists.newArrayList();
+    private AbstractRowKeyEncoder rowKeyEncoder;
+    private byte[] keybuf;
+
+    public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
+        this.keyValueCreators = Lists.newArrayList();
+        this.cubeSegment = segment;
+        this.cubeDesc = cubeSegment.getCubeDesc();
+        for (HBaseColumnFamilyDesc cfDesc : 
cubeDesc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            }
+        }
+        this.nColumns = keyValueCreators.size();
+        this.hTable = hTable;
+        this.measureValues = new Object[cubeDesc.getMeasures().size()];
+    }
+
+    private byte[] copy(byte[] array, int offset, int length) {
+        byte[] result = new byte[length];
+        System.arraycopy(array, offset, result, 0, length);
+        return result;
+    }
+
+    // TODO: sharding on streaming
+    private byte[] createKey(Long cuboidId, GTRecord record) {
+        if (rowKeyEncoder == null || rowKeyEncoder.getCuboidID() != cuboidId) {
+            rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment,
+                    Cuboid.findForMandatory(cubeDesc, cuboidId));
+            keybuf = rowKeyEncoder.createBuf();
+        }
+        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keybuf);
+        return keybuf;
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        byte[] key = createKey(cuboidId, record);
+        final Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+        final int nDims = cuboid.getColumns().size();
+        final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + 
cubeDesc.getMeasures().size());
+
+        for (int i = 0; i < nColumns; i++) {
+            final Object[] values = record.getValues(bitSet, measureValues);
+            final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, 
key.length, values);
+            final Put put = new Put(copy(key, 0, key.length));
+            byte[] family = copy(keyValue.getFamilyArray(), 
keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+            byte[] qualifier = copy(keyValue.getQualifierArray(), 
keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+            byte[] value = copy(keyValue.getValueArray(), 
keyValue.getValueOffset(), keyValue.getValueLength());
+            put.addColumn(family, qualifier, value);
+            puts.add(put);
+        }
+        if (puts.size() >= BATCH_PUT_THRESHOLD) {
+            flush();
+        }
+    }
+
+    @Override
+    public final void flush() throws IOException {
+        if (!puts.isEmpty()) {
+            long t = System.currentTimeMillis();
+            if (hTable != null) {
+                hTable.put(puts);
+            }
+            logger.info("commit total " + puts.size() + " puts, totally cost:" 
+ (System.currentTimeMillis() - t) + "ms");
+            puts.clear();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        IOUtils.closeQuietly(hTable);
+    }
+
+}
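
A short usage sketch for the new HBaseCuboidWriter: puts are buffered and flushed once BATCH_PUT_THRESHOLD (10000) is reached, and close() flushes the remainder and closes the Table. The surrounding class and method names are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.gridtable.GTRecord;
    import org.apache.kylin.storage.hbase.HBaseConnection;
    import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter;

    public class CuboidWriterUsageSketch {
        public static void writeOne(CubeSegment segment, String hTableName,
                                    long cuboidId, GTRecord record) throws IOException {
            Connection conn = HBaseConnection.get();               // shared connection, not closed here
            Table table = conn.getTable(TableName.valueOf(hTableName));
            HBaseCuboidWriter writer = new HBaseCuboidWriter(segment, table);
            try {
                writer.write(cuboidId, record);                    // buffers one Put per HBase column
            } finally {
                writer.close();                                    // flushes pending puts, closes the Table
            }
        }
    }
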
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 292d9d6..b9d5599 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -459,7 +459,7 @@ public class CubeMigrationCLI {
                             value = Bytes.toBytes(valueString);
                         }
                         Put put = new Put(Bytes.toBytes(cubeId));
-                        put.add(family, column, value);
+                        put.addColumn(family, column, value);
                         destAclHtable.put(put);
                     }
                 }
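
This hunk, and the similar ones in ExtendCubeToHybridCLI, GridTableHBaseBenchmark and TestHbaseClient below, are all the same API move: Put.add(family, qualifier, value) was removed and Put.addColumn(...) is the replacement. A minimal sketch with illustrative names:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutColumnSketch {
        public static Put aclPut(String cubeId, byte[] family, byte[] column, byte[] value) {
            Put put = new Put(Bytes.toBytes(cubeId));
            put.addColumn(family, column, value);   // replaces put.add(family, column, value)
            return put;
        }
    }
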
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 23ec77f..46363b2 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
@@ -501,7 +500,7 @@ public class DeployCoprocessorCLI {
 
             Matcher keyMatcher;
             Matcher valueMatcher;
-            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : 
tableDescriptor.getValues().entrySet()) {
+            for (Map.Entry<org.apache.hadoop.hbase.util.Bytes, 
org.apache.hadoop.hbase.util.Bytes> e : tableDescriptor.getValues().entrySet()) 
{
                 keyMatcher = 
HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
                 if (!keyMatcher.matches()) {
                     continue;
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 092023e..0f9466c 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -249,7 +249,7 @@ public class ExtendCubeToHybridCLI {
                         value = Bytes.toBytes(valueString);
                     }
                     Put put = new Put(Bytes.toBytes(newCubeId));
-                    put.add(family, column, value);
+                    put.addColumn(family, column, value);
                     aclHtable.put(put);
                 }
             }
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index a317110..3f034cf 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -233,7 +233,7 @@ public class GridTableHBaseBenchmark {
                 byte[] rowkey = Bytes.toBytes(i);
                 Put put = new Put(rowkey);
                 byte[] cell = randomBytes();
-                put.add(CF, QN, cell);
+                put.addColumn(CF, QN, cell);
                 table.put(put);
                 nBytes += cell.length;
                 dot(i, N_ROWS);
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index bba6745..ff038d1 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -50,7 +50,8 @@ public class PingHBaseCLI {
         if (User.isHBaseSecurityEnabled(hconf)) {
             try {
                 System.out.println("--------------Getting kerberos credential 
for user " + UserGroupInformation.getCurrentUser().getUserName());
-                TokenUtil.obtainAndCacheToken(hconf, 
UserGroupInformation.getCurrentUser());
+                Connection connection = HBaseConnection.get();
+                TokenUtil.obtainAndCacheToken(connection, 
User.create(UserGroupInformation.getCurrentUser()));
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 System.out.println("--------------Error while getting kerberos 
credential for user " + UserGroupInformation.getCurrentUser().getUserName());
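
The kerberos hunk above replaces the Configuration/UserGroupInformation overload of TokenUtil.obtainAndCacheToken with the Connection/User one. A compact sketch of the same call path; the surrounding class and method names are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.kylin.storage.hbase.HBaseConnection;

    public class TokenSketch {
        // Obtains and caches an HBase delegation token for the current user via a Connection.
        public static void cacheToken() throws IOException, InterruptedException {
            Connection connection = HBaseConnection.get();
            User user = User.create(UserGroupInformation.getCurrentUser());
            TokenUtil.obtainAndCacheToken(connection, user);
        }
    }
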
diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
index eba4a37..ff2ef91 100644
--- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
+++ 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -68,13 +68,23 @@ public class CubeHFileMapperTest {
         Pair<RowKeyWritable, KeyValue> p2 = result.get(1);
 
         assertEquals(key, p1.getFirst());
-        assertEquals("cf1", new String(p1.getSecond().getFamily()));
-        assertEquals("usd_amt", new String(p1.getSecond().getQualifier()));
-        assertEquals("35.43", new String(p1.getSecond().getValue()));
+        assertEquals("cf1", new String(copy(p1.getSecond())));
+        assertEquals("usd_amt", new String(copy(p1.getSecond())));
+        assertEquals("35.43", new String(copy(p1.getSecond())));
 
         assertEquals(key, p2.getFirst());
-        assertEquals("cf1", new String(p2.getSecond().getFamily()));
-        assertEquals("item_count", new String(p2.getSecond().getQualifier()));
-        assertEquals("2", new String(p2.getSecond().getValue()));
+        assertEquals("cf1", new String(copy(p2.getSecond())));
+        assertEquals("item_count", new String(copy(p2.getSecond())));
+        assertEquals("2", new String(copy(p2.getSecond())));
+    }
+
+    private byte[] copy(KeyValue kv) {
+        return copy(kv.getFamilyArray(), kv.getFamilyOffset(), 
kv.getFamilyLength());
+    }
+
+    private byte[] copy(byte[] array, int offset, int length) {
+        byte[] result = new byte[length];
+        System.arraycopy(array, offset, result, 0, length);
+        return result;
     }
 }
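
Note that the copy(KeyValue) helper introduced above returns only the family bytes, so the qualifier and value assertions compare their expected strings against the column family. Since KeyValue implements Cell, per-field copies are available from CellUtil; a sketch under that assumption, with illustrative names:

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;

    public class KeyValueFieldSketch {
        public static String familyOf(KeyValue kv)    { return new String(CellUtil.cloneFamily(kv)); }
        public static String qualifierOf(KeyValue kv) { return new String(CellUtil.cloneQualifier(kv)); }
        public static String valueOf(KeyValue kv)     { return new String(CellUtil.cloneValue(kv)); }
    }
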
diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
index 2b8ecae..b77d2cb 100644
--- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
+++ 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 
 /**
@@ -89,13 +92,16 @@ public class TestHbaseClient {
         conf.set("hbase.zookeeper.quorum", "hbase_host");
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
-        HTable table = new HTable(conf, "test1");
+        Connection connection = ConnectionFactory.createConnection(conf);
+
+        Table table = connection.getTable(TableName.valueOf("test1"));
         Put put = new Put(Bytes.toBytes("row1"));
 
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), 
Bytes.toBytes("val1"));
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), 
Bytes.toBytes("val2"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), 
Bytes.toBytes("val1"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), 
Bytes.toBytes("val2"));
 
         table.put(put);
         table.close();
+        connection.close();
     }
 }
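
The rewritten test above closes the Table and Connection by hand; a try-with-resources variant of the same put, shown only as an idiomatic alternative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutDemoSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hbase_host");
            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
            try (Connection connection = ConnectionFactory.createConnection(conf);
                    Table table = connection.getTable(TableName.valueOf("test1"))) {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
                put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
                table.put(put);
            }   // Table and Connection close automatically, even if put(...) throws
        }
    }
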
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 5426b62..3b95a50 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -174,6 +175,7 @@ public class CubeMigrationCLI extends AbstractApplication {
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         hbaseAdmin = new HBaseAdmin(conf);
+
         hdfsFS = HadoopUtil.getWorkingFileSystem();
         operations = new ArrayList<Opt>();
         copyFilesInMetaStore(cube);
@@ -418,10 +420,10 @@ public class CubeMigrationCLI extends AbstractApplication 
{
             String tableName = (String) opt.params[0];
             System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
             HTableDescriptor desc = 
hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, 
dstConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             logger.info("CHANGE_HTABLE_HOST is completed");
             break;
         }
@@ -580,10 +582,10 @@ public class CubeMigrationCLI extends AbstractApplication 
{
         case CHANGE_HTABLE_HOST: {
             String tableName = (String) opt.params[0];
             HTableDescriptor desc = 
hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, 
srcConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             break;
         }
         case COPY_FILE_IN_META: {
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
index 54fbbc0..52bad9d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
@@ -29,7 +29,9 @@ import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
@@ -61,7 +63,8 @@ public class CubeMigrationCheckCLI {
     private static final Option OPTION_CUBE = 
OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The
 name of cube migrated").create("cube");
 
     private KylinConfig dstCfg;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
+    private Connection connection;
 
     private List<String> issueExistHTables;
     private List<String> inconsistentHTables;
@@ -123,6 +126,7 @@ public class CubeMigrationCheckCLI {
         }
         fixInconsistent();
         printIssueExistingHTables();
+        connection.close();
     }
 
     public CubeMigrationCheckCLI(KylinConfig kylinConfig, Boolean isFix) 
throws IOException {
@@ -130,7 +134,8 @@ public class CubeMigrationCheckCLI {
         this.ifFix = isFix;
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        hbaseAdmin = new HBaseAdmin(conf);
+        connection = ConnectionFactory.createConnection(conf);
+        hbaseAdmin = connection.getAdmin();
 
         issueExistHTables = Lists.newArrayList();
         inconsistentHTables = Lists.newArrayList();
@@ -189,10 +194,10 @@ public class CubeMigrationCheckCLI {
                 String[] sepNameList = segFullName.split(",");
                 HTableDescriptor desc = 
hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
                 logger.info("Change the host of htable " + sepNameList[0] + 
"belonging to cube " + sepNameList[1] + " from " + 
desc.getValue(IRealizationConstants.HTableTag) + " to " + 
dstCfg.getMetadataUrlPrefix());
-                hbaseAdmin.disableTable(sepNameList[0]);
+                hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0]));
                 desc.setValue(IRealizationConstants.HTableTag, 
dstCfg.getMetadataUrlPrefix());
-                hbaseAdmin.modifyTable(sepNameList[0], desc);
-                hbaseAdmin.enableTable(sepNameList[0]);
+                hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), 
desc);
+                hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0]));
             }
         } else {
             logger.info("------ Inconsistent HTables Needed To Be Fixed 
------");
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index 9c6cba6..b5a8440 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -245,7 +245,7 @@ public class ExtendCubeToHybridCLI {
                         value = Bytes.toBytes(valueString);
                     }
                     Put put = new Put(Bytes.toBytes(newCubeId));
-                    put.add(family, column, value);
+                    put.addColumn(family, column, value);
                     aclHtable.put(put);
                 }
             }
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java 
b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 16aa5ff..f6099eb 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -22,6 +22,7 @@ package org.apache.kylin.tool;
  * Created by xiefan on 17-4-20.
  */
 public class StorageCleanupJob {
+
     public static void main(String[] args) throws Exception {
         org.apache.kylin.rest.job.StorageCleanupJob cli = new 
org.apache.kylin.rest.job.StorageCleanupJob();
         cli.execute(args);
