imay commented on a change in pull request #1695: Refactor alter job
URL: https://github.com/apache/incubator-doris/pull/1695#discussion_r319336556
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
 ##########
 @@ -124,67 +126,85 @@ public void init(Analyzer analyzer) throws UserException 
{
         // columns: k1, k2, v1, v2=k1 + k2
         // this means that there are three columns(k1, k2, v1) in source file,
         // and v2 is derived from (k1 + k2)
-        if (streamLoadTask.getColumnExprDesc() != null && 
!streamLoadTask.getColumnExprDesc().isEmpty()) {
-            for (ImportColumnDesc importColumnDesc : 
streamLoadTask.getColumnExprDesc()) {
-                // make column name case match with real column name
-                String columnName = importColumnDesc.getColumnName();
-                String realColName = dstTable.getColumn(columnName) == null ? 
columnName
-                        : dstTable.getColumn(columnName).getName();
-                if (importColumnDesc.getExpr() != null) {
-                    exprsByName.put(realColName, importColumnDesc.getExpr());
-                } else {
-                    SlotDescriptor slotDesc = 
analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
-                    
slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
-                    slotDesc.setIsMaterialized(true);
-                    // ISSUE A: src slot should be nullable even if the column 
is not nullable.
-                    // because src slot is what we read from file, not 
represent to real column value.
-                    // If column is not nullable, error will be thrown when 
filling the dest slot,
-                    // which is not nullable
-                    slotDesc.setIsNullable(true);
-                    params.addToSrc_slot_ids(slotDesc.getId().asInt());
-                    slotDescByName.put(realColName, slotDesc);
-                }
-            }
-
-            // analyze all exprs
-            for (Map.Entry<String, Expr> entry : exprsByName.entrySet()) {
-                ExprSubstitutionMap smap = new ExprSubstitutionMap();
-                List<SlotRef> slots = Lists.newArrayList();
-                entry.getValue().collect(SlotRef.class, slots);
-                for (SlotRef slot : slots) {
-                    SlotDescriptor slotDesc = 
slotDescByName.get(slot.getColumnName());
-                    if (slotDesc == null) {
-                        throw new UserException("unknown reference column, 
column=" + entry.getKey()
-                                + ", reference=" + slot.getColumnName());
-                    }
-                    smap.getLhs().add(slot);
-                    smap.getRhs().add(new SlotRef(slotDesc));
-                }
-                Expr expr = entry.getValue().clone(smap);
-                expr.analyze(analyzer);
 
-                // check if contain aggregation
-                List<FunctionCallExpr> funcs = Lists.newArrayList();
-                expr.collect(FunctionCallExpr.class, funcs);
-                for (FunctionCallExpr fn : funcs) {
-                    if (fn.isAggregateFunction()) {
-                        throw new AnalysisException("Don't support aggregation 
function in load expression");
-                    }
-                }
+        // If user does not specify the column expr descs, generate it by 
using base schema of table.
+        // So that the following process can be unified
+        if (streamLoadTask.getColumnExprDescs() == null || 
streamLoadTask.getColumnExprDescs().isEmpty()) {
+            List<Column> columns = dstTable.getBaseSchema();
+            for (Column column : columns) {
+                ImportColumnDesc columnDesc = new 
ImportColumnDesc(column.getName());
+                LOG.debug("add base column {} to stream load task", 
column.getName());
+                streamLoadTask.addColumnExprDesc(columnDesc);
+            }
+        }
 
-                exprsByName.put(entry.getKey(), expr);
+        // When doing schema change, there may have some 'shadow' columns, 
with prefix '__doris_shadow_' in
+        // their names. These columns are visible to user, but we need to 
generate data for these columns.
+        // So we add column mappings for these column.
+        // eg:
+        // base schema is (A, B, C), and B is under schema change, so there 
will be a shadow column: '__doris_shadow_B'
+        // So the final column mapping should looks like: (A, B, C, 
__doris_shadow_B = B);
+        List<Column> fullSchema = dstTable.getFullSchema();
 
 Review comment:
   There may be problem when input column mapping is (tmp_a, tmp_b, tmp_c, a = 
tmp_a, b = tmp_b, c = tmp_c). It's better to add shadow column after all exprs 
are analyzed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to