Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/756#discussion_r39816462
  
    --- Diff: 
tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
 ---
    @@ -0,0 +1,479 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.plan.rewrite;
    +
    +import com.google.common.base.Objects;
    +import org.apache.tajo.SessionVars;
    +import org.apache.tajo.algebra.*;
    +import org.apache.tajo.catalog.*;
    +import org.apache.tajo.common.TajoDataTypes.Type;
    +import org.apache.tajo.exception.TajoException;
    +import org.apache.tajo.exception.TajoInternalError;
    +import org.apache.tajo.exception.UnsupportedException;
    +import org.apache.tajo.plan.ExprAnnotator;
    +import org.apache.tajo.plan.LogicalPlan;
    +import org.apache.tajo.plan.LogicalPlan.QueryBlock;
    +import org.apache.tajo.plan.LogicalPlanner.PlanContext;
    +import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.plan.nameresolver.NameResolver;
    +import org.apache.tajo.plan.nameresolver.NameResolvingMode;
    +import 
org.apache.tajo.plan.rewrite.BaseSchemaBuildPhase.Processor.NameRefInSelectListNormalizer;
    +import org.apache.tajo.plan.util.ExprFinder;
    +import org.apache.tajo.plan.util.PlannerUtil;
    +import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
    +import org.apache.tajo.util.StringUtils;
    +import org.apache.tajo.util.TUtil;
    +import org.apache.tajo.util.graph.DirectedGraphVisitor;
    +import org.apache.tajo.util.graph.SimpleDirectedGraph;
    +
    +import java.util.*;
    +
    +/**
    + * SelfDescSchemaBuildPhase builds the schema information of tables of 
self-describing data formats,
    + * such as JSON, Parquet, and ORC.
    + */
    +public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
    +
    +  private Processor processor;
    +
    +  public SelfDescSchemaBuildPhase(CatalogService catalog, ExprAnnotator 
annotator) {
    +    super(catalog, annotator);
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return "Self-describing schema build phase";
    +  }
    +
    +  private static String getQualifiedRelationName(PlanContext context, 
Relation relation) {
    +    return CatalogUtil.isFQTableName(relation.getName()) ?
    +        relation.getName() :
    +        
CatalogUtil.buildFQName(context.getQueryContext().get(SessionVars.CURRENT_DATABASE),
 relation.getName());
    +  }
    +
    +  @Override
    +  public boolean isEligible(PlanContext context, Expr expr) throws 
TajoException {
    +    Set<Relation> relations = ExprFinderIncludeSubquery.finds(expr, 
OpType.Relation);
    +    for (Relation eachRelation : relations) {
    +      if (catalog.getTableDesc(getQualifiedRelationName(context, 
eachRelation)).hasSelfDescSchema()) {
    +        return true;
    +      }
    +    }
    +    return false;
    +  }
    +
    +  static class FinderContext<T> {
    +    Set<T> set = new HashSet<>();
    +    OpType targetType;
    +
    +    FinderContext(OpType type) {
    +      this.targetType = type;
    +    }
    +  }
    +
    +  private static class ExprFinderIncludeSubquery extends 
SimpleAlgebraVisitor<FinderContext, Object> {
    +
    +    public static <T extends Expr> Set<T> finds(Expr expr, OpType type) 
throws TajoException {
    +      FinderContext<T> context = new FinderContext<>(type);
    +      ExprFinderIncludeSubquery finder = new ExprFinderIncludeSubquery();
    +      finder.visit(context, new Stack<Expr>(), expr);
    +      return context.set;
    +    }
    +
    +    @Override
    +    public Object visit(FinderContext ctx, Stack<Expr> stack, Expr expr) 
throws TajoException {
    +      if (expr instanceof Selection) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((Selection) expr).getQual());
    +        visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof UnaryOperator) {
    +        preHook(ctx, stack, expr);
    +        visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof BinaryOperator) {
    +        preHook(ctx, stack, expr);
    +        visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof SimpleTableSubquery) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((SimpleTableSubquery) expr).getSubQuery());
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof TablePrimarySubQuery) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((TablePrimarySubQuery) expr).getSubQuery());
    +        postHook(ctx, stack, expr, null);
    +      } else {
    +        super.visit(ctx, stack, expr);
    +      }
    +
    +      if (expr != null && ctx.targetType == expr.getType()) {
    +        ctx.set.add(expr);
    +      }
    +
    +      return null;
    +    }
    +  }
    +
    +  @Override
    +  public LogicalNode process(PlanContext context, Expr expr) throws 
TajoException {
    +    if (processor == null) {
    +      processor = new Processor();
    +    }
    +    return processor.visit(new ProcessorContext(context), new 
Stack<Expr>(), expr);
    +  }
    +
    +  static class ProcessorContext {
    +    final PlanContext planContext;
    +    final Map<String, List<ColumnReferenceExpr>> projectColumns = new 
HashMap<>();
    +
    +    public ProcessorContext(PlanContext planContext) {
    +      this.planContext = planContext;
    +    }
    +  }
    +
    +  static class Processor extends BaseAlgebraVisitor<ProcessorContext, 
LogicalNode> {
    +
    +    private static <T extends LogicalNode> T getNodeFromExpr(LogicalPlan 
plan, Expr expr) {
    +      return plan.getBlockByExpr(expr).getNodeFromExpr(expr);
    +    }
    +
    +    private static <T extends LogicalNode> T 
getNonRelationListExpr(LogicalPlan plan, Expr expr) {
    +      if (expr instanceof RelationList) {
    +        return getNodeFromExpr(plan, ((RelationList) 
expr).getRelations()[0]);
    +      } else {
    +        return getNodeFromExpr(plan, expr);
    +      }
    +    }
    +
    +    @Override
    +    public LogicalNode visitProjection(ProcessorContext ctx, Stack<Expr> 
stack, Projection expr) throws TajoException {
    +      if (PlannerUtil.hasAsterisk(expr.getNamedExprs())) {
    +        throw new UnsupportedException("Asterisk for self-describing data 
formats");
    +      }
    +
    +      for (NamedExpr eachNamedExpr : expr.getNamedExprs()) {
    +        Set<ColumnReferenceExpr> columns = ExprFinder.finds(eachNamedExpr, 
OpType.Column);
    +        for (ColumnReferenceExpr col : columns) {
    +          TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), 
col);
    +        }
    +      }
    +
    +      super.visitProjection(ctx, stack, expr);
    +
    +      ProjectionNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitLimit(ProcessorContext ctx, Stack<Expr> stack, 
Limit expr) throws TajoException {
    +      super.visitLimit(ctx, stack, expr);
    +
    +      LimitNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitSort(ProcessorContext ctx, Stack<Expr> stack, 
Sort expr) throws TajoException {
    +      super.visitSort(ctx, stack, expr);
    +
    +      SortNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitHaving(ProcessorContext ctx, Stack<Expr> 
stack, Having expr) throws TajoException {
    +      super.visitHaving(ctx, stack, expr);
    +
    +      HavingNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitGroupBy(ProcessorContext ctx, Stack<Expr> 
stack, Aggregation expr) throws TajoException {
    +      super.visitGroupBy(ctx, stack, expr);
    +
    +      GroupbyNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitJoin(ProcessorContext ctx, Stack<Expr> stack, 
Join expr) throws TajoException {
    +      super.visitJoin(ctx, stack, expr);
    +
    +      JoinNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode leftChild = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      LogicalNode rightChild = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getRight());
    +      node.setInSchema(SchemaUtil.merge(leftChild.getOutSchema(), 
rightChild.getOutSchema()));
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitFilter(ProcessorContext ctx, Stack<Expr> 
stack, Selection expr) throws TajoException {
    +      Set<ColumnReferenceExpr> columnSet = 
ExprFinder.finds(expr.getQual(), OpType.Column);
    +      for (ColumnReferenceExpr col : columnSet) {
    +        NameRefInSelectListNormalizer.normalize(ctx.planContext, col);
    +        TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), col);
    +      }
    +
    +      super.visitFilter(ctx, stack, expr);
    +
    +      SelectionNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitUnion(ProcessorContext ctx, Stack<Expr> stack, 
SetOperation expr) throws TajoException {
    +      super.visitUnion(ctx, stack, expr);
    +
    +      UnionNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitExcept(ProcessorContext ctx, Stack<Expr> 
stack, SetOperation expr) throws TajoException {
    +      super.visitExcept(ctx, stack, expr);
    +
    +      ExceptNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitIntersect(ProcessorContext ctx, Stack<Expr> 
stack, SetOperation expr) throws TajoException {
    +      super.visitIntersect(ctx, stack, expr);
    +
    +      IntersectNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitSimpleTableSubquery(ProcessorContext ctx, 
Stack<Expr> stack, SimpleTableSubquery expr)
    +        throws TajoException {
    +      super.visitSimpleTableSubquery(ctx, stack, expr);
    +
    +      TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitTableSubQuery(ProcessorContext ctx, 
Stack<Expr> stack, TablePrimarySubQuery expr)
    +        throws TajoException {
    +      super.visitTableSubQuery(ctx, stack, expr);
    +
    +      TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitCreateTable(ProcessorContext ctx, Stack<Expr> 
stack, CreateTable expr) throws TajoException {
    +      super.visitCreateTable(ctx, stack, expr);
    +      CreateTableNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +
    +      if (expr.hasSubQuery()) {
    +        LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +        node.setInSchema(child.getOutSchema());
    +        node.setOutSchema(node.getInSchema());
    +      }
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitInsert(ProcessorContext ctx, Stack<Expr> 
stack, Insert expr) throws TajoException {
    +      super.visitInsert(ctx, stack, expr);
    +
    +      InsertNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitRelation(ProcessorContext ctx, Stack<Expr> 
stack, Relation expr) throws TajoException {
    +      LogicalPlan plan = ctx.planContext.getPlan();
    +      QueryBlock queryBlock = plan.getBlockByExpr(expr);
    +      ScanNode scan = queryBlock.getNodeFromExpr(expr);
    +      TableDesc desc = scan.getTableDesc();
    +
    +      if (desc.hasSelfDescSchema()) {
    +        if 
(ctx.projectColumns.containsKey(getQualifiedRelationName(ctx.planContext, 
expr))) {
    +          Set<Column> columns = new HashSet<>();
    +          for (ColumnReferenceExpr col : 
ctx.projectColumns.get(getQualifiedRelationName(ctx.planContext, expr))) {
    +            columns.add(NameResolver.resolve(plan, queryBlock, col, 
NameResolvingMode.RELS_ONLY, true));
    +          }
    +
    +          desc.setSchema(buildSchemaFromColumnSet(columns));
    +          scan.init(desc);
    +        } else {
    +          // error
    +          throw new TajoInternalError(
    +              "Columns projected from " + 
getQualifiedRelationName(ctx.planContext, expr) + " is not found.");
    +        }
    +      }
    +
    +      return scan;
    +    }
    +
    +    /**
    +     * This method creates a schema from a set of columns.
    +     * For a nested column, its ancestors are guessed and added to the 
schema.
    +     * For example, given a column 'glossary.title', the columns of 
(glossary RECORD (title TEXT)) will be added
    +     * to the schema.
    +     *
    +     * @param columns a set of columns
    +     * @return schema build from columns
    +     */
    +    private Schema buildSchemaFromColumnSet(Set<Column> columns) {
    +      SchemaGraph schemaGraph = new SchemaGraph();
    +      Set<ColumnVertex> rootVertexes = new HashSet<>();
    +      Schema schema = new Schema();
    +
    +      for (Column eachColumn : columns) {
    +        String simpleName = eachColumn.getSimpleName();
    +        if (NestedPathUtil.isPath(simpleName)) {
    +          String[] paths = simpleName.split(NestedPathUtil.PATH_DELIMITER);
    +          for (int i = 0; i < paths.length-1; i++) {
    +            String parentName = paths[i];
    +            if (i == 0) {
    +              parentName = 
CatalogUtil.buildFQName(eachColumn.getQualifier(), parentName);
    +            }
    +            // Leaf column type is TEXT; otherwise, RECORD.
    +            Type childDataType = (i == paths.length-2) ? Type.TEXT : 
Type.RECORD;
    --- End diff --
    
    These lines exceed 120 column width. In addition, ``visitCreateTable()`` in 
this file exceeds 120 too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to