[
https://issues.apache.org/jira/browse/TAJO-1832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14804793#comment-14804793
]
ASF GitHub Bot commented on TAJO-1832:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/756#discussion_r39816462
--- Diff:
tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
---
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.ExprAnnotator;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlan.QueryBlock;
+import org.apache.tajo.plan.LogicalPlanner.PlanContext;
+import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.nameresolver.NameResolver;
+import org.apache.tajo.plan.nameresolver.NameResolvingMode;
+import
org.apache.tajo.plan.rewrite.BaseSchemaBuildPhase.Processor.NameRefInSelectListNormalizer;
+import org.apache.tajo.plan.util.ExprFinder;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.graph.DirectedGraphVisitor;
+import org.apache.tajo.util.graph.SimpleDirectedGraph;
+
+import java.util.*;
+
+/**
+ * SelfDescSchemaBuildPhase builds the schema information of tables of
self-describing data formats,
+ * such as JSON, Parquet, and ORC.
+ */
+public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
+
+ private Processor processor;
+
+ public SelfDescSchemaBuildPhase(CatalogService catalog, ExprAnnotator
annotator) {
+ super(catalog, annotator);
+ }
+
+ @Override
+ public String getName() {
+ return "Self-describing schema build phase";
+ }
+
+ private static String getQualifiedRelationName(PlanContext context,
Relation relation) {
+ return CatalogUtil.isFQTableName(relation.getName()) ?
+ relation.getName() :
+
CatalogUtil.buildFQName(context.getQueryContext().get(SessionVars.CURRENT_DATABASE),
relation.getName());
+ }
+
+ @Override
+ public boolean isEligible(PlanContext context, Expr expr) throws
TajoException {
+ Set<Relation> relations = ExprFinderIncludeSubquery.finds(expr,
OpType.Relation);
+ for (Relation eachRelation : relations) {
+ if (catalog.getTableDesc(getQualifiedRelationName(context,
eachRelation)).hasSelfDescSchema()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static class FinderContext<T> {
+ Set<T> set = new HashSet<>();
+ OpType targetType;
+
+ FinderContext(OpType type) {
+ this.targetType = type;
+ }
+ }
+
+ private static class ExprFinderIncludeSubquery extends
SimpleAlgebraVisitor<FinderContext, Object> {
+
+ public static <T extends Expr> Set<T> finds(Expr expr, OpType type)
throws TajoException {
+ FinderContext<T> context = new FinderContext<>(type);
+ ExprFinderIncludeSubquery finder = new ExprFinderIncludeSubquery();
+ finder.visit(context, new Stack<Expr>(), expr);
+ return context.set;
+ }
+
+ @Override
+ public Object visit(FinderContext ctx, Stack<Expr> stack, Expr expr)
throws TajoException {
+ if (expr instanceof Selection) {
+ preHook(ctx, stack, expr);
+ visit(ctx, stack, ((Selection) expr).getQual());
+ visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
+ postHook(ctx, stack, expr, null);
+ } else if (expr instanceof UnaryOperator) {
+ preHook(ctx, stack, expr);
+ visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
+ postHook(ctx, stack, expr, null);
+ } else if (expr instanceof BinaryOperator) {
+ preHook(ctx, stack, expr);
+ visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
+ postHook(ctx, stack, expr, null);
+ } else if (expr instanceof SimpleTableSubquery) {
+ preHook(ctx, stack, expr);
+ visit(ctx, stack, ((SimpleTableSubquery) expr).getSubQuery());
+ postHook(ctx, stack, expr, null);
+ } else if (expr instanceof TablePrimarySubQuery) {
+ preHook(ctx, stack, expr);
+ visit(ctx, stack, ((TablePrimarySubQuery) expr).getSubQuery());
+ postHook(ctx, stack, expr, null);
+ } else {
+ super.visit(ctx, stack, expr);
+ }
+
+ if (expr != null && ctx.targetType == expr.getType()) {
+ ctx.set.add(expr);
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public LogicalNode process(PlanContext context, Expr expr) throws
TajoException {
+ if (processor == null) {
+ processor = new Processor();
+ }
+ return processor.visit(new ProcessorContext(context), new
Stack<Expr>(), expr);
+ }
+
+ static class ProcessorContext {
+ final PlanContext planContext;
+ final Map<String, List<ColumnReferenceExpr>> projectColumns = new
HashMap<>();
+
+ public ProcessorContext(PlanContext planContext) {
+ this.planContext = planContext;
+ }
+ }
+
+ static class Processor extends BaseAlgebraVisitor<ProcessorContext,
LogicalNode> {
+
+ private static <T extends LogicalNode> T getNodeFromExpr(LogicalPlan
plan, Expr expr) {
+ return plan.getBlockByExpr(expr).getNodeFromExpr(expr);
+ }
+
+ private static <T extends LogicalNode> T
getNonRelationListExpr(LogicalPlan plan, Expr expr) {
+ if (expr instanceof RelationList) {
+ return getNodeFromExpr(plan, ((RelationList)
expr).getRelations()[0]);
+ } else {
+ return getNodeFromExpr(plan, expr);
+ }
+ }
+
+ @Override
+ public LogicalNode visitProjection(ProcessorContext ctx, Stack<Expr>
stack, Projection expr) throws TajoException {
+ if (PlannerUtil.hasAsterisk(expr.getNamedExprs())) {
+ throw new UnsupportedException("Asterisk for self-describing data
formats");
+ }
+
+ for (NamedExpr eachNamedExpr : expr.getNamedExprs()) {
+ Set<ColumnReferenceExpr> columns = ExprFinder.finds(eachNamedExpr,
OpType.Column);
+ for (ColumnReferenceExpr col : columns) {
+ TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(),
col);
+ }
+ }
+
+ super.visitProjection(ctx, stack, expr);
+
+ ProjectionNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitLimit(ProcessorContext ctx, Stack<Expr> stack,
Limit expr) throws TajoException {
+ super.visitLimit(ctx, stack, expr);
+
+ LimitNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitSort(ProcessorContext ctx, Stack<Expr> stack,
Sort expr) throws TajoException {
+ super.visitSort(ctx, stack, expr);
+
+ SortNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitHaving(ProcessorContext ctx, Stack<Expr>
stack, Having expr) throws TajoException {
+ super.visitHaving(ctx, stack, expr);
+
+ HavingNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitGroupBy(ProcessorContext ctx, Stack<Expr>
stack, Aggregation expr) throws TajoException {
+ super.visitGroupBy(ctx, stack, expr);
+
+ GroupbyNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitJoin(ProcessorContext ctx, Stack<Expr> stack,
Join expr) throws TajoException {
+ super.visitJoin(ctx, stack, expr);
+
+ JoinNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode leftChild =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
+ LogicalNode rightChild =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getRight());
+ node.setInSchema(SchemaUtil.merge(leftChild.getOutSchema(),
rightChild.getOutSchema()));
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitFilter(ProcessorContext ctx, Stack<Expr>
stack, Selection expr) throws TajoException {
+ Set<ColumnReferenceExpr> columnSet =
ExprFinder.finds(expr.getQual(), OpType.Column);
+ for (ColumnReferenceExpr col : columnSet) {
+ NameRefInSelectListNormalizer.normalize(ctx.planContext, col);
+ TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), col);
+ }
+
+ super.visitFilter(ctx, stack, expr);
+
+ SelectionNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getChild());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitUnion(ProcessorContext ctx, Stack<Expr> stack,
SetOperation expr) throws TajoException {
+ super.visitUnion(ctx, stack, expr);
+
+ UnionNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitExcept(ProcessorContext ctx, Stack<Expr>
stack, SetOperation expr) throws TajoException {
+ super.visitExcept(ctx, stack, expr);
+
+ ExceptNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitIntersect(ProcessorContext ctx, Stack<Expr>
stack, SetOperation expr) throws TajoException {
+ super.visitIntersect(ctx, stack, expr);
+
+ IntersectNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getLeft());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitSimpleTableSubquery(ProcessorContext ctx,
Stack<Expr> stack, SimpleTableSubquery expr)
+ throws TajoException {
+ super.visitSimpleTableSubquery(ctx, stack, expr);
+
+ TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitTableSubQuery(ProcessorContext ctx,
Stack<Expr> stack, TablePrimarySubQuery expr)
+ throws TajoException {
+ super.visitTableSubQuery(ctx, stack, expr);
+
+ TableSubQueryNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitCreateTable(ProcessorContext ctx, Stack<Expr>
stack, CreateTable expr) throws TajoException {
+ super.visitCreateTable(ctx, stack, expr);
+ CreateTableNode node = getNodeFromExpr(ctx.planContext.getPlan(),
expr);
+
+ if (expr.hasSubQuery()) {
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ }
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitInsert(ProcessorContext ctx, Stack<Expr>
stack, Insert expr) throws TajoException {
+ super.visitInsert(ctx, stack, expr);
+
+ InsertNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
+ LogicalNode child =
getNonRelationListExpr(ctx.planContext.getPlan(), expr.getSubQuery());
+ node.setInSchema(child.getOutSchema());
+ node.setOutSchema(node.getInSchema());
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitRelation(ProcessorContext ctx, Stack<Expr>
stack, Relation expr) throws TajoException {
+ LogicalPlan plan = ctx.planContext.getPlan();
+ QueryBlock queryBlock = plan.getBlockByExpr(expr);
+ ScanNode scan = queryBlock.getNodeFromExpr(expr);
+ TableDesc desc = scan.getTableDesc();
+
+ if (desc.hasSelfDescSchema()) {
+ if
(ctx.projectColumns.containsKey(getQualifiedRelationName(ctx.planContext,
expr))) {
+ Set<Column> columns = new HashSet<>();
+ for (ColumnReferenceExpr col :
ctx.projectColumns.get(getQualifiedRelationName(ctx.planContext, expr))) {
+ columns.add(NameResolver.resolve(plan, queryBlock, col,
NameResolvingMode.RELS_ONLY, true));
+ }
+
+ desc.setSchema(buildSchemaFromColumnSet(columns));
+ scan.init(desc);
+ } else {
+ // error
+ throw new TajoInternalError(
+ "Columns projected from " +
getQualifiedRelationName(ctx.planContext, expr) + " is not found.");
+ }
+ }
+
+ return scan;
+ }
+
+ /**
+ * This method creates a schema from a set of columns.
+ * For a nested column, its ancestors are guessed and added to the
schema.
+ * For example, given a column 'glossary.title', the columns of
(glossary RECORD (title TEXT)) will be added
+ * to the schema.
+ *
+ * @param columns a set of columns
+ * @return schema build from columns
+ */
+ private Schema buildSchemaFromColumnSet(Set<Column> columns) {
+ SchemaGraph schemaGraph = new SchemaGraph();
+ Set<ColumnVertex> rootVertexes = new HashSet<>();
+ Schema schema = new Schema();
+
+ for (Column eachColumn : columns) {
+ String simpleName = eachColumn.getSimpleName();
+ if (NestedPathUtil.isPath(simpleName)) {
+ String[] paths = simpleName.split(NestedPathUtil.PATH_DELIMITER);
+ for (int i = 0; i < paths.length-1; i++) {
+ String parentName = paths[i];
+ if (i == 0) {
+ parentName =
CatalogUtil.buildFQName(eachColumn.getQualifier(), parentName);
+ }
+ // Leaf column type is TEXT; otherwise, RECORD.
+ Type childDataType = (i == paths.length-2) ? Type.TEXT :
Type.RECORD;
--- End diff --
These lines exceed the 120-column line-width limit. In addition,
``visitCreateTable()`` in this file also contains lines longer than 120 columns.
> Well support for self-describing data formats
> ---------------------------------------------
>
> Key: TAJO-1832
> URL: https://issues.apache.org/jira/browse/TAJO-1832
> Project: Tajo
> Issue Type: New Feature
> Components: Planner/Optimizer
> Reporter: Jihoon Son
> Assignee: Jihoon Son
>
> *Problem*
> Tajo already has support for self-describing data formats such as JSON,
> Parquet, and ORC. Although these formats can provide schema information by
> themselves, the current implementation still requires users to define a
> schema before querying them. To remove this inconvenience, we have to improve
> our query planner so that it supports self-describing data formats well.
> *Solution*
> First, we need to allow the schema definition to be omitted from the CREATE
> TABLE statement. When a query is submitted against a self-describing table,
> columns that do not exist in that table will be filled with NULLs.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)