http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
new file mode 100644
index 0000000..a87ddd8
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class TableSchemaUpdateChecker {
+    private final MetadataManager metadataManager;
+    private final CubeManager cubeManager;
+
+    static class CheckResult {
+        private final boolean valid;
+        private final String reason;
+
+        private CheckResult(boolean valid, String reason) {
+            this.valid = valid;
+            this.reason = reason;
+        }
+
+        void raiseExceptionWhenInvalid() {
+            if (!valid) {
+                throw new RuntimeException(reason);
+            }
+        }
+
+        static CheckResult validOnFirstLoad(String tableName) {
+            return new CheckResult(true, format("Table '%s' hasn't been loaded before", tableName));
+        }
+
+        static CheckResult validOnCompatibleSchema(String tableName) {
+            return new CheckResult(true, format("Table '%s' is compatible with all existing cubes", tableName));
+        }
+
+        static CheckResult invalidOnFetchSchema(String tableName, Exception e) {
+            return new CheckResult(false, format("Failed to fetch metadata of '%s': %s", tableName, e.getMessage()));
+        }
+
+        static CheckResult invalidOnIncompatibleSchema(String tableName, List<String> reasons) {
+            StringBuilder buf = new StringBuilder();
+            for (String reason : reasons) {
+                buf.append("- ").append(reason).append("\n");
+            }
+
+            return new CheckResult(false, format("Found %d issue(s) with '%s':%n%s Please disable and purge related cube(s) first", reasons.size(), tableName, buf.toString()));
+        }
+    }
+
+    TableSchemaUpdateChecker(MetadataManager metadataManager, CubeManager cubeManager) {
+        this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
+        this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
+    }
+
+    private List<CubeInstance> findCubeByTable(final String fullTableName) {
+        Iterable<CubeInstance> relatedCubes = Iterables.filter(cubeManager.listAllCubes(), new Predicate<CubeInstance>() {
+            @Override
+            public boolean apply(@Nullable CubeInstance cube) {
+                if (cube == null || cube.allowBrokenDescriptor()) {
+                    return false;
+                }
+                DataModelDesc model = cube.getModel();
+                if (model == null)
+                    return false;
+                return model.containsTable(fullTableName);
+            }
+        });
+
+        return ImmutableList.copyOf(relatedCubes);
+    }
+
+    private boolean isColumnCompatible(ColumnDesc column, ColumnDesc newCol) {
+        if (!column.getName().equalsIgnoreCase(newCol.getName())) {
+            return false;
+        }
+
+        if (column.getType().isIntegerFamily()) {
+            // OLAPTable.listSourceColumns converts some integer columns to bigint,
+            // therefore strict type comparison won't work.
+            // changing from one integer type to another should be fine.
+            return newCol.getType().isIntegerFamily();
+        } else if (column.getType().isNumberFamily()) {
+            // both being float/double should be fine.
+            return newCol.getType().isNumberFamily();
+        } else {
+            // only compare base type name, changing precision or scale should be fine
+            return column.getTypeName().equals(newCol.getTypeName());
+        }
+    }
+
+    /**
+     * check whether all columns used in `cube` have a compatible schema between kylin's
+     * existing table metadata (`origTable`) and the newly loaded schema (`newTable`).
+     * @param cube cube to check, must use the table in its model
+     * @param origTable kylin's existing table metadata
+     * @param newTable the table's current schema in the source (e.g. hive)
+     * @return the list of used columns whose schema changed incompatibly, empty if all are compatible
+     */
+    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc origTable, TableDesc newTable) {
+        Set<ColumnDesc> usedColumns = Sets.newHashSet();
+        for (TblColRef col : cube.getAllColumns()) {
+            usedColumns.add(col.getColumnDesc());
+        }
+
+        List<String> violateColumns = Lists.newArrayList();
+        for (ColumnDesc column : origTable.getColumns()) {
+            if (!column.isComputedColumnn() && usedColumns.contains(column)) {
+                ColumnDesc newCol = newTable.findColumnByName(column.getName());
+                if (newCol == null || !isColumnCompatible(column, newCol)) {
+                    violateColumns.add(column.getName());
+                }
+            }
+        }
+        return violateColumns;
+    }
+
+    /**
+     * check whether all columns in `origTable` are still present in `newTable` at the
+     * same index and with compatible types.
+     *
+     * @param origTable kylin's existing table metadata
+     * @param newTable the table's current schema in the source (e.g. hive)
+     * @return true if only new columns are appended at the end, false otherwise
+     */
+    private boolean checkAllColumnsInTableDesc(TableDesc origTable, TableDesc newTable) {
+        if (origTable.getColumnCount() > newTable.getColumnCount()) {
+            return false;
+        }
+
+        ColumnDesc[] columns = origTable.getColumns();
+        for (int i = 0; i < columns.length; i++) {
+            if (!isColumnCompatible(columns[i], newTable.getColumns()[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public CheckResult allowReload(TableDesc newTableDesc) {
+        final String fullTableName = newTableDesc.getIdentity();
+
+        TableDesc existing = metadataManager.getTableDesc(fullTableName);
+        if (existing == null) {
+            return CheckResult.validOnFirstLoad(fullTableName);
+        }
+
+        List<String> issues = Lists.newArrayList();
+        for (CubeInstance cube : findCubeByTable(fullTableName)) {
+            String modelName = cube.getModel().getName();
+
+            // if user reloads a fact table used by a cube, then all used columns must match the current schema
+            if (cube.getModel().isFactTable(fullTableName)) {
+                TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
+                List<String> violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc);
+                if (!violateColumns.isEmpty()) {
+                    issues.add(format("Column %s used in cube[%s] and model[%s], but changed in hive", violateColumns, cube.getName(), modelName));
+                }
+            }
+
+            // if user reloads a lookup table used by a cube, only appending column(s) is allowed; all existing
+            // columns must stay the same (except compatible type changes)
+            if (cube.getModel().isLookupTable(fullTableName)) {
+                TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
+                if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) {
+                    issues.add(format("Table '%s' is used as Lookup Table in cube[%s] and model[%s], but changed in hive", lookupTable.getIdentity(), cube.getName(), modelName));
+                }
+            }
+        }
+
+        if (issues.isEmpty()) {
+            return CheckResult.validOnCompatibleSchema(fullTableName);
+        }
+        return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues);
+    }
+}
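
For reference, this is how the new checker gets driven (the call site lands in TableService below). A minimal sketch of the call pattern, assuming a configured Kylin environment; the wrapper class is hypothetical and must live in the same org.apache.kylin.rest.service package, since the constructor and the CheckResult methods are package-private:

    package org.apache.kylin.rest.service;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.metadata.MetadataManager;
    import org.apache.kylin.metadata.model.TableDesc;

    // Hypothetical driver, for illustration only.
    public class SchemaCheckSketch {
        public static void checkBeforeReload(TableDesc newTableDesc) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // both managers are per-config singletons
            TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(
                    MetadataManager.getInstance(config), CubeManager.getInstance(config));
            // throws a RuntimeException listing the issues when the new schema would
            // break an existing cube; returns silently when the reload is safe
            checker.allowReload(newTableDesc).raiseExceptionWhenInvalid();
        }
    }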

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 9f9b541..919dad4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -25,12 +25,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -44,11 +45,8 @@ import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.response.TableDescResponse;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.HiveSourceTableLoader;
-import org.apache.kylin.source.hive.IHiveClient;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +55,11 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+
 @Component("tableService")
 public class TableService extends BasicService {
 
@@ -90,17 +93,76 @@ public class TableService extends BasicService {
     }
 
     public TableDesc getTableDescByName(String tableName, boolean withExt) {
-        TableDesc table =  getMetadataManager().getTableDesc(tableName);
-        if(withExt){
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        if (withExt) {
             table = cloneTableDesc(table);
         }
         return table;
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException {
-        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(tables, getConfig());
-        String[] result = (String[]) loaded.toArray(new String[loaded.size()]);
+    public String[] loadHiveTablesToProject(String[] tables, String project) throws Exception {
+        // de-dup
+        SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
+        for (String fullTableName : tables) {
+            String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
+            db2tables.put(parts[0].toUpperCase(), parts[1].toUpperCase());
+        }
+
+        // load all tables first
+        List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
+        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        for (Map.Entry<String, String> entry : db2tables.entries()) {
+            Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue());
+            TableDesc tableDesc = pair.getFirst();
+            Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey()));
+            Preconditions.checkState(tableDesc.getName().equals(entry.getValue()));
+            Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey() + "." + entry.getValue()));
+            TableExtDesc extDesc = pair.getSecond();
+            Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getName()));
+            allMeta.add(pair);
+        }
+
+        // do schema check
+        MetadataManager metaMgr = MetadataManager.getInstance(getConfig());
+        CubeManager cubeMgr = CubeManager.getInstance(getConfig());
+        TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr);
+        for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
+            TableDesc tableDesc = pair.getFirst();
+            TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc);
+            result.raiseExceptionWhenInvalid();
+        }
+
+        // save table meta
+        List<String> saved = Lists.newArrayList();
+        for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
+            TableDesc tableDesc = pair.getFirst();
+            TableExtDesc extDesc = pair.getSecond();
+            
+            TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity());
+            if (origTable == null) {
+                tableDesc.setUuid(UUID.randomUUID().toString());
+                tableDesc.setLastModified(0);
+            } else {
+                tableDesc.setUuid(origTable.getUuid());
+                tableDesc.setLastModified(origTable.getLastModified());
+            }
+            
+            TableExtDesc origExt = metaMgr.getTableExt(tableDesc.getIdentity());
+            if (origExt == null) {
+                extDesc.setUuid(UUID.randomUUID().toString());
+                extDesc.setLastModified(0);
+            } else {
+                extDesc.setUuid(origExt.getUuid());
+                extDesc.setLastModified(origExt.getLastModified());
+            }
+
+            metaMgr.saveTableExt(extDesc);
+            metaMgr.saveSourceTable(tableDesc);
+            saved.add(tableDesc.getIdentity());
+        }
+
+        String[] result = (String[]) saved.toArray(new String[saved.size()]);
         syncTableToProject(result, project);
         return result;
     }
@@ -197,9 +259,8 @@ public class TableService extends BasicService {
      * @throws Exception
      */
     public List<String> getHiveDbNames() throws Exception {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = hiveClient.getHiveDbNames();
-        return results;
+        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        return explr.listDatabases();
     }
 
     /**
@@ -209,9 +270,8 @@ public class TableService extends BasicService {
      * @throws Exception
      */
     public List<String> getHiveTableNames(String database) throws Exception {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = hiveClient.getHiveTableNames(database);
-        return results;
+        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        return explr.listTables(database);
     }
 
     private TableDescResponse cloneTableDesc(TableDesc table) {
@@ -241,7 +301,6 @@ public class TableService extends BasicService {
         return rtableDesc;
     }
 
-
     private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
         List<TableDesc> descs = new ArrayList<TableDesc>();
         Iterator<TableDesc> it = tables.iterator();
@@ -255,7 +314,7 @@ public class TableService extends BasicService {
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws Exception {
         MetadataManager metaMgr = getMetadataManager();
         ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
         for (String table : tables) {
@@ -274,7 +333,7 @@ public class TableService extends BasicService {
      * @param tableName
      */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinality(String tableName, String submitter) throws IOException {
+    public void calculateCardinality(String tableName, String submitter) throws Exception {
         tableName = normalizeHiveTableName(tableName);
         TableDesc table = getMetadataManager().getTableDesc(tableName);
         final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
@@ -295,7 +354,7 @@ public class TableService extends BasicService {
 
         MapReduceExecutable step1 = new MapReduceExecutable();
 
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob.class);
         step1.setMapReduceParams(param);
         step1.setParam("segmentId", tableName);
 
@@ -303,7 +362,7 @@ public class TableService extends BasicService {
 
         HadoopShellExecutable step2 = new HadoopShellExecutable();
 
-        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob.class);
         step2.setJobParams(param);
         step2.setParam("segmentId", tableName);
         job.addTask(step2);
@@ -313,7 +372,7 @@ public class TableService extends BasicService {
         getExecutableManager().addJob(job);
     }
 
-    public String normalizeHiveTableName(String tableName){
+    public String normalizeHiveTableName(String tableName) {
         String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
         return (dbTableName[0] + "." + dbTableName[1]).toUpperCase();
     }
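
The refactored service now reaches the source system only through the pluggable ISourceMetadataExplorer, as the hunks above show. A minimal sketch of browsing and loading metadata through that interface, assuming a configured default source; the wrapper class is hypothetical, while the calls are those visible in this patch:

    import org.apache.kylin.common.util.Pair;
    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.kylin.metadata.model.TableExtDesc;
    import org.apache.kylin.source.ISourceMetadataExplorer;
    import org.apache.kylin.source.SourceFactory;

    // Hypothetical example, for illustration only.
    public class ExplorerSketch {
        public static void printAllTables() throws Exception {
            ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
            for (String db : explr.listDatabases()) {
                for (String table : explr.listTables(db)) {
                    // loads the table schema plus extended info (location, owner, ...)
                    Pair<TableDesc, TableExtDesc> meta = explr.loadTableMetadata(db, table);
                    System.out.println(meta.getFirst().getIdentity());
                }
            }
        }
    }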

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java
index 5c3eeb3..33285bd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java
@@ -38,8 +38,6 @@ import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +74,7 @@ public class TableServiceV2 extends TableService {
     private KafkaConfigService kafkaConfigService;
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws Exception {
         MetadataManager metaMgr = getMetadataManager();
         ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
         for (String table : tables) {
@@ -95,7 +93,7 @@ public class TableServiceV2 extends TableService {
      * @param tableName
      */
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinality(String tableName, String submitter) throws IOException {
+    public void calculateCardinality(String tableName, String submitter) throws Exception {
         Message msg = MsgPicker.getMsg();
 
         tableName = normalizeHiveTableName(tableName);
@@ -118,7 +116,7 @@ public class TableServiceV2 extends TableService {
 
         MapReduceExecutable step1 = new MapReduceExecutable();
 
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob.class);
         step1.setMapReduceParams(param);
         step1.setParam("segmentId", tableName);
 
@@ -126,7 +124,7 @@ public class TableServiceV2 extends TableService {
 
         HadoopShellExecutable step2 = new HadoopShellExecutable();
 
-        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob.class);
         step2.setJobParams(param);
         step2.setParam("segmentId", tableName);
         job.addTask(step2);
@@ -187,7 +185,7 @@ public class TableServiceV2 extends TableService {
         return rtn;
     }
 
-    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws IOException {
+    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         Map<String, String[]> result = new HashMap<String, String[]>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java
new file mode 100644
index 0000000..0f7152b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+
+public class HiveMetadataExplorer implements ISourceMetadataExplorer {
+
+    @Override
+    public List<String> listDatabases() throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        return hiveClient.getHiveDbNames();
+    }
+
+    @Override
+    public List<String> listTables(String database) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        return hiveClient.getHiveTableNames(database);
+    }
+
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String tableName) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        MetadataManager metaMgr = MetadataManager.getInstance(config);
+
+        HiveTableMeta hiveTableMeta;
+        try {
+            hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName);
+        } catch (Exception e) {
+            throw new RuntimeException("cannot get HiveTableMeta", e);
+        }
+
+        TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+        if (tableDesc == null) {
+            tableDesc = new TableDesc();
+            tableDesc.setDatabase(database.toUpperCase());
+            tableDesc.setName(tableName.toUpperCase());
+            tableDesc.setUuid(UUID.randomUUID().toString());
+            tableDesc.setLastModified(0);
+        }
+        if (hiveTableMeta.tableType != null) {
+            tableDesc.setTableType(hiveTableMeta.tableType);
+        }
+
+        int columnNumber = hiveTableMeta.allColumns.size();
+        List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+        for (int i = 0; i < columnNumber; i++) {
+            HiveTableMeta.HiveTableColumnMeta field = hiveTableMeta.allColumns.get(i);
+            ColumnDesc cdesc = new ColumnDesc();
+            cdesc.setName(field.name.toUpperCase());
+            // use "double" in kylin for "float"
+            if ("float".equalsIgnoreCase(field.dataType)) {
+                cdesc.setDatatype("double");
+            } else {
+                cdesc.setDatatype(field.dataType);
+            }
+            cdesc.setId(String.valueOf(i + 1));
+            cdesc.setComment(field.comment);
+            columns.add(cdesc);
+        }
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+        StringBuffer partitionColumnString = new StringBuffer();
+        for (int i = 0, n = hiveTableMeta.partitionColumns.size(); i < n; i++) {
+            if (i > 0)
+                partitionColumnString.append(", ");
+            partitionColumnString.append(hiveTableMeta.partitionColumns.get(i).name.toUpperCase());
+        }
+
+        TableExtDesc tableExtDesc = metaMgr.getTableExt(tableDesc.getIdentity());
+        tableExtDesc.addDataSourceProp("location", hiveTableMeta.sdLocation);
+        tableExtDesc.addDataSourceProp("owner", hiveTableMeta.owner);
+        tableExtDesc.addDataSourceProp("last_access_time", String.valueOf(hiveTableMeta.lastAccessTime));
+        tableExtDesc.addDataSourceProp("partition_column", partitionColumnString.toString());
+        tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(hiveTableMeta.fileSize));
+        tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(hiveTableMeta.fileNum));
+        tableExtDesc.addDataSourceProp("hive_inputFormat", hiveTableMeta.sdInputFormat);
+        tableExtDesc.addDataSourceProp("hive_outputFormat", hiveTableMeta.sdOutputFormat);
+        tableExtDesc.addDataSourceProp("skip_header_line_count", String.valueOf(hiveTableMeta.skipHeaderLineCount));
+
+        return Pair.newPair(tableDesc, tableExtDesc);
+    }
+
+    @Override
+    public List<String> getRelatedKylinResources(TableDesc table) {
+        return Collections.emptyList();
+    }
+}
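
The float handling above pairs with the number-family branch in TableSchemaUpdateChecker.isColumnCompatible(). Shown in isolation (a hypothetical helper, not part of the patch):

    // Kylin stores hive "float" columns as "double"; the checker's
    // isNumberFamily() branch then treats float vs. double as compatible.
    class HiveTypeNormalization {
        static String normalizeHiveType(String hiveDataType) {
            return "float".equalsIgnoreCase(hiveDataType) ? "double" : hiveDataType;
        }
    }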

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index af0a519..77c8582 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,22 +18,22 @@
 
 package org.apache.kylin.source.hive;
 
-import java.util.List;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ReadableTable;
-
-import com.google.common.collect.Lists;
+import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 
 //used by reflection
 public class HiveSource implements ISource {
 
+    @Override
+    public ISourceMetadataExplorer getSourceMetadataExplorer() {
+        return new HiveMetadataExplorer();
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -45,33 +45,13 @@ public class HiveSource implements ISource {
     }
 
     @Override
-    public ReadableTable createReadableTable(TableDesc tableDesc) {
+    public IReadableTable createReadableTable(TableDesc tableDesc) {
         return new HiveTable(tableDesc);
     }
 
     @Override
-    public List<String> getMRDependentResources(TableDesc table) {
-        return Lists.newArrayList();
-    }
-
-    @Override
-    public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
         SourcePartition result = SourcePartition.getCopyOf(srcPartition);
-        CubeInstance cube = (CubeInstance) buildable;
-        if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == true) {
-            // normal partitioned cube
-            if (result.getStartDate() == 0) {
-                final CubeSegment last = cube.getLastSegment();
-                if (last != null) {
-                    result.setStartDate(last.getDateRangeEnd());
-                }
-            }
-        } else {
-            // full build
-            result.setStartDate(0);
-            result.setEndDate(Long.MAX_VALUE);
-        }
-
         result.setStartOffset(0);
         result.setEndOffset(0);
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
deleted file mode 100644
index 87edfe4..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-
-/**
- * Management class to sync hive table metadata with command See main method for
- * how to use the class
- *
- * @author jianliu
- */
-public class HiveSourceTableLoader {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
-
-    public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-
-        SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
-        for (String fullTableName : hiveTables) {
-            String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
-            db2tables.put(parts[0], parts[1]);
-        }
-
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config));
-        for (Map.Entry<String, String> entry : db2tables.entries()) {
-            SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue());
-            result.raiseExceptionWhenInvalid();
-        }
-
-        // extract from hive
-        Set<String> loadedTables = Sets.newHashSet();
-        for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient);
-            loadedTables.addAll(loaded);
-        }
-
-        return loadedTables;
-    }
-
-    private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException {
-
-        List<String> loadedTables = Lists.newArrayList();
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        for (String tableName : tables) {
-            HiveTableMeta hiveTableMeta;
-            try {
-                hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName);
-            } catch (Exception e) {
-                throw new RuntimeException("cannot get HiveTableMeta", e);
-            }
-
-            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
-            if (tableDesc == null) {
-                tableDesc = new TableDesc();
-                tableDesc.setDatabase(database.toUpperCase());
-                tableDesc.setName(tableName.toUpperCase());
-                tableDesc.setUuid(UUID.randomUUID().toString());
-                tableDesc.setLastModified(0);
-            }
-            if (hiveTableMeta.tableType != null) {
-                tableDesc.setTableType(hiveTableMeta.tableType);
-            }
-
-            int columnNumber = hiveTableMeta.allColumns.size();
-            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
-            for (int i = 0; i < columnNumber; i++) {
-                HiveTableMeta.HiveTableColumnMeta field = hiveTableMeta.allColumns.get(i);
-                ColumnDesc cdesc = new ColumnDesc();
-                cdesc.setName(field.name.toUpperCase());
-                // use "double" in kylin for "float"
-                if ("float".equalsIgnoreCase(field.dataType)) {
-                    cdesc.setDatatype("double");
-                } else {
-                    cdesc.setDatatype(field.dataType);
-                }
-                cdesc.setId(String.valueOf(i + 1));
-                cdesc.setComment(field.comment);
-                columns.add(cdesc);
-            }
-            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-
-            StringBuffer partitionColumnString = new StringBuffer();
-            for (int i = 0, n = hiveTableMeta.partitionColumns.size(); i < n; i++) {
-                if (i > 0)
-                    partitionColumnString.append(", ");
-                partitionColumnString.append(hiveTableMeta.partitionColumns.get(i).name.toUpperCase());
-            }
-
-            TableExtDesc tableExtDesc = metaMgr.getTableExt(tableDesc.getIdentity());
-            tableExtDesc.addDataSourceProp("location", hiveTableMeta.sdLocation);
-            tableExtDesc.addDataSourceProp("owner", hiveTableMeta.owner);
-            tableExtDesc.addDataSourceProp("last_access_time", String.valueOf(hiveTableMeta.lastAccessTime));
-            tableExtDesc.addDataSourceProp("partition_column", partitionColumnString.toString());
-            tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(hiveTableMeta.fileSize));
-            tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(hiveTableMeta.fileNum));
-            tableExtDesc.addDataSourceProp("hive_inputFormat", hiveTableMeta.sdInputFormat);
-            tableExtDesc.addDataSourceProp("hive_outputFormat", hiveTableMeta.sdOutputFormat);
-            tableExtDesc.addDataSourceProp("skip_header_line_count", String.valueOf(hiveTableMeta.skipHeaderLineCount));
-
-            metaMgr.saveTableExt(tableExtDesc);
-            metaMgr.saveSourceTable(tableDesc);
-
-            loadedTables.add(tableDesc.getIdentity());
-        }
-        return loadedTables;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index 83e49e9..14ed1f9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -25,13 +25,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  */
-public class HiveTable implements ReadableTable {
+public class HiveTable implements IReadableTable {
 
     private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 8309a8c..75f322f 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -33,7 +33,7 @@ import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
 import org.apache.hive.hcatalog.data.transfer.HCatReader;
 import org.apache.hive.hcatalog.data.transfer.ReadEntity;
 import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.IReadableTable.TableReader;
 
 /**
  * An implementation of TableReader with HCatalog for Hive table.

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
deleted file mode 100644
index dbcfc7a..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.String.format;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class SchemaChecker {
-    private final IHiveClient hiveClient;
-    private final MetadataManager metadataManager;
-    private final CubeManager cubeManager;
-
-    static class CheckResult {
-        private final boolean valid;
-        private final String reason;
-
-        private CheckResult(boolean valid, String reason) {
-            this.valid = valid;
-            this.reason = reason;
-        }
-
-        void raiseExceptionWhenInvalid() {
-            if (!valid) {
-                throw new RuntimeException(reason);
-            }
-        }
-
-        static CheckResult validOnFirstLoad(String tableName) {
-            return new CheckResult(true, format("Table '%s' hasn't been loaded before", tableName));
-        }
-
-        static CheckResult validOnCompatibleSchema(String tableName) {
-            return new CheckResult(true, format("Table '%s' is compatible with all existing cubes", tableName));
-        }
-
-        static CheckResult invalidOnFetchSchema(String tableName, Exception e) {
-            return new CheckResult(false, format("Failed to fetch metadata of '%s': %s", tableName, e.getMessage()));
-        }
-
-        static CheckResult invalidOnIncompatibleSchema(String tableName, List<String> reasons) {
-            StringBuilder buf = new StringBuilder();
-            for (String reason : reasons) {
-                buf.append("- ").append(reason).append("\n");
-            }
-
-            return new CheckResult(false, format("Found %d issue(s) with '%s':%n%s Please disable and purge related cube(s) first", reasons.size(), tableName, buf.toString()));
-        }
-    }
-
-    SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
-        this.hiveClient = checkNotNull(hiveClient, "hiveClient is null");
-        this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
-        this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
-    }
-
-    private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception {
-        List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList();
-        columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns);
-        return columnMetas;
-    }
-
-    private List<CubeInstance> findCubeByTable(final String fullTableName) {
-        Iterable<CubeInstance> relatedCubes = Iterables.filter(cubeManager.listAllCubes(), new Predicate<CubeInstance>() {
-            @Override
-            public boolean apply(@Nullable CubeInstance cube) {
-                if (cube == null || cube.allowBrokenDescriptor()) {
-                    return false;
-                }
-                DataModelDesc model = cube.getModel();
-                if (model == null)
-                    return false;
-                return model.containsTable(fullTableName);
-            }
-        });
-
-        return ImmutableList.copyOf(relatedCubes);
-    }
-
-    private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) {
-        if (!column.getName().equalsIgnoreCase(field.name)) {
-            return false;
-        }
-
-        String typeStr = field.dataType;
-        // kylin uses double internally for float, see HiveSourceTableLoader.java
-        // TODO should this normalization to be in DataType class ?
-        if ("float".equalsIgnoreCase(typeStr)) {
-            typeStr = "double";
-        }
-        DataType fieldType = DataType.getType(typeStr);
-
-        if (column.getType().isIntegerFamily()) {
-            // OLAPTable.listSourceColumns converts some integer columns to bigint,
-            // therefore strict type comparison won't work.
-            // changing from one integer type to another should be fine.
-            return fieldType.isIntegerFamily();
-        } else {
-            // only compare base type name, changing precision or scale should be fine
-            return column.getTypeName().equals(fieldType.getName());
-        }
-    }
-
-    /**
-     * check whether all columns used in `cube` has compatible schema in current hive schema denoted by `fieldsMap`.
-     * @param cube cube to check, must use `table` in its model
-     * @param table kylin's table metadata
-     * @param fieldsMap current hive schema of `table`
-     * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise
-     */
-    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) {
-        Set<ColumnDesc> usedColumns = Sets.newHashSet();
-        for (TblColRef col : cube.getAllColumns()) {
-            usedColumns.add(col.getColumnDesc());
-        }
-
-        List<String> violateColumns = Lists.newArrayList();
-        for (ColumnDesc column : table.getColumns()) {
-            if (!column.isComputedColumnn() && usedColumns.contains(column)) {
-                HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName());
-                if (field == null || !isColumnCompatible(column, field)) {
-                    violateColumns.add(column.getName());
-                }
-            }
-        }
-        return violateColumns;
-    }
-
-    /**
-     * check whether all columns in `table` are still in `fields` and have the same index as before.
-     *
-     * @param table kylin's table metadata
-     * @param fields current table metadata in hive
-     * @return true if only new columns are appended in hive, false otherwise
-     */
-    private boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) {
-        if (table.getColumnCount() > fields.size()) {
-            return false;
-        }
-
-        ColumnDesc[] columns = table.getColumns();
-        for (int i = 0; i < columns.length; i++) {
-            if (!isColumnCompatible(columns[i], fields.get(i))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public CheckResult allowReload(String dbName, String tblName) {
-        final String fullTableName = (dbName + "." + tblName).toUpperCase();
-
-        TableDesc existing = metadataManager.getTableDesc(fullTableName);
-        if (existing == null) {
-            return CheckResult.validOnFirstLoad(fullTableName);
-        }
-
-        List<HiveTableMeta.HiveTableColumnMeta> currentFields;
-        Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap();
-        try {
-            currentFields = fetchSchema(dbName, tblName);
-        } catch (Exception e) {
-            return CheckResult.invalidOnFetchSchema(fullTableName, e);
-        }
-        for (HiveTableMeta.HiveTableColumnMeta field : currentFields) {
-            currentFieldsMap.put(field.name.toUpperCase(), field);
-        }
-
-        List<String> issues = Lists.newArrayList();
-        for (CubeInstance cube : findCubeByTable(fullTableName)) {
-            String modelName = cube.getModel().getName();
-
-            // if user reloads a fact table used by cube, then all used columns must match current schema
-            if (cube.getModel().isFactTable(fullTableName)) {
-                TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
-                List<String> violateColumns = checkAllColumnsInCube(cube, factTable, currentFieldsMap);
-                if (!violateColumns.isEmpty()) {
-                    issues.add(format("Column %s used in cube[%s] and model[%s], but changed in hive", violateColumns, cube.getName(), modelName));
-                }
-            }
-
-            // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns
-            // must be the same (except compatible type changes)
-            if (cube.getModel().isLookupTable(fullTableName)) {
-                TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc();
-                if (!checkAllColumnsInTableDesc(lookupTable, currentFields)) {
-                    issues.add(format("Table '%s' is used as Lookup Table in cube[%s] and model[%s], but changed in hive", lookupTable.getIdentity(), cube.getName(), modelName));
-                }
-            }
-        }
-
-        if (issues.isEmpty()) {
-            return CheckResult.validOnCompatibleSchema(fullTableName);
-        }
-        return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 6689c6e..52d2e6f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -24,14 +24,17 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
@@ -56,20 +59,12 @@ public class KafkaSource implements ISource {
     }
 
     @Override
-    public ReadableTable createReadableTable(TableDesc tableDesc) {
+    public IReadableTable createReadableTable(TableDesc tableDesc) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public List<String> getMRDependentResources(TableDesc table) {
-        List<String> dependentResources = Lists.newArrayList();
-        dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-        dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
-        return dependentResources;
-    }
-
-    @Override
-    public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
         checkSourceOffsets(srcPartition);
         final SourcePartition result = SourcePartition.getCopyOf(srcPartition);
         final CubeInstance cube = (CubeInstance) buildable;
@@ -185,4 +180,33 @@ public class KafkaSource implements ISource {
         }
     }
 
+    @Override
+    public ISourceMetadataExplorer getSourceMetadataExplorer() {
+        return new ISourceMetadataExplorer() {
+
+            @Override
+            public List<String> listDatabases() throws Exception {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public List<String> listTables(String database) throws Exception {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public List<String> getRelatedKylinResources(TableDesc table) {
+                List<String> dependentResources = Lists.newArrayList();
+                dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
+                dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                return dependentResources;
+            }
+        };
+    }
+
 }
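
Taken together, the Hive and Kafka implementations imply the following shape for the new interface (a sketch reconstructed from this patch, not the actual ISourceMetadataExplorer source file):

    package org.apache.kylin.source;

    import java.util.List;

    import org.apache.kylin.common.util.Pair;
    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.kylin.metadata.model.TableExtDesc;

    // Reconstructed sketch: HiveMetadataExplorer implements the browse/load
    // methods; KafkaSource only contributes getRelatedKylinResources() and
    // throws UnsupportedOperationException for the rest.
    public interface ISourceMetadataExplorer {
        List<String> listDatabases() throws Exception;
        List<String> listTables(String database) throws Exception;
        Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception;
        List<String> getRelatedKylinResources(TableDesc table);
    }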
