KYLIN-920 & KYLIN-782 Upgrade to HBase 1.1 (with help from murkrishn <murkris...@ebay.com>)

Signed-off-by: Li, Yang <yang...@ebay.com>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a871c60
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a871c60
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a871c60

Branch: refs/heads/1.x-HBase1.1.3
Commit: 5a871c607f86d623ae3dc7f42a0c489c6e5d558a
Parents: 0603a19
Author: Yang Li <liy...@apache.org>
Authored: Sun Aug 16 20:22:13 2015 +0800
Committer: Li, Yang <yang...@ebay.com>
Committed: Wed Dec 23 10:19:13 2015 +0800

----------------------------------------------------------------------
 .../common/persistence/HBaseConnection.java     | 254 ++++----
 .../common/persistence/HBaseResourceStore.java  |  31 +-
 .../common/util/HBaseRegionSizeCalculator.java  |  41 +-
 .../kylin/common/util/BasicHadoopTest.java      |  11 +-
 .../kylin/job/cube/GarbageCollectionStep.java   |  22 +-
 .../kylin/job/hadoop/cube/CubeHFileJob.java     |  18 +-
 .../job/hadoop/cube/StorageCleanupJob.java      |  26 +-
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |   8 +-
 .../hadoop/invertedindex/IICreateHFileJob.java  |  22 +-
 .../hadoop/invertedindex/IICreateHTableJob.java |  11 +-
 .../apache/kylin/job/tools/CleanHtableCLI.java  |   8 +-
 .../kylin/job/tools/CubeMigrationCLI.java       |  64 +-
 .../kylin/job/tools/DeployCoprocessorCLI.java   | 625 ++++++++++---------
 .../job/tools/GridTableHBaseBenchmark.java      |  37 +-
 .../kylin/job/tools/HtableAlterMetadataCLI.java |   8 +-
 .../apache/kylin/job/tools/RowCounterCLI.java   |  11 +-
 .../org/apache/kylin/job/ExportHBaseData.java   |  18 +-
 .../kylin/job/hadoop/hbase/TestHbaseClient.java |  13 +-
 .../kylin/job/tools/HBaseRowDigestTest.java     |  11 +-
 monitor/pom.xml                                 |   6 +
 .../kylin/monitor/MonitorMetaManager.java       |  49 +-
 pom.xml                                         |  14 +-
 .../apache/kylin/rest/service/AclService.java   |  38 +-
 .../apache/kylin/rest/service/CubeService.java  |  35 +-
 .../apache/kylin/rest/service/QueryService.java |  21 +-
 .../apache/kylin/rest/service/UserService.java  |  27 +-
 .../storage/filter/BitMapFilterEvaluator.java   |   1 -
 .../storage/hbase/CubeSegmentTupleIterator.java |  19 +-
 .../kylin/storage/hbase/CubeStorageEngine.java  |   4 +-
 .../storage/hbase/HBaseClientKVIterator.java    | 187 +++---
 .../hbase/InvertedIndexStorageEngine.java       | 114 ++--
 .../kylin/storage/hbase/PingHBaseCLI.java       | 179 +++---
 .../storage/hbase/RegionScannerAdapter.java     |  10 +-
 .../hbase/SerializedHBaseTupleIterator.java     |   4 +-
 .../endpoint/EndpointTupleIterator.java         |  15 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |   2 +-
 .../observer/AggregateRegionObserver.java       |   2 +-
 .../observer/AggregationScanner.java            |  14 +-
 .../observer/ObserverAggregationCache.java      |  10 +-
 .../coprocessor/observer/ObserverEnabler.java   |   4 +-
 .../storage/hbase/InvertedIndexHBaseTest.java   | 227 ++++---
 .../observer/AggregateRegionObserverTest.java   |  72 +--
 .../minicluster/HiveMiniClusterTest.java        |   3 +-
 43 files changed, 1151 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------
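
The diff below follows one pattern throughout: the deprecated 0.98-era client types (HConnection/HConnectionManager, HBaseAdmin, HTableInterface) are replaced by their HBase 1.1 counterparts (a Connection obtained from ConnectionFactory, Admin, Table). A minimal sketch of the new bootstrap, for orientation; the ZooKeeper host and table name are hypothetical placeholders, not values from this commit:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HBase11BootstrapSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk-host"); // hypothetical quorum
        // One heavyweight Connection per process; Table and Admin are cheap views on it.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             Table table = conn.getTable(TableName.valueOf("SOME_TABLE"))) { // hypothetical table
            System.out.println("exists: " + admin.tableExists(table.getName()));
        }
    }
}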


http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index 9c86376..3c07654 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -1,122 +1,132 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.persistence;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- * 
- */
-public class HBaseConnection {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
-
-    private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                for (HConnection conn : ConnPool.values()) {
-                    try {
-                        conn.close();
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        });
-    }
-    
-    public static void clearCache() {
-        ConnPool.clear();
-    }
-
-    public static HConnection get(String url) {
-        // find configuration
-        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
-
-        HConnection connection = ConnPool.get(url);
-        try {
-            // I don't use DCL since recreate a connection is not a big issue.
-            if (connection == null) {
-                connection = HConnectionManager.createConnection(conf);
-                ConnPool.put(url, connection);
-            }
-        } catch (Throwable t) {
-            throw new StorageException("Error when open connection " + url, t);
-        }
-
-        return connection;
-    }
-
-    public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
-        createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
-    }
-
-    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
-
-        try {
-            boolean tableExist = false;
-            try {
-                hbase.getTableDescriptor(TableName.valueOf(tableName));
-                tableExist = true;
-            } catch (TableNotFoundException e) {
-            }
-
-            if (tableExist) {
-                logger.debug("HTable '" + tableName + "' already exists");
-                return;
-            }
-
-            logger.debug("Creating HTable '" + tableName + "'");
-
-            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-
-            if (null != families && families.length > 0) {
-                for (String family : families) {
-                    HColumnDescriptor fd = new HColumnDescriptor(family);
-                    fd.setInMemory(true); // metadata tables are best in memory
-                    desc.addFamily(fd);
-                }
-            }
-            hbase.createTable(desc);
-
-            logger.debug("HTable '" + tableName + "' created");
-        } finally {
-            hbase.close();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class HBaseConnection {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
+
+    private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
+    private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                for (Connection conn : ConnPool.values()) {
+                    try {
+                        conn.close();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+    }
+    
+    public static void clearCache() {
+        ConnPool.clear();
+    }
+
+    public static Connection get() {
+        return get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+    }
+
+    public static Connection get(String url) {
+        // find configuration
+        Configuration conf = ConfigCache.get(url);
+        if (conf == null) {
+            conf = HadoopUtil.newHBaseConfiguration(url);
+            ConfigCache.put(url, conf);
+        }
+
+        Connection connection = ConnPool.get(url);
+        try {
+            // I don't use DCL since recreate a connection is not a big issue.
+            if (connection == null) {
+                connection = ConnectionFactory.createConnection(conf);
+                ConnPool.put(url, connection);
+            }
+        } catch (Throwable t) {
+            throw new StorageException("Error when open connection " + url, t);
+        }
+
+        return connection;
+    }
+
+    public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException {
+        createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
+    }
+
+    public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+        Admin admin = conn.getAdmin();
+
+        try {
+            boolean tableExist = false;
+            try {
+                admin.getTableDescriptor(TableName.valueOf(tableName));
+                tableExist = true;
+            } catch (TableNotFoundException e) {
+            }
+
+            if (tableExist) {
+                logger.debug("HTable '" + tableName + "' already exists");
+                return;
+            }
+
+            logger.debug("Creating HTable '" + tableName + "'");
+
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+            if (null != families && families.length > 0) {
+                for (String family : families) {
+                    HColumnDescriptor fd = new HColumnDescriptor(family);
+                    fd.setInMemory(true); // metadata tables are best in memory
+                    desc.addFamily(fd);
+                }
+            }
+            admin.createTable(desc);
+
+            logger.debug("HTable '" + tableName + "' created");
+        } finally {
+            admin.close();
+        }
+    }
+}
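
A hedged usage sketch of the rewritten HBaseConnection above. The storage URL, table, and column family names are hypothetical; the point is that callers close only the lightweight Table, while the pooled Connection is closed by the shutdown hook:

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.persistence.HBaseConnection;

public class PooledConnectionSketch {
    public static void writeOne(String hbaseUrl) throws IOException {
        // The Connection is pooled per URL and closed by the shutdown hook above.
        Connection conn = HBaseConnection.get(hbaseUrl);
        HBaseConnection.createHTableIfNeeded(conn, "TEST_TABLE", "f"); // hypothetical names
        try (Table table = conn.getTable(TableName.valueOf("TEST_TABLE"))) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v1"));
            table.put(put);
        } // close only the lightweight Table; the pooled Connection stays open
    }
}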

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index ac14e7b..a3bb6e3 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -34,13 +34,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -77,7 +78,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     //    final Map<String, String> tableNameMap; // path prefix ==> HBase table name
 
-    private HConnection getConnection() throws IOException {
+    private Connection getConnection() throws IOException {
         return HBaseConnection.get(hbaseUrl);
     }
 
@@ -114,7 +115,7 @@ public class HBaseResourceStore extends ResourceStore {
 
         ArrayList<String> result = new ArrayList<String>();
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         Scan scan = new Scan(startRow, endRow);
         scan.setFilter(new KeyOnlyFilter());
         try {
@@ -149,7 +150,7 @@ public class HBaseResourceStore extends ResourceStore {
         scan.addColumn(B_FAMILY, B_COLUMN_TS);
         scan.addColumn(B_FAMILY, B_COLUMN);
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         List<RawResource> result = Lists.newArrayList();
         try {
             ResultScanner scanner = table.getScanner(scan);
@@ -211,13 +212,12 @@ public class HBaseResourceStore extends ResourceStore {
         IOUtils.copy(content, bout);
         bout.close();
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
 
             table.put(put);
-            table.flushCommits();
         } finally {
             IOUtils.closeQuietly(table);
         }
@@ -225,7 +225,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -237,8 +237,6 @@ public class HBaseResourceStore extends ResourceStore {
                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
             }
 
-            table.flushCommits();
-
             return newTS;
         } finally {
             IOUtils.closeQuietly(table);
@@ -247,11 +245,10 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             Delete del = new Delete(Bytes.toBytes(resPath));
             table.delete(del);
-            table.flushCommits();
         } finally {
             IOUtils.closeQuietly(table);
         }
@@ -276,7 +273,7 @@ public class HBaseResourceStore extends ResourceStore {
                 scan.addColumn(B_FAMILY, B_COLUMN_TS);
         }
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             ResultScanner scanner = table.getScanner(scan);
             Result result = null;
@@ -295,7 +292,7 @@ public class HBaseResourceStore extends ResourceStore {
         return endRow;
     }
 
-    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
         Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
@@ -321,7 +318,7 @@ public class HBaseResourceStore extends ResourceStore {
         return redirectPath;
     }
 
-    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
+    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
         int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
         if (content.length > kvSizeLimit) {
             writeLargeCellToHdfs(resPath, content, table);
@@ -329,8 +326,8 @@ public class HBaseResourceStore extends ResourceStore {
         }
 
         Put put = new Put(row);
-        put.add(B_FAMILY, B_COLUMN, content);
-        put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+        put.addColumn(B_FAMILY, B_COLUMN, content);
+        put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
 
         return put;
     }
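
The removed table.flushCommits() calls above rely on the 1.x semantics that Table.put() is synchronous. Where client-side write buffering is still wanted, BufferedMutator is the 1.x replacement; this is a sketch under that assumption (table and family names hypothetical), not code from this commit:

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteSketch {
    public static void bulkPut(Connection conn, String tableName, int rows) throws IOException {
        // BufferedMutator buffers mutations client-side; flush() (or close())
        // takes over the role of the removed HTableInterface.flushCommits().
        try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf(tableName))) {
            for (int i = 0; i < rows; i++) {
                Put put = new Put(Bytes.toBytes("row-" + i)); // hypothetical keys and family
                put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v-" + i));
                mutator.mutate(put);
            }
            mutator.flush();
        }
    }
}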

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index 093ac9e..ccbb6f0 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -23,19 +23,24 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,30 +58,31 @@ public class HBaseRegionSizeCalculator {
     /**
      * Computes size of each region for table and given column families.
      * */
-    public HBaseRegionSizeCalculator(HTable table) throws IOException {
-        this(table, new HBaseAdmin(table.getConfiguration()));
-    }
-
-    /** Constructor for unit testing */
-    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
-
+    public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException {
+        Table table = null;
+        Admin admin = null;
+        
         try {
+            table = hbaseConnection.getTable(TableName.valueOf(tableName));
+            admin = hbaseConnection.getAdmin();
+            
             if (!enabled(table.getConfiguration())) {
                 logger.info("Region size calculation disabled.");
                 return;
             }
 
-            logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+            logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
 
             // Get regions for table.
-            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+            List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
             Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 
-            for (HRegionInfo regionInfo : tableRegionInfos) {
-                tableRegions.add(regionInfo.getRegionName());
+            for (HRegionLocation hRegionLocation : regionLocationList) {
+                tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
             }
 
-            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            ClusterStatus clusterStatus = admin.getClusterStatus();
             Collection<ServerName> servers = clusterStatus.getServers();
             final long megaByte = 1024L * 1024L;
 
@@ -99,7 +105,8 @@ public class HBaseRegionSizeCalculator {
                 }
             }
         } finally {
-            hBaseAdmin.close();
+            IOUtils.closeQuietly(table);
+            IOUtils.closeQuietly(admin);
         }
 
     }
@@ -124,4 +131,4 @@ public class HBaseRegionSizeCalculator {
     public Map<byte[], Long> getRegionSizeMap() {
         return Collections.unmodifiableMap(sizeMap);
     }
-}
+}
\ No newline at end of file
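
The constructor above switches from the removed HTable.getRegionLocations() to the 1.x RegionLocator API. A standalone sketch of the same region enumeration, assuming an already open Connection:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionListingSketch {
    public static void printRegions(Connection conn, String tableName) throws IOException {
        // RegionLocator replaces HTable.getRegionLocations() from the 0.98 API.
        try (RegionLocator locator = conn.getRegionLocator(TableName.valueOf(tableName))) {
            List<HRegionLocation> locations = locator.getAllRegionLocations();
            for (HRegionLocation loc : locations) {
                System.out.println(Bytes.toStringBinary(loc.getRegionInfo().getRegionName())
                        + " on " + loc.getServerName());
            }
        }
    }
}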

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
index 6d2762c..481fc6c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
@@ -21,12 +21,11 @@ package org.apache.kylin.common.util;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,16 +55,14 @@ public class BasicHadoopTest {
         cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
         tableDesc.addFamily(cf);
 
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get().getAdmin();
         admin.createTable(tableDesc);
         admin.close();
     }
 
     @Test
     public void testRetriveHtableHost() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables();
         for (HTableDescriptor table : tableDescriptors) {
             String value = table.getValue("KYLIN_HOST");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index f2f1fc0..8c61a3a 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -24,14 +24,13 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.cmd.ShellCmdOutput;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -99,19 +98,18 @@ public class GarbageCollectionStep extends AbstractExecutable {
         List<String> oldTables = getOldHTables();
         if (oldTables != null && oldTables.size() > 0) {
             String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
+            Admin admin = null;
             try {
-                admin = new HBaseAdmin(conf);
+                admin = HBaseConnection.get().getAdmin();
                 for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                    if (admin.tableExists(TableName.valueOf(table))) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
                         String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                         if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
+                            if (admin.isTableEnabled(TableName.valueOf(table))) {
+                                admin.disableTable(TableName.valueOf(table));
                             }
-                            admin.deleteTable(table);
+                            admin.deleteTable(TableName.valueOf(table));
                             logger.debug("Dropped HBase table " + table);
                             output.append("Dropped HBase table " + table + " \n");
                         } else {
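
The drop logic above shows the other pervasive 1.1 change: Admin methods take a TableName rather than a raw String or byte[]. The same pattern condensed into a sketch, reusing the HBaseConnection helper from this commit:

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.kylin.common.persistence.HBaseConnection;

public class DropTableSketch {
    public static void dropIfExists(String table) throws IOException {
        // In 1.x every Admin call takes a TableName instead of a String/byte[].
        TableName tn = TableName.valueOf(table);
        try (Admin admin = HBaseConnection.get().getAdmin()) {
            if (admin.tableExists(tn)) {
                if (admin.isTableEnabled(tn)) {
                    admin.disableTable(tn);
                }
                admin.deleteTable(tn);
            }
        }
    }
}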

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index 3c1e4a5..6f36eff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -19,11 +19,15 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.mapreduce.Job;
@@ -31,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.BatchConstants;
@@ -47,6 +52,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     public int run(String[] args) throws Exception {
         Options options = new Options();
+        Connection connection = null;
+        Table table = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -80,10 +87,12 @@ public class CubeHFileJob extends AbstractHadoopJob {
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
             String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
+            connection = HBaseConnection.get();
+            table = connection.getTable(TableName.valueOf(tableName));
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
 
             //Automatic config !
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
 
             // set block replication to 3 for hfiles
             conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -96,6 +105,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             printUsage(options);
             throw e;
         } finally {
+            IOUtils.closeQuietly(table);
             if (job != null)
                 cleanupTempConfFile(job.getConfiguration());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 3b25ee1..184b6cd 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,6 +18,13 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -50,13 +59,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * @author ysong1
  */
@@ -107,7 +109,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
 
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -141,9 +143,9 @@ public class StorageCleanupJob extends AbstractHadoopJob {
             // drop tables
             for (String htableName : allTablesNeedToBeDropped) {
                 log.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
-                    hbaseAdmin.deleteTable(htableName);
+                if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
+                    hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                     log.info("Deleted HBase table " + htableName);
                 } else {
                     log.info("HBase table" + htableName + " does not exist");
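
A condensed sketch of the listTables(regex) plus TableName-based delete pattern used above; the prefix argument stands in for whatever IRealizationConstants.SharedHbaseStorageLocationPrefix resolves to at runtime:

import java.io.IOException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.kylin.common.persistence.HBaseConnection;

public class ListTablesSketch {
    public static void list(String prefix) throws IOException {
        try (Admin admin = HBaseConnection.get().getAdmin()) {
            // listTables(regex) filters tables by name pattern on the master side.
            for (HTableDescriptor desc : admin.listTables(prefix + ".*")) {
                System.out.println(desc.getNameAsString());
            }
        }
    }
}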

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 027c0ca..9f5e062 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -25,11 +25,10 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
@@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
 
         Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get().getAdmin();
 
         try {
             if (User.isHBaseSecurityEnabled(conf)) {
@@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
             byte[][] splitKeys = getSplits(conf, partitionFilePath);
 
-            if (admin.tableExists(tableName)) {
+            if (admin.tableExists(TableName.valueOf(tableName))) {
                 // admin.disableTable(tableName);
                 // admin.deleteTable(tableName);
                 throw new RuntimeException("HBase table " + tableName + " exists!");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index c032bbc..fa42148 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -19,17 +19,20 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
@@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
 
     public int run(String[] args) throws Exception {
         Options options = new Options();
+        Connection connection = null;
+        Table table = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob {
             job.setMapOutputValueClass(KeyValue.class);
 
             String tableName = getOptionValue(OPTION_HTABLE_NAME);
-            HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            connection = HBaseConnection.get();
+            table = connection.getTable(TableName.valueOf(tableName));
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
+            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
         } catch (Exception e) {
             printUsage(options);
             throw e;
+        } finally {
+            IOUtils.closeQuietly(table);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 32d065a..63777ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             DeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
             // drop the table first
-            HBaseAdmin admin = new HBaseAdmin(conf);
-            if (admin.tableExists(tableName)) {
-                admin.disableTable(tableName);
-                admin.deleteTable(tableName);
+            Admin admin = HBaseConnection.get().getAdmin();
+            if (admin.tableExists(TableName.valueOf(tableName))) {
+                admin.disableTable(TableName.valueOf(tableName));
+                admin.deleteTable(TableName.valueOf(tableName));
             }
 
             // create table

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
index b6e5af5..7fc1d72 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
@@ -21,11 +21,10 @@ package org.apache.kylin.job.tools;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob {
     }
 
     private void clean() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
 
         for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
             String name = descriptor.getNameAsString().toLowerCase();

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index 962a4ee..72680c9 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -18,14 +18,31 @@
 
 package org.apache.kylin.job.tools;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.*;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -46,11 +63,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Created by honma on 9/3/14.
  * <p/>
@@ -70,7 +82,7 @@ public class CubeMigrationCLI {
     private static ResourceStore srcStore;
     private static ResourceStore dstStore;
     private static FileSystem hdfsFS;
-    private static HBaseAdmin hbaseAdmin;
+    private static Admin hbaseAdmin;
 
     public static void main(String[] args) throws IOException, InterruptedException {
 
@@ -110,8 +122,7 @@ public class CubeMigrationCLI {
 
         checkAndGetHbaseUrl();
 
-        Configuration conf = HBaseConfiguration.create();
-        hbaseAdmin = new HBaseAdmin(conf);
+        hbaseAdmin = HBaseConnection.get().getAdmin();
 
         hdfsFS = FileSystem.get(new Configuration());
 
@@ -131,6 +142,8 @@ public class CubeMigrationCLI {
         } else {
             showOpts();
         }
+
+        IOUtils.closeQuietly(hbaseAdmin);
     }
 
     public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -294,10 +307,10 @@ public class CubeMigrationCLI {
         case CHANGE_HTABLE_HOST: {
             String tableName = (String) opt.params[0];
             HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             logger.info("CHANGE_HTABLE_HOST is completed");
             break;
         }
@@ -402,20 +415,19 @@ public class CubeMigrationCLI {
         }
         case COPY_ACL: {
             String cubeId = (String) opt.params[0];
-            HTableInterface srcAclHtable = null;
-            HTableInterface destAclHtable = null;
+            Table srcAclHtable = null;
+            Table destAclHtable = null;
             try {
-                srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl");
-                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
+                srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + "_acl"));
+                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
 
                 Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
 
                 for (Cell cell : result.listCells()) {
                     Put put = new Put(Bytes.toBytes(cubeId));
-                    put.add(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
+                    put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
                     destAclHtable.put(put);
                 }
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(srcAclHtable);
                 IOUtils.closeQuietly(destAclHtable);
@@ -442,10 +454,10 @@ public class CubeMigrationCLI {
         case CHANGE_HTABLE_HOST: {
             String tableName = (String) opt.params[0];
             HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             break;
         }
         case COPY_FILE_IN_META: {
@@ -474,12 +486,10 @@ public class CubeMigrationCLI {
         }
         case COPY_ACL: {
             String cubeId = (String) opt.params[0];
-            HTableInterface destAclHtable = null;
+            Table destAclHtable = null;
             try {
-                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
-
+                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
                 destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(destAclHtable);
             }
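
The COPY_ACL branch above combines two 1.x changes: Put.add(family, qualifier, value) is deprecated in favor of Put.addColumn, and the trailing flushCommits() is dropped because Table.put is synchronous. A generic row-copy sketch of the same idea, with the tables and row key supplied by the caller:

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowCopySketch {
    public static void copyRow(Table src, Table dst, String rowKey) throws IOException {
        Result result = src.get(new Get(Bytes.toBytes(rowKey)));
        if (result.isEmpty())
            return; // listCells() is null for an empty Result
        Put put = new Put(Bytes.toBytes(rowKey));
        for (Cell cell : result.listCells()) {
            // Put.add(family, qualifier, value) is deprecated; addColumn is the 1.x spelling.
            put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
        }
        dst.put(put); // no flushCommits(): Table.put is synchronous in 1.x
    }
}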

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index 5482684..239c7ec 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -1,313 +1,314 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
-    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
-    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-
-    public static void main(String[] args) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
-        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
-        logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
-        List<String> tableNames = getHTableNames(kylinConfig);
-        logger.info("Identify tables " + tableNames);
-
-        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
-        logger.info("Old coprocessor jar: " + oldJarPaths);
-
-        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
-        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
-        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
-        // Don't remove old jars, missing coprocessor jar will fail hbase
-        // removeOldJars(oldJarPaths, fileSystem);
-
-        hbaseAdmin.close();
-
-        logger.info("Processed " + processedTables);
-        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
-    }
-
-    public static void deployCoprocessor(HTableDescriptor tableDesc) {
-        try {
-            initHTableCoprocessor(tableDesc);
-            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
-        } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
-            logger.error("Will try creating the table without coprocessor.");
-        }
-    }
-
-    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+    public static void main(String[] args) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-
-        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
-        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
-        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-    }
-
-    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Add coprocessor on " + desc.getNameAsString());
-        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
-        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
-    }
-
-    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Disable " + tableName);
-        hbaseAdmin.disableTable(tableName);
-
-        logger.info("Unset coprocessor on " + tableName);
-        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
-            desc.removeCoprocessor(OBSERVER_CLS_NAME);
-        }
-        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
-            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
-        }
-
-        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-        hbaseAdmin.modifyTable(tableName, desc);
-
-        logger.info("Enable " + tableName);
-        hbaseAdmin.enableTable(tableName);
-    }
-
-    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
-        List<String> processed = new ArrayList<String>();
-
-        for (String tableName : tableNames) {
-            try {
-                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
-                processed.add(tableName);
-            } catch (IOException ex) {
-                logger.error("Error processing " + tableName, ex);
-            }
-        }
-        return processed;
-    }
-
-    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
-        FileStatus newestJar = null;
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getPath().toString().endsWith(".jar")) {
-                if (newestJar == null) {
-                    newestJar = fileStatus;
-                } else {
-                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
-                        newestJar = fileStatus;
-                }
-            }
-        }
-        if (newestJar == null)
-            return null;
-
-        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
-        logger.info("The newest coprocessor is " + path.toString());
-        return path;
-    }
-
-    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
-        Path uploadPath = null;
-        File localCoprocessorFile = new File(localCoprocessorJar);
-
-        // check existing jars
-        if (oldJarPaths == null) {
-            oldJarPaths = new HashSet<String>();
-        }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
-                uploadPath = fileStatus.getPath();
-                break;
-            }
-            String filename = fileStatus.getPath().toString();
-            if (filename.endsWith(".jar")) {
-                oldJarPaths.add(filename);
-            }
-        }
-
-        // upload if not existing
-        if (uploadPath == null) {
-            // figure out a unique new jar file name
-            Set<String> oldJarNames = new HashSet<String>();
-            for (String path : oldJarPaths) {
-                oldJarNames.add(new Path(path).getName());
-            }
-            String baseName = getBaseFileName(localCoprocessorJar);
-            String newName = null;
-            int i = 0;
-            while (newName == null) {
-                newName = baseName + "-" + (i++) + ".jar";
-                if (oldJarNames.contains(newName))
-                    newName = null;
-            }
-
-            // upload
-            uploadPath = new Path(coprocessorDir, newName);
-            FileInputStream in = null;
-            FSDataOutputStream out = null;
-            try {
-                in = new FileInputStream(localCoprocessorFile);
-                out = fileSystem.create(uploadPath);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-
-            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
-        }
-
-        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
-        return uploadPath;
-    }
-
-    private static String getBaseFileName(String localCoprocessorJar) {
-        File localJar = new File(localCoprocessorJar);
-        String baseName = localJar.getName();
-        if (baseName.endsWith(".jar"))
-            baseName = baseName.substring(0, baseName.length() - ".jar".length());
-        return baseName;
-    }
-
-    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
-        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
-        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
-        fileSystem.mkdirs(coprocessorDir);
-        return coprocessorDir;
-    }
-
-    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
-        HashSet<String> result = new HashSet<String>();
-
-        for (String tableName : tableNames) {
-            HTableDescriptor tableDescriptor = null;
-            try {
-                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            } catch (TableNotFoundException e) {
-                logger.warn("Table not found " + tableName, e);
-                continue;
-            }
-
-            Matcher keyMatcher;
-            Matcher valueMatcher;
-            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
-                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
-                if (!keyMatcher.matches()) {
-                    continue;
-                }
-                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
-                if (!valueMatcher.matches()) {
-                    continue;
-                }
-
-                String jarPath = valueMatcher.group(1).trim();
-                String clsName = valueMatcher.group(2).trim();
-
-                if (OBSERVER_CLS_NAME.equals(clsName)) {
-                    result.add(jarPath);
-                }
-            }
-        }
-
-        return result;
-    }
-
-    private static List<String> getHTableNames(KylinConfig config) {
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-
-        ArrayList<String> result = new ArrayList<String>();
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
-            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        return result;
-    }
-}
+        FileSystem fileSystem = FileSystem.get(hconf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
+
+        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+        logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+        List<String> tableNames = getHTableNames(kylinConfig);
+        logger.info("Identify tables " + tableNames);
+
+        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+        logger.info("Old coprocessor jar: " + oldJarPaths);
+
+        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+        // Don't remove old jars, missing coprocessor jar will fail hbase
+        // removeOldJars(oldJarPaths, fileSystem);
+
+        hbaseAdmin.close();
+
+        logger.info("Processed " + processedTables);
+        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+    }
+
+    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+        try {
+            initHTableCoprocessor(tableDesc);
+            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+
+        } catch (Exception ex) {
+            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+            logger.error("Will try creating the table without coprocessor.");
+        }
+    }
+
+    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+
+        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+    }
+
+    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
+        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+    }
+
+    public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Disable " + tableName);
+        hbaseAdmin.disableTable(TableName.valueOf(tableName));
+
+        logger.info("Unset coprocessor on " + tableName);
+        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+            desc.removeCoprocessor(OBSERVER_CLS_NAME);
+        }
+        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+        }
+
+        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+
+        logger.info("Enable " + tableName);
+        hbaseAdmin.enableTable(TableName.valueOf(tableName));
+    }
+
+    private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+        List<String> processed = new ArrayList<String>();
+
+        for (String tableName : tableNames) {
+            try {
+                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+                processed.add(tableName);
+            } catch (IOException ex) {
+                logger.error("Error processing " + tableName, ex);
+            }
+        }
+        return processed;
+    }
+
+    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+        FileStatus newestJar = null;
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getPath().toString().endsWith(".jar")) {
+                if (newestJar == null) {
+                    newestJar = fileStatus;
+                } else {
+                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+                        newestJar = fileStatus;
+                }
+            }
+        }
+        if (newestJar == null)
+            return null;
+
+        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+        logger.info("The newest coprocessor is " + path.toString());
+        return path;
+    }
+
+    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+        Path uploadPath = null;
+        File localCoprocessorFile = new File(localCoprocessorJar);
+
+        // check existing jars
+        if (oldJarPaths == null) {
+            oldJarPaths = new HashSet<String>();
+        }
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            // compare against the local jar's file size, not the length of its path string
+            if (fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
+                uploadPath = fileStatus.getPath();
+                break;
+            }
+            String filename = fileStatus.getPath().toString();
+            if (filename.endsWith(".jar")) {
+                oldJarPaths.add(filename);
+            }
+        }
+
+        // upload if not existing
+        if (uploadPath == null) {
+            // figure out a unique new jar file name
+            Set<String> oldJarNames = new HashSet<String>();
+            for (String path : oldJarPaths) {
+                oldJarNames.add(new Path(path).getName());
+            }
+            String baseName = getBaseFileName(localCoprocessorJar);
+            String newName = null;
+            int i = 0;
+            while (newName == null) {
+                newName = baseName + "-" + (i++) + ".jar";
+                if (oldJarNames.contains(newName))
+                    newName = null;
+            }
+
+            // upload
+            uploadPath = new Path(coprocessorDir, newName);
+            FileInputStream in = null;
+            FSDataOutputStream out = null;
+            try {
+                in = new FileInputStream(localCoprocessorFile);
+                out = fileSystem.create(uploadPath);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+
+            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+        }
+
+        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+        return uploadPath;
+    }
+
+    private static String getBaseFileName(String localCoprocessorJar) {
+        File localJar = new File(localCoprocessorJar);
+        String baseName = localJar.getName();
+        if (baseName.endsWith(".jar"))
+            baseName = baseName.substring(0, baseName.length() - ".jar".length());
+        return baseName;
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+        fileSystem.mkdirs(coprocessorDir);
+        return coprocessorDir;
+    }
+
+    private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
+        HashSet<String> result = new HashSet<String>();
+
+        for (String tableName : tableNames) {
+            HTableDescriptor tableDescriptor = null;
+            try {
+                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            } catch (TableNotFoundException e) {
+                logger.warn("Table not found " + tableName, e);
+                continue;
+            }
+
+            Matcher keyMatcher;
+            Matcher valueMatcher;
+            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+                if (!keyMatcher.matches()) {
+                    continue;
+                }
+                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+                if (!valueMatcher.matches()) {
+                    continue;
+                }
+
+                String jarPath = valueMatcher.group(1).trim();
+                String clsName = valueMatcher.group(2).trim();
+
+                if (OBSERVER_CLS_NAME.equals(clsName)) {
+                    result.add(jarPath);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<String> getHTableNames(KylinConfig config) {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        ArrayList<String> result = new ArrayList<String>();
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (StringUtils.isNotBlank(tableName)) {
+                    result.add(tableName);
+                    System.out.println("added new table: " + tableName);
+                }
+            }
+        }
+
+        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (StringUtils.isNotBlank(tableName)) {
+                    result.add(tableName);
+                    System.out.println("added new table: " + tableName);
+                }
+            }
+        }
+
+        return result;
+    }
+}
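
A note on the pattern above: the substantive change in DeployCoprocessorCLI is
that the 0.98-era HBaseAdmin, built straight from a Configuration, gives way to
an Admin handle obtained from a Connection, and bare table-name Strings are
wrapped in TableName.valueOf(...). The sketch below replays the same
disable/modify/enable cycle that resetCoprocessor() performs, reduced to a
single table and the observer class. It assumes an HBase 1.1 client with
hbase-site.xml on the classpath; the table name and jar path are placeholders,
not values from this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CoprocessorResetSketch {

    static final String OBSERVER_CLS = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Path jar = new Path("hdfs:///kylin/coprocessor/kylin-coprocessor-0.jar"); // placeholder path
        TableName table = TableName.valueOf("KYLIN_EXAMPLE"); // placeholder table

        // HBase 1.1 style: the Admin comes from a Connection,
        // where 0.98 code did "new HBaseAdmin(conf)".
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // A table must be disabled before its descriptor can be modified.
            admin.disableTable(table);
            HTableDescriptor desc = admin.getTableDescriptor(table);
            while (desc.hasCoprocessor(OBSERVER_CLS)) {
                desc.removeCoprocessor(OBSERVER_CLS);
            }
            // Same priority (1001) that the CLI above assigns to the observer.
            desc.addCoprocessor(OBSERVER_CLS, jar, 1001, null);
            admin.modifyTable(table, desc);
            admin.enableTable(table);
        }
    }
}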

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
index 70e1df6..5fe5e58 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark {
     public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
         System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         createHTableIfNeeded(conn, TEST_TABLE);
         prepareData(conn);
 
@@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark {
 
     }
 
-    private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+    private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
         Stats stats = new Stats("COLUMN_SCAN");
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
         fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
     }
 
-    private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
     }
 
-    private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
     }
 
-    private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+    private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
 
         final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
 
             stats.markStart();
@@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void prepareData(HConnection conn) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void prepareData(Connection conn) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
 
         try {
             // check how many rows existing
@@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark {
                 byte[] rowkey = Bytes.toBytes(i);
                 Put put = new Put(rowkey);
                 byte[] cell = randomBytes();
-                put.add(CF, QN, cell);
+                put.addColumn(CF, QN, cell);
                 table.put(put);
                 nBytes += cell.length;
                 dot(i, N_ROWS);
@@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark {
         return bytes;
     }
 
-    private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
 
         try {
             boolean tableExist = false;

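Beyond the HConnection-to-Connection rename, the benchmark diff above shows the
two mechanical changes that recur throughout this commit: Connection.getTable()
now takes a TableName instead of a String, and Put.add() gives way to
Put.addColumn(). A minimal, self-contained sketch of the write path follows;
the table name, column family, and qualifier are placeholders, and it assumes
hbase-site.xml on the classpath.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutMigrationSketch {
    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             // getTable() takes a TableName in 1.x, not a String
             Table table = conn.getTable(TableName.valueOf("GRID_EXAMPLE"))) {
            Put put = new Put(Bytes.toBytes("row-0"));
            // Put.add(cf, qualifier, value) is gone; addColumn() replaces it
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);
        }
    }
}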
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
index 53930e3..e283748 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
     }
 
     private void alter() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
         hbaseAdmin.disableTable(table.getTableName());

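Note that the CLI now borrows its Admin from HBaseConnection instead of
constructing one per call: in the 1.x client a Connection is heavyweight and
meant to be shared process-wide, while Admin handles are cheap and should be
closed promptly. The helper below is a simplified, hypothetical stand-in for
Kylin's HBaseConnection cache, shown only to illustrate that lifecycle split;
it is not the actual Kylin code.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public final class SharedConnection {

    private static volatile Connection conn;

    private SharedConnection() {
    }

    // Lazily create one process-wide Connection; all callers share it.
    public static Connection get() throws IOException {
        if (conn == null) {
            synchronized (SharedConnection.class) {
                if (conn == null) {
                    conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                }
            }
        }
        return conn;
    }
}

Each operation then takes a fresh, short-lived Admin from the shared handle:
try (Admin admin = SharedConnection.get().getAdmin()) { ... }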
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
index 3329d27..4d44088 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -22,11 +22,12 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.slf4j.Logger;
@@ -69,8 +70,8 @@ public class RowCounterCLI {
 
         logger.info("My Scan " + scan.toString());
 
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
 
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;

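The same Connection/Table substitution closes out RowCounterCLI:
HConnectionManager.createConnection() becomes
ConnectionFactory.createConnection(), and the table handle is typed Table.
Below is a sketch of the counting loop with explicit resource cleanup; the
table name is a placeholder, and the try-with-resources block is an addition
for this standalone example rather than a mirror of the CLI.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class RowCountSketch {
    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE"));
             // ResultScanner is Closeable and Iterable<Result>
             ResultScanner scanner = table.getScanner(new Scan())) {
            int counter = 0;
            for (Result ignored : scanner) {
                counter++;
            }
            System.out.println(counter + " rows in total");
        }
    }
}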