HIVE-18839: Implement incremental rebuild for materialized views (only insert 
operations in source tables) (Jesus Camacho Rodriguez, reviewed by Ashutosh 
Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c695c70b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c695c70b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c695c70b

Branch: refs/heads/branch-3
Commit: c695c70b10f4d8b5014e5d3de7b9b99fbed30e41
Parents: d1a9358
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Tue Mar 13 00:17:48 2018 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Tue Apr 10 11:30:05 2018 +0200

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   10 +
 .../test/resources/testconfiguration.properties |   30 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |    3 +-
 .../hive/ql/exec/MaterializedViewTask.java      |    4 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   99 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   11 +
 .../hive/ql/lockmgr/HiveTxnManagerImpl.java     |    9 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  221 +-
 .../metadata/HiveMaterializedViewsRegistry.java |    2 +
 .../HiveAggregateIncrementalRewritingRule.java  |  175 ++
 .../views/HiveAugmentMaterializationRule.java   |  123 +
 ...HiveNoAggregateIncrementalRewritingRule.java |   53 +
 .../MaterializedViewRewritingRelVisitor.java    |  164 +
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  315 +-
 ...MaterializedViewRebuildSemanticAnalyzer.java |   54 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   13 +-
 .../ql/udf/generic/GenericUDTFGetSplits.java    |    4 +-
 .../materialized_view_create_rewrite_4.q        |   86 +-
 .../materialized_view_create_rewrite_5.q        |  123 +
 .../materialized_view_create_rewrite_3.q.out    |   77 +-
 .../materialized_view_create_rewrite_4.q.out    | 1784 +++++++++++
 .../materialized_view_create_rewrite_5.q.out    | 1106 +++++++
 ...ized_view_create_rewrite_rebuild_dummy.q.out |   77 +-
 ...alized_view_create_rewrite_time_window.q.out |  859 ++++++
 .../llap/materialized_view_rewrite_1.q.out      | 1152 +++++++
 .../llap/materialized_view_rewrite_2.q.out      |  756 +++++
 .../llap/materialized_view_rewrite_3.q.out      |  545 ++++
 .../llap/materialized_view_rewrite_4.q.out      | 1071 +++++++
 .../llap/materialized_view_rewrite_5.q.out      | 1558 ++++++++++
 .../llap/materialized_view_rewrite_6.q.out      |  896 ++++++
 .../llap/materialized_view_rewrite_7.q.out      | 1022 +++++++
 .../llap/materialized_view_rewrite_8.q.out      |  641 ++++
 .../llap/materialized_view_rewrite_9.q.out      |  389 +++
 .../materialized_view_create_rewrite.q.out      |  477 ---
 .../materialized_view_create_rewrite_2.q.out    |  598 ----
 .../materialized_view_create_rewrite_3.q.out    | 1100 -------
 .../materialized_view_create_rewrite_4.q.out    |  886 ------
 ...erialized_view_create_rewrite_multi_db.q.out |  175 --
 ...alized_view_create_rewrite_time_window.q.out |  830 -----
 .../materialized_view_rewrite_1.q.out           | 1104 -------
 .../materialized_view_rewrite_2.q.out           |  796 -----
 .../materialized_view_rewrite_3.q.out           |  576 ----
 .../materialized_view_rewrite_4.q.out           |  944 ------
 .../materialized_view_rewrite_5.q.out           | 1522 ----------
 .../materialized_view_rewrite_6.q.out           |  830 -----
 .../materialized_view_rewrite_7.q.out           | 1036 -------
 .../materialized_view_rewrite_8.q.out           |  642 ----
 .../materialized_view_rewrite_9.q.out           |  361 ---
 .../materialized_view_rewrite_ssb.q.out         | 1975 ------------
 .../materialized_view_rewrite_ssb_2.q.out       | 1981 ------------
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 1248 ++++++--
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  280 ++
 .../ThriftHiveMetastore_server.skeleton.cpp     |   10 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |   39 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   17 +-
 .../hive/metastore/api/Materialization.java     |  157 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 2825 +++++++++++++++---
 .../gen-php/metastore/ThriftHiveMetastore.php   |  517 ++++
 .../src/gen/thrift/gen-php/metastore/Types.php  |   23 +
 .../hive_metastore/ThriftHiveMetastore-remote   |   14 +
 .../hive_metastore/ThriftHiveMetastore.py       |  439 +++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |   17 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |    5 +-
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |  116 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   12 +
 .../hive/metastore/HiveMetaStoreClient.java     |   10 +
 .../hadoop/hive/metastore/IMetaStoreClient.java |   21 +
 .../MaterializationInvalidationInfo.java        |   60 -
 .../MaterializationsInvalidationCache.java      |  171 +-
 .../MaterializationsRebuildLockCleanerTask.java |   61 +
 .../MaterializationsRebuildLockHandler.java     |  216 ++
 .../hive/metastore/conf/MetastoreConf.java      |    4 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   64 +-
 .../hadoop/hive/metastore/txn/TxnStore.java     |    3 +-
 .../src/main/thrift/hive_metastore.thrift       |    5 +-
 .../HiveMetaStoreClientPreCatalog.java          |   12 +
 ...stMetaStoreMaterializationsCacheCleaner.java |   59 +-
 77 files changed, 18889 insertions(+), 16781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 17b2485..e540d02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1594,6 +1594,16 @@ public class HiveConf extends Configuration {
         "moment in time t0, the materialized view will not be considered for rewriting anymore after t0 plus " +
         "the value assigned to this property. Default value 0 means that the materialized view cannot be " +
         "outdated to be used automatically in query rewriting."),
+    HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL("hive.materializedview.rewriting.incremental", true,
+        "Whether to try to execute incremental rewritings based on outdated materializations and\n" +
+        "current content of tables. Default value of true effectively amounts to enabling incremental\n" +
+        "rebuild for the materializations too."),
+    HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL("hive.materializedview.rebuild.incremental", true,
+        "Whether to try to execute incremental rebuild for the materialized views. Incremental rebuild\n" +
+        "tries to modify the original materialization contents to reflect the latest changes to the\n" +
+        "materialized view source tables, instead of rebuilding the contents fully. Incremental rebuild\n" +
+        "is based on the materialized view algebraic incremental rewriting. Hence, this requires\n" +
+        "hive.materializedview.rewriting.incremental to be true."),
     HIVE_MATERIALIZED_VIEW_FILE_FORMAT("hive.materializedview.fileformat", "ORC",
         new StringSet("none", "TextFile", "SequenceFile", "RCfile", "ORC"),
         "Default file format for CREATE MATERIALIZED VIEW statement"),

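For reference, a minimal sketch (not part of this patch) of how the two new flags above can be read through the HiveConf API. The ConfVars names come from the hunk; the surrounding class and main method are purely illustrative.

    // Illustrative only: reading the new materialized view flags from a HiveConf.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class IncrementalRebuildFlagsSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        boolean incrementalRewriting = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL);
        boolean incrementalRebuild = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL);
        // Per the property descriptions above, incremental rebuild requires
        // incremental rewriting to be enabled as well; both default to true.
        System.out.println("rewriting.incremental=" + incrementalRewriting
            + ", rebuild.incremental=" + incrementalRebuild);
      }
    }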
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 28c14eb..48d62a8 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -206,15 +206,6 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   load_dyn_part2.q,\
   load_dyn_part3.q,\
   lvj_mapjoin.q,\
-  materialized_view_create_rewrite.q,\
-  materialized_view_create_rewrite_3.q,\
-  materialized_view_describe.q,\
-  materialized_view_rewrite_ssb.q,\
-  materialized_view_create.q,\
-  materialized_view_create_rewrite_2.q,\
-  materialized_view_create_rewrite_multi_db.q,\
-  materialized_view_drop.q,\
-  materialized_view_rewrite_ssb_2.q,\
   mapjoin2.q,\
   mapjoin3.q,\
   mapjoin_decimal.q,\
@@ -594,8 +585,29 @@ minillaplocal.query.files=\
   load_data_acid_rename.q,\
   load_dyn_part5.q,\
   lvj_mapjoin.q,\
+  materialized_view_create.q,\
   materialized_view_create_rewrite_dummy.q,\
+  materialized_view_create_rewrite_multi_db.q,\
   materialized_view_create_rewrite_rebuild_dummy.q,\
+  materialized_view_create_rewrite_time_window.q,\
+  materialized_view_create_rewrite.q,\
+  materialized_view_create_rewrite_2.q,\
+  materialized_view_create_rewrite_3.q,\
+  materialized_view_create_rewrite_4.q,\
+  materialized_view_create_rewrite_5.q,\
+  materialized_view_describe.q,\
+  materialized_view_drop.q,\
+  materialized_view_rewrite_1.q,\
+  materialized_view_rewrite_2.q,\
+  materialized_view_rewrite_3.q,\
+  materialized_view_rewrite_4.q,\
+  materialized_view_rewrite_5.q,\
+  materialized_view_rewrite_6.q,\
+  materialized_view_rewrite_7.q,\
+  materialized_view_rewrite_8.q,\
+  materialized_view_rewrite_9.q,\
+  materialized_view_rewrite_ssb.q,\
+  materialized_view_rewrite_ssb_2.q,\
   mapjoin_decimal.q,\
   mapjoin_hint.q,\
   mapjoin_emit_interval.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index fb1efe0..bda2af3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
@@ -5057,7 +5058,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         CreationMetadata cm =
             new CreationMetadata(MetaStoreUtils.getDefaultCatalog(conf), tbl.getDbName(),
                 tbl.getTableName(), ImmutableSet.copyOf(crtView.getTablesUsed()));
-        cm.setValidTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+        cm.setValidTxnList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
         tbl.getTTable().setCreationMetadata(cm);
       }
       db.createTable(tbl, crtView.getIfNotExists());

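The same key change is applied in MaterializedViewTask below. As a hedged restatement of the hunk (the variables conf, tbl and crtView are assumed from the DDLTask context), the creation metadata of a materialized view now records the per-table write-id snapshot of the creating query rather than the global transaction list, which is what later allows the invalidation cache to reason about changes table by table:

    // Sketch only (identifiers taken from the hunk above, surrounding variables assumed):
    CreationMetadata cm = new CreationMetadata(
        MetaStoreUtils.getDefaultCatalog(conf),          // catalog of the materialized view
        tbl.getDbName(), tbl.getTableName(),             // fully qualified view name
        ImmutableSet.copyOf(crtView.getTablesUsed()));   // source tables of the view
    // Store the table write-id snapshot instead of the global ValidTxnList.
    cm.setValidTxnList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
    tbl.getTTable().setCreationMetadata(cm);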
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java
index 50fc4e0..834df84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -69,7 +71,7 @@ public class MaterializedViewTask extends Task<MaterializedViewDesc> implements
             new CreationMetadata(MetaStoreUtils.getDefaultCatalog(conf), mvTable.getDbName(),
                 mvTable.getTableName(),
                 ImmutableSet.copyOf(mvTable.getCreationMetadata().getTablesUsed()));
-        cm.setValidTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+        cm.setValidTxnList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
         db.updateCreationMetadata(mvTable.getDbName(), mvTable.getTableName(), cm);
       }
     } catch (HiveException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 5a95649..f566842 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
 * An implementation of HiveTxnManager that stores the transactions in the metastore database.
@@ -754,6 +755,13 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
 
     Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser);
+    heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
+    LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
+        " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
+    return heartbeater;
+  }
+
+  private ScheduledFuture<?> startHeartbeat(long initialDelay, long heartbeatInterval, Runnable heartbeater) {
     // For negative testing purpose..
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
       initialDelay = 0;
@@ -763,12 +771,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       resources, that they all don't start heartbeating at the same time*/
       initialDelay = (long)Math.floor(heartbeatInterval * 0.75 * Math.random());
     }
-
-    heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
+    ScheduledFuture<?> task = heartbeatExecutorService.scheduleAtFixedRate(
         heartbeater, initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS);
-    LOG.info("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval +
-        " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
-    return heartbeater;
+    return task;
   }
 
   private void stopHeartbeat() throws LockException {
@@ -886,6 +891,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
     return false;
   }
+
   @Override
   public boolean isImplicitTransactionOpen() {
     if(!isTxnOpen()) {
@@ -976,6 +982,38 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
+  @Override
+  public LockResponse acquireMaterializationRebuildLock(String dbName, String tableName, long txnId) throws LockException {
+    // Acquire lock
+    LockResponse lockResponse;
+    try {
+      lockResponse = getMS().lockMaterializationRebuild(dbName, tableName, txnId);
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+    if (lockResponse.getState() == LockState.ACQUIRED) {
+      // If lock response is ACQUIRED, we can create the heartbeater
+      long initialDelay = 0L;
+      long heartbeatInterval = getHeartbeatInterval(conf);
+      assert heartbeatInterval > 0;
+      MaterializationRebuildLockHeartbeater heartbeater = new MaterializationRebuildLockHeartbeater(
+          this, dbName, tableName, queryId, txnId);
+      ScheduledFuture<?> task = startHeartbeat(initialDelay, heartbeatInterval, heartbeater);
+      heartbeater.task.set(task);
+      LOG.debug("Started heartbeat for materialization rebuild lock for {} with delay/interval = {}/{} {} for query: {}",
+          AcidUtils.getFullTableName(dbName, tableName), initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS, queryId);
+    }
+    return lockResponse;
+  }
+
+  private boolean heartbeatMaterializationRebuildLock(String dbName, String tableName, long txnId) throws LockException {
+    try {
+      return getMS().heartbeatLockMaterializationRebuild(dbName, tableName, txnId);
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
   private static long getHeartbeatInterval(Configuration conf) throws LockException {
     // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),
     // then divide it by 2 to give us a safety factor.
@@ -1042,4 +1080,55 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
+
+  /**
+   * MaterializationRebuildLockHeartbeater is a runnable that will be run in a
+   * ScheduledExecutorService in given intervals. Once the heartbeat cannot
+   * refresh the lock anymore, it will interrupt itself.
+   */
+  private static class MaterializationRebuildLockHeartbeater implements Runnable {
+
+    private final DbTxnManager txnMgr;
+    private final String dbName;
+    private final String tableName;
+    private final String queryId;
+    private final long txnId;
+    private final AtomicReference<ScheduledFuture<?>> task;
+
+    MaterializationRebuildLockHeartbeater(DbTxnManager txnMgr, String dbName, String tableName,
+        String queryId, long txnId) {
+      this.txnMgr = txnMgr;
+      this.queryId = queryId;
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.txnId = txnId;
+      this.task = new AtomicReference<>();
+    }
+
+    /**
+     * Send a heartbeat to the metastore for locks and transactions.
+     */
+    @Override
+    public void run() {
+      LOG.trace("Heartbeating materialization rebuild lock for {} for query: {}",
+          AcidUtils.getFullTableName(dbName, tableName), queryId);
+      boolean refreshed;
+      try {
+        refreshed = txnMgr.heartbeatMaterializationRebuildLock(dbName, tableName, txnId);
+      } catch (LockException e) {
+        LOG.error("Failed trying to acquire lock", e);
+        throw new RuntimeException(e);
+      }
+      if (!refreshed) {
+        // We could not heartbeat the lock, i.e., the operation has finished,
+        // hence we interrupt this work
+        ScheduledFuture<?> t = task.get();
+        if (t != null) {
+          t.cancel(false);
+          LOG.debug("Stopped heartbeat for materialization rebuild lock for {} for query: {}",
+              AcidUtils.getFullTableName(dbName, tableName), queryId);
+        }
+      }
+    }
+  }
 }

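The heartbeater added above uses a small pattern worth noting: the scheduled task keeps a reference to its own ScheduledFuture (published through an AtomicReference right after scheduling) so that it can cancel itself once the lock can no longer be refreshed. A stripped-down, self-contained sketch of that pattern, with no Hive classes and purely illustrative names:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    public class SelfCancellingHeartbeatSketch implements Runnable {
      private final AtomicReference<ScheduledFuture<?>> task = new AtomicReference<>();
      private final AtomicInteger remainingBeats = new AtomicInteger(3); // stand-in for "lock still refreshable"

      @Override
      public void run() {
        boolean refreshed = remainingBeats.decrementAndGet() > 0; // stand-in for the heartbeat RPC
        if (!refreshed) {
          ScheduledFuture<?> t = task.get();
          if (t != null) {
            t.cancel(false); // stop rescheduling once the lock is gone, as the heartbeater does
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        SelfCancellingHeartbeatSketch hb = new SelfCancellingHeartbeatSketch();
        // Schedule first, then hand the future to the runnable, mirroring heartbeater.task.set(task).
        hb.task.set(pool.scheduleAtFixedRate(hb, 0, 100, TimeUnit.MILLISECONDS));
        Thread.sleep(500);
        pool.shutdown();
      }
    }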
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 490f3b8..e0e235b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -269,4 +270,14 @@ public interface HiveTxnManager {
   * Even a single statement, (e.g. Merge, multi-insert may generates several writes).
   */
  int getStmtIdAndIncrement();
+
+  /**
+   * Acquire the materialization rebuild lock for a given view. We need to specify the fully
+   * qualified name of the materialized view and the open transaction ID so we can identify
+   * uniquely the lock.
+   * @return the response from the metastore, where the lock id is equal to the txn id and
+   * the status can be either ACQUIRED or NOT ACQUIRED
+   */
+  LockResponse acquireMaterializationRebuildLock(String dbName, String tableName, long txnId)
+      throws LockException;
 }

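A hedged sketch of how a caller is expected to use the new interface method (only acquireMaterializationRebuildLock, LockResponse, LockState and LockException come from this patch; the helper class and method name are illustrative):

    import org.apache.hadoop.hive.metastore.api.LockResponse;
    import org.apache.hadoop.hive.metastore.api.LockState;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    public class RebuildLockUsageSketch {
      /** Returns true if the incremental rebuild of db.mv may proceed under txnId. */
      static boolean tryLockRebuild(HiveTxnManager txnMgr, String db, String mv, long txnId)
          throws LockException {
        LockResponse resp = txnMgr.acquireMaterializationRebuildLock(db, mv, txnId);
        // On ACQUIRED, DbTxnManager has already started a background heartbeat for the lock;
        // NOT ACQUIRED means another rebuild of the same view is in flight, or a
        // non-ACID transaction manager is in use (see HiveTxnManagerImpl below).
        return resp.getState() == LockState.ACQUIRED;
      }
    }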
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index c8cafa2..623b037 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -213,4 +215,11 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager, Configurable {
     return true;
   }
 
+  @Override
+  public LockResponse acquireMaterializationRebuildLock(String dbName, String tableName, long txnId)
+      throws LockException {
+    // This is default implementation. Locking only works for incremental maintenance
+    // which only works for DB transactional manager, thus we cannot acquire a lock.
+    return new LockResponse(0L, LockState.NOT_ACQUIRED);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 38fbb7b..2dd1d35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -59,9 +59,22 @@ import java.util.stream.Collectors;
 
 import javax.jdo.JDODataStoreException;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
@@ -76,6 +89,8 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
@@ -161,7 +176,9 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAugmentMaterializationRule;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -177,6 +194,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.TxnIdUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1316,14 +1334,11 @@ public class Hive {
    * @return the list of materialized views available for rewriting
    * @throws HiveException
    */
-  public List<RelOptMaterialization> getValidMaterializedViews(boolean 
materializedViewRebuild) throws HiveException {
-    final long defaultDiff =
-        HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
-            TimeUnit.MILLISECONDS);
-    final long currentTime = System.currentTimeMillis();
+  public List<RelOptMaterialization> getAllValidMaterializedViews(boolean 
forceMVContentsUpToDate, ValidTxnWriteIdList txnList)
+      throws HiveException {
+    // Final result
+    List<RelOptMaterialization> result = new ArrayList<>();
     try {
-      // Final result
-      List<RelOptMaterialization> result = new ArrayList<>();
       for (String dbName : getMSC().getAllDatabases()) {
         // From metastore (for security)
         List<String> materializedViewNames = 
getMaterializedViewsForRewriting(dbName);
@@ -1331,83 +1346,137 @@ public class Hive {
           // Bail out: empty list
           continue;
         }
-        List<Table> materializedViewTables = getTableObjects(dbName, 
materializedViewNames);
-        Map<String, Materialization> databaseInvalidationInfo =
-            getMSC().getMaterializationsInvalidationInfo(dbName, 
materializedViewNames);
-        for (Table materializedViewTable : materializedViewTables) {
+        result.addAll(getValidMaterializedViews(dbName, materializedViewNames, 
forceMVContentsUpToDate, txnList));
+      }
+      return result;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<RelOptMaterialization> getValidMaterializedView(String dbName, 
String materializedViewName,
+      boolean forceMVContentsUpToDate, ValidTxnWriteIdList txnList) throws 
HiveException {
+    return getValidMaterializedViews(dbName, 
ImmutableList.of(materializedViewName), forceMVContentsUpToDate, txnList);
+  }
+
+  private List<RelOptMaterialization> getValidMaterializedViews(String dbName, 
List<String> materializedViewNames,
+      boolean forceMVContentsUpToDate, ValidTxnWriteIdList txnList) throws 
HiveException {
+    final boolean tryIncrementalRewriting =
+        HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL);
+    final long defaultDiff =
+        HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
+    final long currentTime = System.currentTimeMillis();
+    try {
+      // Final result
+      List<RelOptMaterialization> result = new ArrayList<>();
+      List<Table> materializedViewTables = getTableObjects(dbName, 
materializedViewNames);
+      Map<String, Materialization> databaseInvalidationInfo =
+          getMSC().getMaterializationsInvalidationInfo(dbName, 
materializedViewNames);
+      for (Table materializedViewTable : materializedViewTables) {
+        // Check if materialization defined its own invalidation time window
+        String timeWindowString = 
materializedViewTable.getProperty(MATERIALIZED_VIEW_REWRITING_TIME_WINDOW);
+        long diff = 
org.apache.commons.lang.StringUtils.isEmpty(timeWindowString) ? defaultDiff :
+            HiveConf.toTime(timeWindowString,
+                
HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW),
+                TimeUnit.MILLISECONDS);
+        Materialization materializationInvInfo = null;
+        boolean outdated = false;
+        if (diff < 0L) {
+          // We only consider the materialized view to be outdated if 
forceOutdated = true, i.e.,
+          // if it is a rebuild. Otherwise, it passed the test and we use it 
as it is.
+          outdated = forceMVContentsUpToDate;
+        } else {
           // Check whether the materialized view is invalidated
-          Materialization materializationInvalidationInfo =
+          materializationInvInfo =
               
databaseInvalidationInfo.get(materializedViewTable.getTableName());
-          if (materializationInvalidationInfo == null) {
+          if (materializationInvInfo == null) {
             LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
                 " ignored for rewriting as there was no information loaded in 
the invalidation cache");
             continue;
           }
-          // Check if materialization defined its own invalidation time window
-          String timeWindowString = 
materializedViewTable.getProperty(MATERIALIZED_VIEW_REWRITING_TIME_WINDOW);
-          long diff = 
org.apache.commons.lang.StringUtils.isEmpty(timeWindowString) ? defaultDiff :
-              HiveConf.toTime(timeWindowString,
-                  
HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW),
-                  TimeUnit.MILLISECONDS);
-
-          long invalidationTime = 
materializationInvalidationInfo.getInvalidationTime();
+          long invalidationTime = materializationInvInfo.getInvalidationTime();
+          if (invalidationTime == Long.MIN_VALUE) {
+            LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
+                " ignored for rewriting as it contains non-transactional 
tables");
+            continue;
+          }
           // If the limit is not met, we do not add the materialized view.
           // If we are doing a rebuild, we do not consider outdated 
materialized views either.
-          if (diff == 0L || materializedViewRebuild) {
+          if (diff == 0L || forceMVContentsUpToDate) {
             if (invalidationTime != 0L) {
-              // If parameter is zero, materialized view cannot be outdated at 
all
-              LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
-                  " ignored for rewriting as its contents are outdated");
-              continue;
+              outdated = true;
             }
           } else {
-            if (invalidationTime != 0 && invalidationTime > currentTime - 
diff) {
-              LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
-                  " ignored for rewriting as its contents are outdated");
-              continue;
+            if (invalidationTime != 0L && invalidationTime > currentTime - 
diff) {
+              outdated = true;
             }
           }
+        }
 
-          // It passed the test, load
-          RelOptMaterialization materialization =
-              HiveMaterializedViewsRegistry.get().getRewritingMaterializedView(
-                  dbName, materializedViewTable.getTableName());
-          if (materialization != null) {
-            RelNode viewScan = materialization.tableRel;
-            RelOptHiveTable cachedMaterializedViewTable;
-            if (viewScan instanceof Project) {
-              // There is a Project on top (due to nullability)
-              cachedMaterializedViewTable = (RelOptHiveTable) 
viewScan.getInput(0).getTable();
-            } else {
-              cachedMaterializedViewTable = (RelOptHiveTable) 
viewScan.getTable();
-            }
-            if (cachedMaterializedViewTable.getHiveTableMD().getCreateTime() ==
-                materializedViewTable.getCreateTime()) {
-              // It is in the cache and up to date
-              result.add(materialization);
-              continue;
+        if (outdated && (!tryIncrementalRewriting || materializationInvInfo == 
null
+            || txnList == null || 
materializationInvInfo.isSourceTablesUpdateDeleteModified())) {
+          // We will not try partial rewriting either because the config 
specification, this
+          // is a rebuild over some non-transactional table, or there were 
update/delete
+          // operations in the source tables (not supported yet)
+          LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
+              " ignored for rewriting as its contents are outdated");
+          continue;
+        }
+
+        // It passed the test, load
+        RelOptMaterialization materialization =
+            HiveMaterializedViewsRegistry.get().getRewritingMaterializedView(
+                dbName, materializedViewTable.getTableName());
+        if (materialization != null) {
+          RelNode viewScan = materialization.tableRel;
+          RelOptHiveTable cachedMaterializedViewTable;
+          if (viewScan instanceof Project) {
+            // There is a Project on top (due to nullability)
+            cachedMaterializedViewTable = (RelOptHiveTable) 
viewScan.getInput(0).getTable();
+          } else {
+            cachedMaterializedViewTable = (RelOptHiveTable) 
viewScan.getTable();
+          }
+          if (cachedMaterializedViewTable.getHiveTableMD().getCreateTime() ==
+              materializedViewTable.getCreateTime()) {
+            // It is in the cache and up to date
+            if (outdated) {
+              // We will rewrite it to include the filters on transaction list
+              // so we can produce partial rewritings
+              materialization = augmentMaterializationWithTimeInformation(
+                  materialization, txnList, new ValidTxnWriteIdList(
+                      materializationInvInfo.getValidTxnList()));
             }
+            result.add(materialization);
+            continue;
           }
+        }
 
-          // It was not present in the cache (maybe because it was added by 
another HS2)
-          // or it is not up to date.
-          if (HiveMaterializedViewsRegistry.get().isInitialized()) {
-            // But the registry was fully initialized, thus we need to add it
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
-                  " was not in the cache");
-            }
-            materialization = 
HiveMaterializedViewsRegistry.get().createMaterializedView(
-                conf, materializedViewTable);
-            if (materialization != null) {
-              result.add(materialization);
-            }
-          } else {
-            // Otherwise the registry has not been initialized, skip for the 
time being
-            if (LOG.isWarnEnabled()) {
-              LOG.info("Materialized view " + 
materializedViewTable.getFullyQualifiedName() + " was skipped "
-                  + "because cache has not been loaded yet");
+        // It was not present in the cache (maybe because it was added by 
another HS2)
+        // or it is not up to date.
+        if (HiveMaterializedViewsRegistry.get().isInitialized()) {
+          // But the registry was fully initialized, thus we need to add it
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Materialized view " + 
materializedViewTable.getFullyQualifiedName() +
+                " was not in the cache");
+          }
+          materialization = 
HiveMaterializedViewsRegistry.get().createMaterializedView(
+              conf, materializedViewTable);
+          if (materialization != null) {
+            if (outdated) {
+              // We will rewrite it to include the filters on transaction list
+              // so we can produce partial rewritings
+              materialization = augmentMaterializationWithTimeInformation(
+                  materialization, txnList, new ValidTxnWriteIdList(
+                      materializationInvInfo.getValidTxnList()));
             }
+            result.add(materialization);
+          }
+        } else {
+          // Otherwise the registry has not been initialized, skip for the 
time being
+          if (LOG.isWarnEnabled()) {
+            LOG.info("Materialized view " + 
materializedViewTable.getFullyQualifiedName() + " was skipped "
+                + "because cache has not been loaded yet");
           }
         }
       }
@@ -1418,6 +1487,24 @@ public class Hive {
   }
 
   /**
+   * Method to enrich the materialization query contained in the input with
+   * its invalidation.
+   */
+  private static RelOptMaterialization 
augmentMaterializationWithTimeInformation(
+      RelOptMaterialization materialization, ValidTxnWriteIdList 
currentTxnList,
+      ValidTxnWriteIdList materializationTxnList) {
+    final RexBuilder rexBuilder = 
materialization.queryRel.getCluster().getRexBuilder();
+    final HepProgramBuilder augmentMaterializationProgram = new 
HepProgramBuilder()
+        .addRuleInstance(new HiveAugmentMaterializationRule(rexBuilder, 
currentTxnList, materializationTxnList));
+    final HepPlanner augmentMaterializationPlanner = new HepPlanner(
+        augmentMaterializationProgram.build());
+    augmentMaterializationPlanner.setRoot(materialization.queryRel);
+    final RelNode modifiedQueryRel = 
augmentMaterializationPlanner.findBestExp();
+    return new RelOptMaterialization(materialization.tableRel, 
modifiedQueryRel,
+        null, materialization.qualifiedTableName);
+  }
+
+  /**
    * Get materialized views for the specified database that have enabled 
rewriting.
    * @param dbName
    * @return List of materialized view table objects
@@ -2424,8 +2511,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          created
    * @param partPath the path where the partition data is located
    * @param inheritTableSpecs whether to copy over the table specs for 
if/of/serde
-   * @param newFiles An optional list of new files that were moved into this 
partition.  If
-   *                 non-null these will be included in the DML event sent to 
the metastore.
    * @return result partition object or null if there is no partition
    * @throws HiveException
    */

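A hedged sketch of the intended call pattern for the new lookup methods in Hive.java (method names and signatures are from the hunk above; conf, db, dbName and mvName are assumed from the caller's context):

    // Sketch only: during a materialized view rebuild, the planner can ask for the single
    // view being rebuilt with forceMVContentsUpToDate = true, so that an outdated
    // materialization is augmented with write-id filters instead of being discarded.
    ValidTxnWriteIdList currentTxnList =
        new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
    List<RelOptMaterialization> forRebuild =
        db.getValidMaterializedView(dbName, mvName, true, currentTxnList);
    // For general query rewriting, all databases are scanned instead:
    List<RelOptMaterialization> forRewriting =
        db.getAllValidMaterializedViews(false, currentTxnList);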
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 53dc8ec..6e585e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -193,6 +193,8 @@ public final class HiveMaterializedViewsRegistry {
   private RelOptMaterialization addMaterializedView(HiveConf conf, Table materializedViewTable, OpType opType) {
     // Bail out if it is not enabled for rewriting
     if (!materializedViewTable.isRewriteEnabled()) {
+      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
+          " ignored; it is not rewrite enabled");
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAggregateIncrementalRewritingRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAggregateIncrementalRewritingRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAggregateIncrementalRewritingRule.java
new file mode 100644
index 0000000..aabd75e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAggregateIncrementalRewritingRule.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.functions.HiveSqlMinMaxAggFunction;
+import org.apache.hadoop.hive.ql.optimizer.calcite.functions.HiveSqlSumAggFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This rule will perform a rewriting to prepare the plan for incremental
+ * view maintenance in case there exists an aggregation operator, so we can
+ * avoid the INSERT OVERWRITE and use a MERGE statement instead.
+ *
+ * In particular, the INSERT OVERWRITE maintenance will look like this
+ * (in SQL):
+ * INSERT OVERWRITE mv
+ * SELECT a, b, SUM(s) as s, SUM(c) AS c
+ * FROM (
+ *   SELECT * from mv --OLD DATA
+ *   UNION ALL
+ *   SELECT a, b, SUM(x) AS s, COUNT(*) AS c --NEW DATA
+ *   FROM TAB_A
+ *   JOIN TAB_B ON (TAB_A.a = TAB_B.z)
+ *   WHERE TAB_A.ROW_ID > 5
+ *   GROUP BY a, b) inner_subq
+ * GROUP BY a, b;
+ *
+ * We need to transform that into:
+ * MERGE INTO mv
+ * USING (
+ *   SELECT a, b, SUM(x) AS s, COUNT(*) AS c --NEW DATA
+ *   FROM TAB_A
+ *   JOIN TAB_B ON (TAB_A.a = TAB_B.z)
+ *   WHERE TAB_A.ROW_ID > 5
+ *   GROUP BY a, b) source
+ * ON (mv.a = source.a AND mv.b = source.b)
+ * WHEN MATCHED AND mv.c + source.c <> 0
+ *   THEN UPDATE SET mv.s = mv.s + source.s, mv.c = mv.c + source.c
+ * WHEN NOT MATCHED
+ *   THEN INSERT VALUES (source.a, source.b, s, c);
+ *
+ * To be precise, we need to convert it into a MERGE rewritten as:
+ * FROM mv right outer join _source_ source
+ * ON (mv.a = source.a AND mv.b = source.b)
+ * INSERT INTO TABLE mv
+ *   SELECT source.a, source.b, s, c
+ *   WHERE mv.a IS NULL AND mv2.b IS NULL
+ * INSERT INTO TABLE mv
+ *   SELECT mv.ROW__ID, source.a, source.b, mv.s + source.s, mv.c + source.c
+ *   WHERE source.a=mv.a AND source.b=mv.b
+ *   SORT BY mv.ROW__ID;
+ */
+public class HiveAggregateIncrementalRewritingRule extends RelOptRule {
+
+  public static final HiveAggregateIncrementalRewritingRule INSTANCE =
+      new HiveAggregateIncrementalRewritingRule();
+
+  private HiveAggregateIncrementalRewritingRule() {
+    super(operand(Aggregate.class, operand(Union.class, any())),
+        HiveRelFactories.HIVE_BUILDER,
+        "HiveAggregateIncrementalRewritingRule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate agg = call.rel(0);
+    final Union union = call.rel(1);
+    final RexBuilder rexBuilder =
+        agg.getCluster().getRexBuilder();
+    // 1) First branch is query, second branch is MV
+    final RelNode joinLeftInput = union.getInput(1);
+    final RelNode joinRightInput = union.getInput(0);
+    // 2) Build conditions for join and filter and start adding
+    // expressions for project operator
+    List<RexNode> projExprs = new ArrayList<>();
+    List<RexNode> joinConjs = new ArrayList<>();
+    List<RexNode> filterConjs = new ArrayList<>();
+    int groupCount = agg.getGroupCount();
+    int totalCount = agg.getGroupCount() + agg.getAggCallList().size();
+    for (int leftPos = 0, rightPos = totalCount;
+         leftPos < groupCount; leftPos++, rightPos++) {
+      RexNode leftRef = rexBuilder.makeInputRef(
+          joinLeftInput.getRowType().getFieldList().get(leftPos).getType(), leftPos);
+      RexNode rightRef = rexBuilder.makeInputRef(
+          joinRightInput.getRowType().getFieldList().get(leftPos).getType(), rightPos);
+      projExprs.add(rightRef);
+      joinConjs.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+          ImmutableList.of(leftRef, rightRef)));
+      filterConjs.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL,
+          ImmutableList.of(leftRef)));
+    }
+    // 3) Add the expressions that correspond to the aggregation
+    // functions
+    RexNode caseFilterCond = rexBuilder.makeCall(SqlStdOperatorTable.AND, filterConjs);
+    for (int i = 0, leftPos = groupCount, rightPos = totalCount + groupCount;
+         leftPos < totalCount; i++, leftPos++, rightPos++) {
+      // case when mv2.deptno IS NULL AND mv2.deptname IS NULL then s else source.s + mv2.s end
+      RexNode leftRef = rexBuilder.makeInputRef(
+          joinLeftInput.getRowType().getFieldList().get(leftPos).getType(), leftPos);
+      RexNode rightRef = rexBuilder.makeInputRef(
+          joinRightInput.getRowType().getFieldList().get(leftPos).getType(), rightPos);
+      // Generate SQLOperator for merging the aggregations
+      RexNode elseReturn;
+      SqlAggFunction aggCall = agg.getAggCallList().get(i).getAggregation();
+      switch (aggCall.getKind()) {
+      case SUM:
+        // SUM and COUNT are rolled up as SUM, hence SUM represents both here
+        elseReturn = rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
+            ImmutableList.of(rightRef, leftRef));
+        break;
+      case MIN: {
+        RexNode condInnerCase = rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN,
+            ImmutableList.of(rightRef, leftRef));
+        elseReturn = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+            ImmutableList.of(condInnerCase, rightRef, leftRef));
+        }
+        break;
+      case MAX: {
+        RexNode condInnerCase = rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN,
+            ImmutableList.of(rightRef, leftRef));
+        elseReturn = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+            ImmutableList.of(condInnerCase, rightRef, leftRef));
+        }
+        break;
+      default:
+        throw new AssertionError("Found an aggregation that could not be"
+            + " recognized: " + aggCall);
+      }
+      projExprs.add(rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+          ImmutableList.of(caseFilterCond, rightRef, elseReturn)));
+    }
+    RexNode joinCond = rexBuilder.makeCall(SqlStdOperatorTable.AND, joinConjs);
+    RexNode filterCond = rexBuilder.makeCall(SqlStdOperatorTable.AND, filterConjs);
+    // 4) Build plan
+    RelNode newNode = call.builder()
+        .push(union.getInput(1))
+        .push(union.getInput(0))
+        .join(JoinRelType.RIGHT, joinCond)
+        .filter(rexBuilder.makeCall(SqlStdOperatorTable.OR, ImmutableList.of(joinCond, filterCond)))
+        .project(projExprs)
+        .build();
+    call.transformTo(newNode);
+  }
+
+}

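Like HiveAugmentMaterializationRule, which Hive.java above applies through a HepPlanner, this rule is a Calcite planner rule. A hedged sketch of how it could be applied to a rebuild plan (the actual wiring lives in CalcitePlanner, which is part of this patch but not shown in this excerpt):

    import org.apache.calcite.plan.hep.HepPlanner;
    import org.apache.calcite.plan.hep.HepProgramBuilder;
    import org.apache.calcite.rel.RelNode;
    import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateIncrementalRewritingRule;

    public class IncrementalRewritingSketch {
      /** Applies the aggregate incremental rewriting rule to a rebuild plan (Aggregate over Union). */
      static RelNode rewriteForIncrementalRebuild(RelNode rebuildPlan) {
        HepProgramBuilder program = new HepProgramBuilder()
            .addRuleInstance(HiveAggregateIncrementalRewritingRule.INSTANCE);
        HepPlanner planner = new HepPlanner(program.build());
        planner.setRoot(rebuildPlan);
        // If the plan does not match (no Aggregate over Union), the input is returned unchanged.
        return planner.findBestExp();
      }
    }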
http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentMaterializationRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentMaterializationRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentMaterializationRule.java
new file mode 100644
index 0000000..420e6c4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentMaterializationRule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hive.common.util.TxnIdUtils;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This rule will rewrite the materialized view with information about
+ * its invalidation data. In particular, if any of the tables used by the
+ * materialization has been updated since the materialization was created,
+ * it will introduce a filter operator on top of that table in the materialization
+ * definition, making explicit the data contained in it so the rewriting
+ * algorithm can use this information to rewrite the query as a combination of the
+ * outdated materialization data and the new original data in the source tables.
+ * If the data in the source table matches the current data in the snapshot,
+ * no filter is created.
+ */
+public class HiveAugmentMaterializationRule extends RelOptRule {
+
+  private final RexBuilder rexBuilder;
+  private final ValidTxnWriteIdList currentTxnList;
+  private final ValidTxnWriteIdList materializationTxnList;
+  private final Set<RelNode> visited;
+
+  public HiveAugmentMaterializationRule(RexBuilder rexBuilder,
+      ValidTxnWriteIdList currentTxnList, ValidTxnWriteIdList materializationTxnList) {
+    super(operand(TableScan.class, any()),
+        HiveRelFactories.HIVE_BUILDER, "HiveAugmentMaterializationRule");
+    this.rexBuilder = rexBuilder;
+    this.currentTxnList = currentTxnList;
+    this.materializationTxnList = materializationTxnList;
+    this.visited = new HashSet<>();
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final TableScan tableScan = call.rel(0);
+    if (!visited.add(tableScan)) {
+      // Already visited
+      return;
+    }
+    final String tableQName =
+        ((RelOptHiveTable)tableScan.getTable()).getHiveTableMD().getFullyQualifiedName();
+    final ValidWriteIdList tableCurrentTxnList =
+        currentTxnList.getTableValidWriteIdList(tableQName);
+    final ValidWriteIdList tableMaterializationTxnList =
+        materializationTxnList.getTableValidWriteIdList(tableQName);
+    if (TxnIdUtils.checkEquivalentWriteIds(tableCurrentTxnList, tableMaterializationTxnList)) {
+      // This table has not been modified since materialization was created,
+      // nothing to do
+      return;
+    }
+    // ROW__ID: struct<transactionid:bigint,bucketid:int,rowid:bigint>
+    int rowIDPos = tableScan.getTable().getRowType().getField(
+        VirtualColumn.ROWID.getName(), false, false).getIndex();
+    RexNode rowIDFieldAccess = rexBuilder.makeFieldAccess(
+        rexBuilder.makeInputRef(tableScan.getTable().getRowType().getFieldList().get(rowIDPos).getType(), rowIDPos),
+        0);
+    // Now we create the filter with the transactions information.
+    // In particular, each table in the materialization will only have contents such that:
+    // ROW_ID.writeid <= high_watermark and ROW_ID.writeid not in (open/invalid_ids)
+    // Hence, we add that condition on top of the source table.
+    // The rewriting will then have the possibility to create partial rewritings that read
+    // the materialization and the source tables, and hence, produce an incremental
+    // rebuild that is more efficient than the full rebuild.
+    final RelBuilder relBuilder = call.builder();
+    relBuilder.push(tableScan);
+    List<RexNode> conds = new ArrayList<>();
+    RelDataType bigIntType = relBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+    final RexNode literalHighWatermark = rexBuilder.makeLiteral(
+        tableMaterializationTxnList.getHighWatermark(), bigIntType, false);
+    conds.add(
+        rexBuilder.makeCall(
+            SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+            ImmutableList.of(rowIDFieldAccess, literalHighWatermark)));
+    for (long invalidTxn : tableMaterializationTxnList.getInvalidWriteIds()) {
+      final RexNode literalInvalidTxn = rexBuilder.makeLiteral(
+          invalidTxn, bigIntType, false);
+      conds.add(
+          rexBuilder.makeCall(
+              SqlStdOperatorTable.NOT_EQUALS,
+              ImmutableList.of(rowIDFieldAccess, literalInvalidTxn)));
+    }
+    relBuilder.filter(conds);
+    call.transformTo(relBuilder.build());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveNoAggregateIncrementalRewritingRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveNoAggregateIncrementalRewritingRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveNoAggregateIncrementalRewritingRule.java
new file mode 100644
index 0000000..ea285d0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveNoAggregateIncrementalRewritingRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+
+/**
+ * This rule will perform a rewriting to prepare the plan for incremental
+ * view maintenance in case there is no aggregation operator, so we can
+ * avoid the INSERT OVERWRITE and use an INSERT statement instead.
+ * In particular, it removes the union branch that reads the old data from
+ * the materialization, and keeps the branch that will read the new data.
+ */
+public class HiveNoAggregateIncrementalRewritingRule extends RelOptRule {
+
+  public static final HiveNoAggregateIncrementalRewritingRule INSTANCE =
+      new HiveNoAggregateIncrementalRewritingRule();
+
+  private HiveNoAggregateIncrementalRewritingRule() {
+    super(operand(Union.class, any()),
+        HiveRelFactories.HIVE_BUILDER, "HiveNoAggregateIncrementalRewritingRule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Union union = call.rel(0);
+    // First branch is query, second branch is MV
+    RelNode newNode = call.builder()
+        .push(union.getInput(0))
+        .convert(union.getRowType(), false)
+        .build();
+    call.transformTo(newNode);
+  }
+
+}
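As a usage note: in this patch the rule is fired through CalcitePlanner.hepPlan (see the CalcitePlanner hunk further below), but a plain Calcite HepPlanner drives it the same way. A hedged, illustrative driver (class and method names invented here, not part of the patch):

import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveNoAggregateIncrementalRewritingRule;

final class NoAggregateIncrementalRewriteSketch {

  private NoAggregateIncrementalRewriteSketch() {
  }

  /** Keeps only the union branch that reads the new rows from the source tables. */
  static RelNode dropMaterializationBranch(RelNode rebuildPlan) {
    HepProgramBuilder program = new HepProgramBuilder();
    program.addMatchOrder(HepMatchOrder.TOP_DOWN);
    program.addRuleInstance(HiveNoAggregateIncrementalRewritingRule.INSTANCE);
    HepPlanner planner = new HepPlanner(program.build());
    planner.setRoot(rebuildPlan);
    return planner.findBestExp();
  }
}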

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java
new file mode 100644
index 0000000..884efd5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/MaterializedViewRewritingRelVisitor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.util.ControlFlowException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is a helper to check whether a materialized view rebuild
+ * can be transformed from INSERT OVERWRITE to INSERT INTO.
+ *
+ * We are verifying that:
+ *   1) the rewriting is rooted by legal operators (Filter and Project)
+ *   before reaching a Union operator,
+ *   2) the first branch below the Union reads the source tables using only
+ *   legal operators (i.e., Filter, Project, Join, and TableScan), and
+ *   3) the second branch below the Union only reads the materialized view
+ *   that we are trying to rebuild (possibly through Project operators).
+ */
+public class MaterializedViewRewritingRelVisitor extends RelVisitor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MaterializedViewRewritingRelVisitor.class);
+
+
+  private boolean containsAggregate;
+  private boolean rewritingAllowed;
+
+  public MaterializedViewRewritingRelVisitor() {
+    this.containsAggregate = false;
+    this.rewritingAllowed = false;
+  }
+
+  @Override
+  public void visit(RelNode node, int ordinal, RelNode parent) {
+    if (node instanceof Aggregate) {
+      this.containsAggregate = true;
+      // Aggregate mode - it should be followed by union
+      // that we need to analyze
+      RelNode input = node.getInput(0);
+      if (input instanceof Union) {
+        check((Union) input);
+      }
+    } else if (node instanceof Union) {
+      // Non aggregate mode - analyze union operator
+      check((Union) node);
+    } else if (node instanceof Project) {
+      // Project operator, we can continue
+      super.visit(node, ordinal, parent);
+    }
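+    // Note that check(...) always ends the walk by throwing ReturnedValue;
+    // falling through to the throw below means the plan shape (e.g., an
+    // Aggregate not followed by a Union) does not allow the incremental rewriting.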
+    throw new ReturnedValue(false);
+  }
+
+  private void check(Union union) {
+    // We found the Union
+    if (union.getInputs().size() != 2) {
+      // Bail out
+      throw new ReturnedValue(false);
+    }
+    // First branch should have the query (with write ID filter conditions)
+    new RelVisitor() {
+      @Override
+      public void visit(RelNode node, int ordinal, RelNode parent) {
+        if (node instanceof TableScan ||
+            node instanceof Filter ||
+            node instanceof Project ||
+            node instanceof Join) {
+          // We can continue
+          super.visit(node, ordinal, parent);
+        } else if (node instanceof Aggregate && containsAggregate) {
+          // We can continue
+          super.visit(node, ordinal, parent);
+        } else {
+          throw new ReturnedValue(false);
+        }
+      }
+    }.go(union.getInput(0));
+    // Second branch should only have the MV
+    new RelVisitor() {
+      @Override
+      public void visit(RelNode node, int ordinal, RelNode parent) {
+        if (node instanceof TableScan) {
+          // We can continue
+          // TODO: Need to check that this is the same MV that we are rebuilding
+          RelOptHiveTable hiveTable = (RelOptHiveTable) node.getTable();
+          if (!hiveTable.getHiveTableMD().isMaterializedView()) {
+            // If it is not a materialized view, we do not rewrite it
+            throw new ReturnedValue(false);
+          }
+          if (containsAggregate && !AcidUtils.isFullAcidTable(hiveTable.getHiveTableMD())) {
+            // If it contains an aggregate and it is not a full acid table,
+            // we do not rewrite it (we need MERGE support)
+            throw new ReturnedValue(false);
+          }
+        } else if (node instanceof Project) {
+          // We can continue
+          super.visit(node, ordinal, parent);
+        } else {
+          throw new ReturnedValue(false);
+        }
+      }
+    }.go(union.getInput(1));
+    // We pass all the checks, we can rewrite
+    throw new ReturnedValue(true);
+  }
+
+  /**
+   * Starts an iteration.
+   */
+  public RelNode go(RelNode p) {
+    try {
+      visit(p, 0, null);
+    } catch (ReturnedValue e) {
+      // The control-flow exception carries whether the rewriting can be performed
+      rewritingAllowed = e.value;
+    }
+    return p;
+  }
+
+  public boolean isContainsAggregate() {
+    return containsAggregate;
+  }
+
+  public boolean isRewritingAllowed() {
+    return rewritingAllowed;
+  }
+
+
+  /**
+   * Exception used to interrupt a visitor walk.
+   */
+  private static class ReturnedValue extends ControlFlowException {
+    private final boolean value;
+
+    public ReturnedValue(boolean value) {
+      this.value = value;
+    }
+  }
+
+}
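A condensed usage sketch (assuming the same package as the visitor; the helper class below is illustrative only) of how the CalcitePlanner change further below drives this visitor to pick between the two incremental rewriting rules:

import org.apache.calcite.rel.RelNode;

final class RebuildModeProbeSketch {

  private RebuildModeProbeSketch() {
  }

  /** Walks the rewritten rebuild plan and reports which rebuild flavor it allows. */
  static void probe(RelNode rewrittenPlan) {
    MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
    visitor.go(rewrittenPlan);
    if (!visitor.isRewritingAllowed()) {
      System.out.println("keep INSERT OVERWRITE (full rebuild)");
    } else if (visitor.isContainsAggregate()) {
      System.out.println("aggregate case: rewrite into a MERGE-equivalent multi-insert");
    } else {
      System.out.println("no-aggregate case: rewrite into a plain INSERT INTO");
    }
  }
}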

http://git-wip-us.apache.org/repos/asf/hive/blob/c695c70b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 41de17f..7a7bdea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -72,6 +72,7 @@ import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Filter;
@@ -122,11 +123,15 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -221,7 +226,10 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSubQueryRemoveRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveUnionMergeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveUnionPullUpConstantsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAggregateIncrementalRewritingRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveNoAggregateIncrementalRewritingRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.MaterializedViewRewritingRelVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTBuilder;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
@@ -407,7 +415,6 @@ public class CalcitePlanner extends SemanticAnalyzer {
         disableJoinMerge = true;
         boolean reAnalyzeAST = false;
         final boolean materializedView = getQB().isMaterializedView();
-        final boolean rebuild = materializedView && createVwDesc.isReplace();
 
         try {
           if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
@@ -427,9 +434,13 @@ public class CalcitePlanner extends SemanticAnalyzer {
             ASTNode newAST = getOptimizedAST();
 
             // 1.1. Fix up the query for insert/ctas/materialized views
-            if (!rebuild) {
-              // If it is not a MATERIALIZED VIEW...REBUILD
-              newAST = fixUpAfterCbo(ast, newAST, cboCtx);
+            newAST = fixUpAfterCbo(ast, newAST, cboCtx);
+
+            // 1.2. Fix up the query for materialization rebuild
+            if (mvRebuildMode == MaterializationRebuildMode.AGGREGATE_REBUILD) {
+              fixUpASTAggregateIncrementalRebuild(newAST);
+            } else if (mvRebuildMode == MaterializationRebuildMode.NO_AGGREGATE_REBUILD) {
+              fixUpASTNoAggregateIncrementalRebuild(newAST);
             }
 
             // 2. Regen OP plan from optimized AST
@@ -440,29 +451,22 @@ public class CalcitePlanner extends SemanticAnalyzer {
                 throw new CalciteViewSemanticException(e.getMessage());
               }
             } else if (cboCtx.type == PreCboCtx.Type.VIEW && materializedView) {
-              if (rebuild) {
-                // Use the CREATE MATERIALIZED VIEW...REBUILD
-                init(false);
-                setAST(ast);
-                reAnalyzeViewAfterCbo(ast);
-              } else {
-                // Store text of the ORIGINAL QUERY
-                String originalText = ctx.getTokenRewriteStream().toString(
-                    cboCtx.nodeOfInterest.getTokenStartIndex(),
-                    cboCtx.nodeOfInterest.getTokenStopIndex());
-                unparseTranslator.applyTranslations(ctx.getTokenRewriteStream());
-                String expandedText = ctx.getTokenRewriteStream().toString(
-                    cboCtx.nodeOfInterest.getTokenStartIndex(),
-                    cboCtx.nodeOfInterest.getTokenStopIndex());
-                // Redo create-table/view analysis, because it's not part of
-                // doPhase1.
-                // Use the REWRITTEN AST
-                init(false);
-                setAST(newAST);
-                newAST = reAnalyzeViewAfterCbo(newAST);
-                createVwDesc.setViewOriginalText(originalText);
-                createVwDesc.setViewExpandedText(expandedText);
-              }
+              // Store text of the ORIGINAL QUERY
+              String originalText = ctx.getTokenRewriteStream().toString(
+                  cboCtx.nodeOfInterest.getTokenStartIndex(),
+                  cboCtx.nodeOfInterest.getTokenStopIndex());
+              unparseTranslator.applyTranslations(ctx.getTokenRewriteStream());
+              String expandedText = ctx.getTokenRewriteStream().toString(
+                  cboCtx.nodeOfInterest.getTokenStartIndex(),
+                  cboCtx.nodeOfInterest.getTokenStopIndex());
+              // Redo create-table/view analysis, because it's not part of
+              // doPhase1.
+              // Use the REWRITTEN AST
+              init(false);
+              setAST(newAST);
+              newAST = reAnalyzeViewAfterCbo(newAST);
+              createVwDesc.setViewOriginalText(originalText);
+              createVwDesc.setViewExpandedText(expandedText);
               viewSelect = newAST;
               viewsExpanded = new ArrayList<>();
               viewsExpanded.add(createVwDesc.getViewName());
@@ -963,6 +967,209 @@ public class CalcitePlanner extends SemanticAnalyzer {
     return table;
   }
 
+  private void fixUpASTAggregateIncrementalRebuild(ASTNode newAST) throws SemanticException {
+    // Replace INSERT OVERWRITE by MERGE equivalent rewriting.
+    // Here we need to do this complex AST rewriting that generates the same plan
+    // that a MERGE clause would generate because CBO does not support MERGE yet.
+    // TODO: Support MERGE as first class member in CBO to simplify this logic.
+    // 1) Replace INSERT OVERWRITE by INSERT
+    ASTNode updateNode = new ASTSearcher().simpleBreadthFirstSearch(
+        newAST, HiveParser.TOK_QUERY, HiveParser.TOK_INSERT);
+    ASTNode destinationNode = (ASTNode) updateNode.getChild(0);
+    ASTNode newInsertInto = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
+    newInsertInto.addChildren(destinationNode.getChildren());
+    ASTNode destinationParentNode = (ASTNode) destinationNode.getParent();
+    int childIndex = destinationNode.childIndex;
+    destinationParentNode.deleteChild(childIndex);
+    destinationParentNode.insertChild(childIndex, newInsertInto);
+    // 1.1) Extract name as we will need it afterwards:
+    // TOK_DESTINATION TOK_TAB TOK_TABNAME <materialization_name>
+    ASTNode materializationNode = new ASTSearcher().simpleBreadthFirstSearch(
+        newInsertInto, HiveParser.TOK_INSERT_INTO, HiveParser.TOK_TAB, HiveParser.TOK_TABNAME);
+    // 2) Copy INSERT branch and duplicate it, the first branch will be the UPDATE
+    // for the MERGE statement while the new branch will be the INSERT for the
+    // MERGE statement
+    ASTNode updateParent = (ASTNode) updateNode.getParent();
+    ASTNode insertNode = (ASTNode) ParseDriver.adaptor.dupTree(updateNode);
+    insertNode.setParent(updateParent);
+    updateParent.addChild(insertNode);
+    // 3) Create ROW__ID column in select clause from left input for the RIGHT OUTER JOIN.
+    // This is needed for the UPDATE clause. Hence, we find the following node:
+    // TOK_QUERY
+    //   TOK_FROM
+    //      TOK_RIGHTOUTERJOIN
+    //         TOK_SUBQUERY
+    //            TOK_QUERY
+    //               ...
+    //               TOK_INSERT
+    //                  ...
+    //                  TOK_SELECT
+    // And then we create the following child node:
+    // TOK_SELEXPR
+    //    .
+    //       TOK_TABLE_OR_COL
+    //          cmv_mat_view
+    //       ROW__ID
+    ASTNode subqueryNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
+        newAST, HiveParser.TOK_QUERY, HiveParser.TOK_FROM, HiveParser.TOK_RIGHTOUTERJOIN,
+        HiveParser.TOK_SUBQUERY);
+    ASTNode selectNodeInputROJ = new ASTSearcher().simpleBreadthFirstSearch(
+        subqueryNodeInputROJ, HiveParser.TOK_SUBQUERY, HiveParser.TOK_QUERY,
+        HiveParser.TOK_INSERT, HiveParser.TOK_SELECT);
+    ASTNode selectExprNodeInputROJ = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_SELEXPR, "TOK_SELEXPR");
+    ASTNode dotNodeInputROJ = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.DOT, ".");
+    ASTNode columnTokNodeInputROJ = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL");
+    ASTNode tableNameNodeInputROJ = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.Identifier, Warehouse.getQualifiedName(
+            materializationNode.getChild(0).getText(),
+            materializationNode.getChild(1).getText()));
+    ASTNode rowIdNodeInputROJ = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.Identifier, VirtualColumn.ROWID.getName());
+    ParseDriver.adaptor.addChild(selectNodeInputROJ, selectExprNodeInputROJ);
+    ParseDriver.adaptor.addChild(selectExprNodeInputROJ, dotNodeInputROJ);
+    ParseDriver.adaptor.addChild(dotNodeInputROJ, columnTokNodeInputROJ);
+    ParseDriver.adaptor.addChild(dotNodeInputROJ, rowIdNodeInputROJ);
+    ParseDriver.adaptor.addChild(columnTokNodeInputROJ, tableNameNodeInputROJ);
+    // 4) Transform first INSERT branch into an UPDATE
+    // 4.1) Adding ROW__ID field
+    ASTNode selectNodeInUpdate = (ASTNode) updateNode.getChild(1);
+    if (selectNodeInUpdate.getType() != HiveParser.TOK_SELECT) {
+      throw new SemanticException("TOK_SELECT expected in incremental 
rewriting");
+    }
+    ASTNode selectExprNodeInUpdate = (ASTNode) ParseDriver.adaptor.dupNode(selectExprNodeInputROJ);
+    ASTNode dotNodeInUpdate = (ASTNode) ParseDriver.adaptor.dupNode(dotNodeInputROJ);
+    ASTNode columnTokNodeInUpdate = (ASTNode) ParseDriver.adaptor.dupNode(columnTokNodeInputROJ);
+    ASTNode tableNameNodeInUpdate = (ASTNode) ParseDriver.adaptor.dupNode(subqueryNodeInputROJ.getChild(1));
+    ASTNode rowIdNodeInUpdate = (ASTNode) ParseDriver.adaptor.dupNode(rowIdNodeInputROJ);
+    ParseDriver.adaptor.addChild(selectExprNodeInUpdate, dotNodeInUpdate);
+    ParseDriver.adaptor.addChild(dotNodeInUpdate, columnTokNodeInUpdate);
+    ParseDriver.adaptor.addChild(dotNodeInUpdate, rowIdNodeInUpdate);
+    ParseDriver.adaptor.addChild(columnTokNodeInUpdate, tableNameNodeInUpdate);
+    selectNodeInUpdate.insertChild(0, ParseDriver.adaptor.dupTree(selectExprNodeInUpdate));
+    // 4.2) Modifying filter condition. The incremental rewriting rule generated an OR
+    // clause where the first disjunct contains the condition for the UPDATE branch.
+    // TOK_WHERE
+    //   or
+    //      and <- DISJUNCT FOR <UPDATE>
+    //         =
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_0
+    //               a
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_1
+    //               a
+    //         =
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_0
+    //               c
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_1
+    //               c
+    //      and <- DISJUNCT FOR <INSERT>
+    //         TOK_FUNCTION
+    //            isnull
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_0
+    //               a
+    //         TOK_FUNCTION
+    //            isnull
+    //            .
+    //               TOK_TABLE_OR_COL
+    //                  $hdt$_0
+    //               c
+    ASTNode whereClauseInUpdate = null;
+    for (int i = 0; i < updateNode.getChildren().size(); i++) {
+      if (updateNode.getChild(i).getType() == HiveParser.TOK_WHERE) {
+        whereClauseInUpdate = (ASTNode) updateNode.getChild(i);
+        break;
+      }
+    }
+    if (whereClauseInUpdate == null) {
+      throw new SemanticException("TOK_WHERE expected in incremental 
rewriting");
+    }
+    if (whereClauseInUpdate.getChild(0).getType() != HiveParser.KW_OR) {
+      throw new SemanticException("OR clause expected below TOK_WHERE in 
incremental rewriting");
+    }
+    // We bypass the OR clause and select the first disjunct
+    ASTNode newCondInUpdate = (ASTNode) whereClauseInUpdate.getChild(0).getChild(0);
+    ParseDriver.adaptor.setChild(whereClauseInUpdate, 0, newCondInUpdate);
+    // 4.3) Finally, we add SORT clause, this is needed for the UPDATE.
+    //       TOK_SORTBY
+    //         TOK_TABSORTCOLNAMEASC
+    //            TOK_NULLS_FIRST
+    //               .
+    //                  TOK_TABLE_OR_COL
+    //                     cmv_basetable_2
+    //                  ROW__ID
+    ASTNode sortExprNode = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_SORTBY, "TOK_SORTBY");
+    ASTNode orderExprNode = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_TABSORTCOLNAMEASC, "TOK_TABSORTCOLNAMEASC");
+    ASTNode nullsOrderExprNode = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_NULLS_FIRST, "TOK_NULLS_FIRST");
+    ASTNode dotNodeInSort = (ASTNode) ParseDriver.adaptor.dupTree(dotNodeInUpdate);
+    ParseDriver.adaptor.addChild(updateNode, sortExprNode);
+    ParseDriver.adaptor.addChild(sortExprNode, orderExprNode);
+    ParseDriver.adaptor.addChild(orderExprNode, nullsOrderExprNode);
+    ParseDriver.adaptor.addChild(nullsOrderExprNode, dotNodeInSort);
+    // 5) Modify INSERT branch condition. In particular, we need to modify the
+    // WHERE clause and pick up the second disjunct from the OR operation.
+    ASTNode whereClauseInInsert = null;
+    for (int i = 0; i < insertNode.getChildren().size(); i++) {
+      if (insertNode.getChild(i).getType() == HiveParser.TOK_WHERE) {
+        whereClauseInInsert = (ASTNode) insertNode.getChild(i);
+        break;
+      }
+    }
+    if (whereClauseInInsert == null) {
+      throw new SemanticException("TOK_WHERE expected in incremental 
rewriting");
+    }
+    if (whereClauseInInsert.getChild(0).getType() != HiveParser.KW_OR) {
+      throw new SemanticException("OR clause expected below TOK_WHERE in 
incremental rewriting");
+    }
+    // We bypass the OR clause and select the second disjunct
+    ASTNode newCondInInsert = (ASTNode) whereClauseInInsert.getChild(0).getChild(1);
+    ParseDriver.adaptor.setChild(whereClauseInInsert, 0, newCondInInsert);
+    // 6) Now we set some tree properties related to multi-insert
+    // operation with INSERT/UPDATE
+    ctx.setOperation(Context.Operation.MERGE);
+    ctx.addDestNamePrefix(1, Context.DestClausePrefix.UPDATE);
+    ctx.addDestNamePrefix(2, Context.DestClausePrefix.INSERT);
+  }
+
+  private void fixUpASTNoAggregateIncrementalRebuild(ASTNode newAST) throws SemanticException {
+    // Replace INSERT OVERWRITE by INSERT INTO
+    // AST tree will have this shape:
+    // TOK_QUERY
+    //   TOK_FROM
+    //      ...
+    //   TOK_INSERT
+    //      TOK_DESTINATION <- THIS TOKEN IS REPLACED BY 'TOK_INSERT_INTO'
+    //         TOK_TAB
+    //            TOK_TABNAME
+    //               default.cmv_mat_view
+    //      TOK_SELECT
+    //         ...
+    ASTNode dest = new ASTSearcher().simpleBreadthFirstSearch(newAST, HiveParser.TOK_QUERY,
+        HiveParser.TOK_INSERT, HiveParser.TOK_DESTINATION);
+    ASTNode newChild = (ASTNode) ParseDriver.adaptor.create(
+        HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
+    newChild.addChildren(dest.getChildren());
+    ASTNode destParent = (ASTNode) dest.getParent();
+    int childIndex = dest.childIndex;
+    destParent.deleteChild(childIndex);
+    destParent.insertChild(childIndex, newChild);
+  }
+
   @Override
   String fixCtasColumnName(String colName) {
     if (runCBO) {
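
Both fix-up methods above begin with the same token swap on the destination node. A hypothetical helper condensing that shared step (not present in the patch; it assumes the same package and imports as CalcitePlanner):

  // Replace the TOK_DESTINATION node with a new TOK_INSERT_INTO node that
  // adopts the same children, turning the INSERT OVERWRITE destination
  // into INSERT INTO.
  private static void replaceDestinationWithInsertInto(ASTNode destination) {
    ASTNode insertInto = (ASTNode) ParseDriver.adaptor.create(
        HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
    insertInto.addChildren(destination.getChildren());
    ASTNode parent = (ASTNode) destination.getParent();
    int childIndex = destination.childIndex;
    parent.deleteChild(childIndex);
    parent.insertChild(childIndex, insertInto);
  }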
@@ -1847,7 +2054,36 @@ public class CalcitePlanner extends SemanticAnalyzer {
       // Add views to planner
       List<RelOptMaterialization> materializations = new ArrayList<>();
       try {
-        materializations = Hive.get().getValidMaterializedViews(rewrittenRebuild);
+        // Extract tables used by the query which will in turn be used to generate
+        // the corresponding txn write ids
+        List<String> tablesUsed = new ArrayList<>();
+        new RelVisitor() {
+          @Override
+          public void visit(RelNode node, int ordinal, RelNode parent) {
+            if (node instanceof TableScan) {
+              TableScan ts = (TableScan) node;
+              tablesUsed.add(((RelOptHiveTable) ts.getTable()).getHiveTableMD().getFullyQualifiedName());
+            }
+            super.visit(node, ordinal, parent);
+          }
+        }.go(basePlan);
+        final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
+        ValidTxnWriteIdList txnWriteIds = null;
+        if (validTxnsList != null && !validTxnsList.isEmpty()) {
+          txnWriteIds = SessionState.get().getTxnMgr().getValidWriteIds(tablesUsed, validTxnsList);
+        }
+        if (mvRebuildMode != MaterializationRebuildMode.NONE) {
+          // We only retrieve the materialization corresponding to the rebuild. In turn,
+          // we pass 'true' for the forceMVContentsUpToDate parameter, as we cannot allow the
+          // materialization contents to be stale for a rebuild if we want to use it.
+          materializations = Hive.get().getValidMaterializedView(mvRebuildDbName, mvRebuildName,
+              true, txnWriteIds);
+        } else {
+          // This is not a rebuild, we retrieve all the materializations. In turn, we do not need
+          // to force the materialization contents to be up-to-date, as this is not a rebuild, and
+          // we apply the user parameters (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead.
+          materializations = Hive.get().getAllValidMaterializedViews(false, txnWriteIds);
+        }
         // We need to use the current cluster for the scan operator on views,
         // otherwise the planner will throw an Exception (different planners)
         materializations = Lists.transform(materializations,
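
For context on how this snapshot feeds the augmentation rule earlier in the patch: the per-table entry of the ValidTxnWriteIdList is what supplies the high watermark and invalid write ids that the filter compares ROW__ID against. A small illustrative helper (class and method names invented; the accessors are the existing ones from hive-common as far as we can tell):

import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

final class SnapshotBoundsSketch {

  private SnapshotBoundsSketch() {
  }

  /** Extracts the values the augmentation rule compares ROW__ID against for one table. */
  static void printSnapshotBounds(ValidTxnWriteIdList txnWriteIds, String fullyQualifiedTableName) {
    ValidWriteIdList tableWriteIds = txnWriteIds.getTableValidWriteIdList(fullyQualifiedTableName);
    System.out.println(fullyQualifiedTableName
        + ": high watermark=" + tableWriteIds.getHighWatermark()
        + ", invalid write ids=" + java.util.Arrays.toString(tableWriteIds.getInvalidWriteIds()));
  }
}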
@@ -1920,6 +2156,29 @@ public class CalcitePlanner extends SemanticAnalyzer {
         
         RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider));
         perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
         if (calcitePreMVRewritingPlan != basePlan) {
+          // A rewriting was produced, we will check whether it was part of an incremental rebuild
+          // to try to replace INSERT OVERWRITE by INSERT
+          if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD &&
+              HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL) &&
+              HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
+            // First we need to check if it is valid to convert to MERGE/INSERT INTO.
+            // If we succeed, we modify the plan and afterwards the AST.
+            // MV should be an acid table.
+            MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
+            visitor.go(basePlan);
+            if (visitor.isRewritingAllowed()) {
+              // Trigger rewriting to remove UNION branch with MV
+              if (visitor.isContainsAggregate()) {
+                basePlan = hepPlan(basePlan, false, mdProvider, null,
+                    HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
+                mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
+              } else {
+                basePlan = hepPlan(basePlan, false, mdProvider, null,
+                    HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
+                mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
+              }
+            }
+          }
           // Now we trigger some needed optimization rules again
           basePlan = applyPreJoinOrderingTransforms(basePlan, mdProvider, executorProvider);
         }
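
To exercise this code path end to end, both flags checked above must be enabled before the rebuild statement is issued. A short setup sketch (the HiveConf ConfVars are the ones referenced in the hunk above; the class and method names here are illustrative only):

import org.apache.hadoop.hive.conf.HiveConf;

final class IncrementalRebuildConfigSketch {

  private IncrementalRebuildConfigSketch() {
  }

  static HiveConf enableIncrementalRebuild() {
    HiveConf conf = new HiveConf();
    // Both flags are checked before the INSERT OVERWRITE rebuild plan is
    // downgraded to the MERGE-equivalent multi-insert or to INSERT INTO.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL, true);
    conf.setBoolVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL, true);
    // The rebuild itself is still triggered with ALTER MATERIALIZED VIEW ... REBUILD.
    return conf;
  }
}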
