[ 
https://issues.apache.org/jira/browse/TAJO-1832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14804793#comment-14804793
 ] 

ASF GitHub Bot commented on TAJO-1832:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/756#discussion_r39816462
  
    --- Diff: tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java ---
    @@ -0,0 +1,479 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.plan.rewrite;
    +
    +import com.google.common.base.Objects;
    +import org.apache.tajo.SessionVars;
    +import org.apache.tajo.algebra.*;
    +import org.apache.tajo.catalog.*;
    +import org.apache.tajo.common.TajoDataTypes.Type;
    +import org.apache.tajo.exception.TajoException;
    +import org.apache.tajo.exception.TajoInternalError;
    +import org.apache.tajo.exception.UnsupportedException;
    +import org.apache.tajo.plan.ExprAnnotator;
    +import org.apache.tajo.plan.LogicalPlan;
    +import org.apache.tajo.plan.LogicalPlan.QueryBlock;
    +import org.apache.tajo.plan.LogicalPlanner.PlanContext;
    +import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.plan.nameresolver.NameResolver;
    +import org.apache.tajo.plan.nameresolver.NameResolvingMode;
    +import 
org.apache.tajo.plan.rewrite.BaseSchemaBuildPhase.Processor.NameRefInSelectListNormalizer;
    +import org.apache.tajo.plan.util.ExprFinder;
    +import org.apache.tajo.plan.util.PlannerUtil;
    +import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
    +import org.apache.tajo.util.StringUtils;
    +import org.apache.tajo.util.TUtil;
    +import org.apache.tajo.util.graph.DirectedGraphVisitor;
    +import org.apache.tajo.util.graph.SimpleDirectedGraph;
    +
    +import java.util.*;
    +
    +/**
    + * SelfDescSchemaBuildPhase builds the schema information of tables of 
self-describing data formats,
    + * such as JSON, Parquet, and ORC.
    + */
    +public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
    +
    +  private Processor processor;
    +
    +  public SelfDescSchemaBuildPhase(CatalogService catalog, ExprAnnotator 
annotator) {
    +    super(catalog, annotator);
    +  }
    +
    +  @Override
    +  public String getName() {
    +    return "Self-describing schema build phase";
    +  }
    +
    +  private static String getQualifiedRelationName(PlanContext context, 
Relation relation) {
    +    return CatalogUtil.isFQTableName(relation.getName()) ?
    +        relation.getName() :
    +        
CatalogUtil.buildFQName(context.getQueryContext().get(SessionVars.CURRENT_DATABASE),
 relation.getName());
    +  }
    +
    +  @Override
    +  public boolean isEligible(PlanContext context, Expr expr) throws 
TajoException {
    +    Set<Relation> relations = ExprFinderIncludeSubquery.finds(expr, 
OpType.Relation);
    +    for (Relation eachRelation : relations) {
    +      if (catalog.getTableDesc(getQualifiedRelationName(context, 
eachRelation)).hasSelfDescSchema()) {
    +        return true;
    +      }
    +    }
    +    return false;
    +  }
    +
    +  static class FinderContext<T> {
    +    Set<T> set = new HashSet<>();
    +    OpType targetType;
    +
    +    FinderContext(OpType type) {
    +      this.targetType = type;
    +    }
    +  }
    +
    +  private static class ExprFinderIncludeSubquery extends 
SimpleAlgebraVisitor<FinderContext, Object> {
    +
    +    public static <T extends Expr> Set<T> finds(Expr expr, OpType type) 
throws TajoException {
    +      FinderContext<T> context = new FinderContext<>(type);
    +      ExprFinderIncludeSubquery finder = new ExprFinderIncludeSubquery();
    +      finder.visit(context, new Stack<Expr>(), expr);
    +      return context.set;
    +    }
    +
    +    @Override
    +    public Object visit(FinderContext ctx, Stack<Expr> stack, Expr expr) 
throws TajoException {
    +      if (expr instanceof Selection) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((Selection) expr).getQual());
    +        visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof UnaryOperator) {
    +        preHook(ctx, stack, expr);
    +        visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof BinaryOperator) {
    +        preHook(ctx, stack, expr);
    +        visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof SimpleTableSubquery) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((SimpleTableSubquery) expr).getSubQuery());
    +        postHook(ctx, stack, expr, null);
    +      } else if (expr instanceof TablePrimarySubQuery) {
    +        preHook(ctx, stack, expr);
    +        visit(ctx, stack, ((TablePrimarySubQuery) expr).getSubQuery());
    +        postHook(ctx, stack, expr, null);
    +      } else {
    +        super.visit(ctx, stack, expr);
    +      }
    +
    +      if (expr != null && ctx.targetType == expr.getType()) {
    +        ctx.set.add(expr);
    +      }
    +
    +      return null;
    +    }
    +  }
    +
    +  @Override
    +  public LogicalNode process(PlanContext context, Expr expr) throws 
TajoException {
    +    if (processor == null) {
    +      processor = new Processor();
    +    }
    +    return processor.visit(new ProcessorContext(context), new 
Stack<Expr>(), expr);
    +  }
    +
    +  static class ProcessorContext {
    +    final PlanContext planContext;
    +    final Map<String, List<ColumnReferenceExpr>> projectColumns = new 
HashMap<>();
    +
    +    public ProcessorContext(PlanContext planContext) {
    +      this.planContext = planContext;
    +    }
    +  }
    +
    +  static class Processor extends BaseAlgebraVisitor<ProcessorContext, 
LogicalNode> {
    +
    +    private static <T extends LogicalNode> T getNodeFromExpr(LogicalPlan 
plan, Expr expr) {
    +      return plan.getBlockByExpr(expr).getNodeFromExpr(expr);
    +    }
    +
    +    private static <T extends LogicalNode> T 
getNonRelationListExpr(LogicalPlan plan, Expr expr) {
    +      if (expr instanceof RelationList) {
    +        return getNodeFromExpr(plan, ((RelationList) 
expr).getRelations()[0]);
    +      } else {
    +        return getNodeFromExpr(plan, expr);
    +      }
    +    }
    +
    +    @Override
    +    public LogicalNode visitProjection(ProcessorContext ctx, Stack<Expr> 
stack, Projection expr) throws TajoException {
    +      if (PlannerUtil.hasAsterisk(expr.getNamedExprs())) {
    +        throw new UnsupportedException("Asterisk for self-describing data 
formats");
    +      }
    +
    +      for (NamedExpr eachNamedExpr : expr.getNamedExprs()) {
    +        Set<ColumnReferenceExpr> columns = ExprFinder.finds(eachNamedExpr, 
OpType.Column);
    +        for (ColumnReferenceExpr col : columns) {
    +          TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), 
col);
    +        }
    +      }
    +
    +      super.visitProjection(ctx, stack, expr);
    +
    +      ProjectionNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitLimit(ProcessorContext ctx, Stack<Expr> stack, 
Limit expr) throws TajoException {
    +      super.visitLimit(ctx, stack, expr);
    +
    +      LimitNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitSort(ProcessorContext ctx, Stack<Expr> stack, 
Sort expr) throws TajoException {
    +      super.visitSort(ctx, stack, expr);
    +
    +      SortNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitHaving(ProcessorContext ctx, Stack<Expr> 
stack, Having expr) throws TajoException {
    +      super.visitHaving(ctx, stack, expr);
    +
    +      HavingNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitGroupBy(ProcessorContext ctx, Stack<Expr> 
stack, Aggregation expr) throws TajoException {
    +      super.visitGroupBy(ctx, stack, expr);
    +
    +      GroupbyNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitJoin(ProcessorContext ctx, Stack<Expr> stack, 
Join expr) throws TajoException {
    +      super.visitJoin(ctx, stack, expr);
    +
    +      JoinNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode leftChild = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      LogicalNode rightChild = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getRight());
    +      node.setInSchema(SchemaUtil.merge(leftChild.getOutSchema(), 
rightChild.getOutSchema()));
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitFilter(ProcessorContext ctx, Stack<Expr> 
stack, Selection expr) throws TajoException {
    +      Set<ColumnReferenceExpr> columnSet = 
ExprFinder.finds(expr.getQual(), OpType.Column);
    +      for (ColumnReferenceExpr col : columnSet) {
    +        NameRefInSelectListNormalizer.normalize(ctx.planContext, col);
    +        TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), col);
    +      }
    +
    +      super.visitFilter(ctx, stack, expr);
    +
    +      SelectionNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitUnion(ProcessorContext ctx, Stack<Expr> stack, 
SetOperation expr) throws TajoException {
    +      super.visitUnion(ctx, stack, expr);
    +
    +      UnionNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitExcept(ProcessorContext ctx, Stack<Expr> 
stack, SetOperation expr) throws TajoException {
    +      super.visitExcept(ctx, stack, expr);
    +
    +      ExceptNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitIntersect(ProcessorContext ctx, Stack<Expr> 
stack, SetOperation expr) throws TajoException {
    +      super.visitIntersect(ctx, stack, expr);
    +
    +      IntersectNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitSimpleTableSubquery(ProcessorContext ctx, 
Stack<Expr> stack, SimpleTableSubquery expr)
    +        throws TajoException {
    +      super.visitSimpleTableSubquery(ctx, stack, expr);
    +
    +      TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitTableSubQuery(ProcessorContext ctx, 
Stack<Expr> stack, TablePrimarySubQuery expr)
    +        throws TajoException {
    +      super.visitTableSubQuery(ctx, stack, expr);
    +
    +      TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitCreateTable(ProcessorContext ctx, Stack<Expr> 
stack, CreateTable expr) throws TajoException {
    +      super.visitCreateTable(ctx, stack, expr);
    +      CreateTableNode node = getNodeFromExpr(ctx.planContext.getPlan(), 
expr);
    +
    +      if (expr.hasSubQuery()) {
    +        LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +        node.setInSchema(child.getOutSchema());
    +        node.setOutSchema(node.getInSchema());
    +      }
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitInsert(ProcessorContext ctx, Stack<Expr> 
stack, Insert expr) throws TajoException {
    +      super.visitInsert(ctx, stack, expr);
    +
    +      InsertNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
    +      LogicalNode child = 
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
    +      node.setInSchema(child.getOutSchema());
    +      node.setOutSchema(node.getInSchema());
    +      return node;
    +    }
    +
    +    @Override
    +    public LogicalNode visitRelation(ProcessorContext ctx, Stack<Expr> 
stack, Relation expr) throws TajoException {
    +      LogicalPlan plan = ctx.planContext.getPlan();
    +      QueryBlock queryBlock = plan.getBlockByExpr(expr);
    +      ScanNode scan = queryBlock.getNodeFromExpr(expr);
    +      TableDesc desc = scan.getTableDesc();
    +
    +      if (desc.hasSelfDescSchema()) {
    +        if 
(ctx.projectColumns.containsKey(getQualifiedRelationName(ctx.planContext, 
expr))) {
    +          Set<Column> columns = new HashSet<>();
    +          for (ColumnReferenceExpr col : 
ctx.projectColumns.get(getQualifiedRelationName(ctx.planContext, expr))) {
    +            columns.add(NameResolver.resolve(plan, queryBlock, col, 
NameResolvingMode.RELS_ONLY, true));
    +          }
    +
    +          desc.setSchema(buildSchemaFromColumnSet(columns));
    +          scan.init(desc);
    +        } else {
    +          // error
    +          throw new TajoInternalError(
    +              "Columns projected from " + 
getQualifiedRelationName(ctx.planContext, expr) + " is not found.");
    +        }
    +      }
    +
    +      return scan;
    +    }
    +
    +    /**
    +     * This method creates a schema from a set of columns.
    +     * For a nested column, its ancestors are guessed and added to the 
schema.
    +     * For example, given a column 'glossary.title', the columns of 
(glossary RECORD (title TEXT)) will be added
    +     * to the schema.
    +     *
    +     * @param columns a set of columns
    +     * @return schema build from columns
    +     */
    +    private Schema buildSchemaFromColumnSet(Set<Column> columns) {
    +      SchemaGraph schemaGraph = new SchemaGraph();
    +      Set<ColumnVertex> rootVertexes = new HashSet<>();
    +      Schema schema = new Schema();
    +
    +      for (Column eachColumn : columns) {
    +        String simpleName = eachColumn.getSimpleName();
    +        if (NestedPathUtil.isPath(simpleName)) {
    +          String[] paths = simpleName.split(NestedPathUtil.PATH_DELIMITER);
    +          for (int i = 0; i < paths.length-1; i++) {
    +            String parentName = paths[i];
    +            if (i == 0) {
    +              parentName = 
CatalogUtil.buildFQName(eachColumn.getQualifier(), parentName);
    +            }
    +            // Leaf column type is TEXT; otherwise, RECORD.
    +            Type childDataType = (i == paths.length-2) ? Type.TEXT : 
Type.RECORD;
    --- End diff --
    
    These lines exceed the 120-column width limit. In addition, ``visitCreateTable()`` in 
this file exceeds 120 columns too.


> Well support for self-describing data formats
> ---------------------------------------------
>
>                 Key: TAJO-1832
>                 URL: https://issues.apache.org/jira/browse/TAJO-1832
>             Project: Tajo
>          Issue Type: New Feature
>          Components: Planner/Optimizer
>            Reporter: Jihoon Son
>            Assignee: Jihoon Son
>
> *Problem*
> Tajo already supports self-describing data formats such as JSON, 
> Parquet, or ORC. Although these formats can provide schema information by 
> themselves, users must still define a schema to query them with the current 
> implementation. To remove this inconvenience, we have to improve our query 
> planner to support self-describing data formats well. 
> *Solution*
> First, we need to allow the schema definition to be omitted from the CREATE TABLE 
> statement. When a query is submitted against a self-describing table, the columns 
> that don't exist in that table will be filled with NULLs. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to