morningman commented on a change in pull request #2431: Support to create 
materialized view
URL: https://github.com/apache/incubator-doris/pull/2431#discussion_r357501996
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
 ##########
 @@ -67,59 +72,241 @@
 import java.util.Set;
 
 /*
- * RollupHandler is responsible for ADD/DROP rollup.
+ * MaterializedViewHandler is responsible for ADD/DROP materialized view.
+ * For compatible with older version, it is also responsible for ADD/DROP 
rollup.
+ * In function level, the mv completely covers the rollup in the future.
+ * In grammar level, there is some difference between mv and rollup.
  */
-public class RollupHandler extends AlterHandler {
-    private static final Logger LOG = 
LogManager.getLogger(RollupHandler.class);
+public class MaterializedViewHandler extends AlterHandler {
+    private static final Logger LOG = 
LogManager.getLogger(MaterializedViewHandler.class);
 
-    public RollupHandler() {
-        super("rollup");
+    public MaterializedViewHandler() {
+        super("materialized view");
     }
 
-    /*
-     * Handle the Add Rollup request.
-     * 3 main steps:
-     * 1. Validate the request.
-     * 2. Create RollupJob with rollup index
-     *      All replicas of the rollup index will be created in meta and added 
to TabletInvertedIndex
-     * 3. Set table's state to ROLLUP.
+    /**
+     * There are 2 main steps in this function.
+     * Step1: validate the request.
+     *   Step1.1: semantic analysis: the name of olapTable must be same as the 
base table name in addMVClause.
+     *   Step1.2: base table validation: the status of base table and 
partition could be NORMAL.
+     *   Step1.3: materialized view validation: the name and columns of mv is 
checked.
+     * Step2: create mv job
+     * @param addMVClause
+     * @param db
+     * @param olapTable
+     * @throws DdlException
      */
-    private void processAddRollup(AddRollupClause alterClause, Database db, 
OlapTable olapTable)
-            throws DdlException {
-        
-        // table is under rollup or has a finishing alter job
-        if (olapTable.getState() == OlapTableState.ROLLUP || 
this.hasUnfinishedAlterJob(olapTable.getId())) {
-            throw new DdlException("Table[" + olapTable.getName() + "]'s is 
under ROLLUP");
+    private void processAddMaterializedView(AddMaterializedViewClause 
addMVClause, Database db, OlapTable olapTable)
+            throws DdlException, AnalysisException {
+        // Step1.1: semantic analysis
+        // TODO(ML): support the materialized view as base index
+        if (!addMVClause.getBaseIndexName().equals(olapTable.getName())) {
+            throw new DdlException("The name of table in from clause must be 
same as the name of alter table");
         }
-        // up to here, table's state can only be NORMAL
-        Preconditions.checkState(olapTable.getState() == 
OlapTableState.NORMAL, olapTable.getState().name());
+        // Step1.2: base table validation
+        String baseIndexName = addMVClause.getBaseIndexName();
+        String mvIndexName = addMVClause.getMVName();
+        LOG.info("process add materialized view[{}] based on [{}]", 
mvIndexName, baseIndexName);
+        long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
+        // Step1.3: mv clause validation
+        List<Column> mvColumns = checkAndPrepareMaterializedView(addMVClause, 
olapTable);
+
+        // Step2: create mv job
+        createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns,
+                                  addMVClause.getProperties(), olapTable, db, 
baseIndexId);
+    }
 
-        String rollupIndexName = alterClause.getRollupName();
+    /**
+     * There are 2 main steps.
+     * Step1: validate the request
+     *   Step1.1: base table validation: the status of base table and 
partition could be NORMAL.
+     *   Step1.2: rollup validation: the name and columns of rollup is checked.
+     * Step2: create rollup job
+     * @param alterClause
+     * @param db
+     * @param olapTable
+     * @throws DdlException
+     * @throws AnalysisException
+     */
+    private void processAddRollup(AddRollupClause alterClause, Database db, 
OlapTable olapTable)
+            throws DdlException, AnalysisException {
         String baseIndexName = alterClause.getBaseRollupName();
-        List<String> rollupColumnNames = alterClause.getColumnNames();
-
-        LOG.info("process add rollup[{}] based on [{}]", rollupIndexName, 
baseIndexName);
-
-        // 1. check if rollup index already exists
-        if (olapTable.hasMaterializedIndex(rollupIndexName)) {
-            throw new DdlException("Rollup index[" + rollupIndexName + "] 
already exists");
-        }
-
-        // 2. get base index schema
+        String rollupIndexName = alterClause.getRollupName();
+        // get base index schema
         if (baseIndexName == null) {
             // use table name as base table name
             baseIndexName = olapTable.getName();
         }
-        Long baseIndexId = olapTable.getIndexIdByName(baseIndexName);
-        if (baseIndexId == null) {
-            throw new DdlException("Base index[" + baseIndexName + "] does not 
exist");
-        }
+        // Step1.1 check base table and base index
+        // Step1.2 alter clause validation
+        LOG.info("process add rollup[{}] based on [{}]", rollupIndexName, 
baseIndexName);
+        Long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
+        List<Column> rollupSchema = 
checkAndPrepareMaterializedView(alterClause, olapTable, baseIndexId);
 
-        // check state
+        // Step2: create materialized view job
+        createMaterializedViewJob(rollupIndexName, baseIndexName, 
rollupSchema, alterClause.getProperties(),
+                                  olapTable, db, baseIndexId);
+    }
+
+    /**
+     * Step1: All replicas of the materialized view index will be created in 
meta and added to TabletInvertedIndex
+     * Step2: Set table's state to ROLLUP.
+     *
+     * @param mvName
+     * @param baseIndexName
+     * @param mvColumns
+     * @param properties
+     * @param olapTable
+     * @param db
+     * @param baseIndexId
+     * @throws DdlException
+     * @throws AnalysisException
+     */
+    private void createMaterializedViewJob(String mvName, String baseIndexName,
+                                           List<Column> mvColumns, Map<String, 
String> properties,
+                                           OlapTable olapTable, Database db, 
long baseIndexId)
+            throws DdlException, AnalysisException {
+        // assign rollup index's key type, same as base index's
+        KeysType mvKeysType = olapTable.getKeysType();
+        // get rollup schema hash
+        int mvSchemaHash = Util.schemaHash(0 /* init schema version */, 
mvColumns, olapTable.getCopiedBfColumns(),
+                                           olapTable.getBfFpp());
+        // get short key column count
+        short mvShortKeyColumnCount = 
Catalog.calcShortKeyColumnCount(mvColumns, properties);
+        // get timeout
+        long timeoutMs = PropertyAnalyzer.analyzeTimeout(properties, 
Config.alter_table_timeout_second) * 1000;
+
+        // create rollup job
+        long dbId = db.getId();
+        long tableId = olapTable.getId();
+        int baseSchemaHash = olapTable.getSchemaHashByIndexId(baseIndexId);
+        Catalog catalog = Catalog.getCurrentCatalog();
+        long jobId = catalog.getNextId();
+        long mvIndexId = catalog.getNextId();
+        RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, 
olapTable.getName(), timeoutMs,
+                                            baseIndexId, mvIndexId, 
baseIndexName, mvName,
+                                            mvColumns, baseSchemaHash, 
mvSchemaHash,
+                                            mvKeysType, mvShortKeyColumnCount);
+
+        /*
+         * create all rollup indexes. and set state.
+         * After setting, Tables' state will be ROLLUP
+         */
         for (Partition partition : olapTable.getPartitions()) {
+            long partitionId = partition.getId();
+            TStorageMedium medium = 
olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
+            // index state is SHADOW
+            MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, 
IndexState.SHADOW);
             MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
-            // up to here. index's state should only be NORMAL
-            Preconditions.checkState(baseIndex.getState() == 
IndexState.NORMAL, baseIndex.getState().name());
+            TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, mvIndexId, mvSchemaHash,
+                                                     medium);
+            for (Tablet baseTablet : baseIndex.getTablets()) {
+                long baseTabletId = baseTablet.getId();
+                long mvTabletId = catalog.getNextId();
+
+                Tablet newTablet = new Tablet(mvTabletId);
+                mvIndex.addTablet(newTablet, mvTabletMeta);
+
+                mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
+                List<Replica> baseReplicas = baseTablet.getReplicas();
+
+                for (Replica baseReplica : baseReplicas) {
+                    long mvReplicaId = catalog.getNextId();
+                    long backendId = baseReplica.getBackendId();
+                    if (baseReplica.getState() == ReplicaState.CLONE
+                            || baseReplica.getState() == 
ReplicaState.DECOMMISSION
+                            || baseReplica.getLastFailedVersion() > 0) {
+                        // just skip it.
+                        continue;
+                    }
+                    Preconditions.checkState(baseReplica.getState() == 
ReplicaState.NORMAL);
+                    // replica's init state is ALTER, so that tablet report 
process will ignore its report
+                    Replica mvReplica = new Replica(mvReplicaId, backendId, 
ReplicaState.ALTER,
+                                                    
Partition.PARTITION_INIT_VERSION, Partition
+                                                            
.PARTITION_INIT_VERSION_HASH,
+                                                    mvSchemaHash);
+                    newTablet.addReplica(mvReplica);
+                } // end for baseReplica
+            } // end for baseTablets
+
+            mvJob.addMVIndex(partitionId, mvIndex);
+
+            LOG.debug("create materialized view index {} based on index {} in 
partition {}",
+                      mvIndexId, baseIndexId, partitionId);
+        } // end for partitions
+
+        // update table state
+        olapTable.setState(OlapTableState.ROLLUP);
+
+        addAlterJobV2(mvJob);
+
+        // log rollup operation
+        catalog.getEditLog().logAlterJob(mvJob);
+        LOG.info("finished to create materialized view job: {}", 
mvJob.getJobId());
+    }
+
+    private List<Column> 
checkAndPrepareMaterializedView(AddMaterializedViewClause addMVClause, 
OlapTable olapTable)
+            throws DdlException {
+        // check if mv index already exists
+        if (olapTable.hasMaterializedIndex(addMVClause.getMVName())) {
+            throw new DdlException("Materialized view[" + 
addMVClause.getMVName() + "] already exists");
+        }
+        // check if rollup columns are valid
+        // a. all columns should exist in base rollup schema
+        // b. For aggregate table, mv columns with aggregate function should 
be same as base schema
+        // c. For aggregate table, the column which is the key of base table 
should be the key of mv as well.
+        // update mv columns
+        List<MVColumnItem> mvColumnItemList = 
addMVClause.getMVColumnItemList();
+        List<Column> newMVColumns = Lists.newArrayList();
+        int numOfKeys = 0;
+        for (MVColumnItem mvColumnItem : mvColumnItemList) {
+            String mvColumnName = mvColumnItem.getName();
+            Column baseColumn = olapTable.getColumn(mvColumnName);
+            if (baseColumn == null) {
+                throw new DdlException("Column[" + mvColumnName + "] does not 
exist");
+            }
+            if (mvColumnItem.isKey()) {
+                ++numOfKeys;
+            }
+            AggregateType baseAggregationType = 
baseColumn.getAggregationType();
+            AggregateType mvAggregationType = 
mvColumnItem.getAggregationType();
+            if (olapTable.getKeysType() == KeysType.AGG_KEYS || 
olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
+                if (baseColumn.isKey() && !mvColumnItem.isKey()) {
+                    throw new DdlException("The column[" + mvColumnName + "] 
must be the key of materialized view");
+                }
+                if (baseAggregationType != mvAggregationType) {
+                    throw new DdlException("The aggregation type of column[" + 
mvColumnName + "] must be same as "
+                                                   + "the aggregate type of 
base column in aggregate table");
+                }
+                if ((baseAggregationType == AggregateType.REPLACE
+                        || baseAggregationType == 
AggregateType.REPLACE_IF_NOT_NULL)
+                        && olapTable.getKeysNum() != numOfKeys) {
+                    throw new DdlException("The materialized view should 
contain all keys of base table if there is a"
+                                                   + " REPLACE value");
+                }
+            }
+            if (olapTable.getKeysType() == KeysType.DUP_KEYS
+                    && (mvAggregationType == AggregateType.REPLACE
+                    || mvAggregationType == 
AggregateType.REPLACE_IF_NOT_NULL)) {
 
 Review comment:
   Better to unify the check of REPLACE and REPLACE_IF_NOT_NULL, since they 
are always checked together.
   e.g.: 
    `mvAggregationType.isReplaceFamily()`
   
   The same applies to `KeysType`, where AGG_KEYS and UNIQUE_KEYS are always checked together.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to