http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java index 9515fe8..6c49e32 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java @@ -20,7 +20,6 @@ package org.apache.tajo.plan.expr; import com.google.common.base.Preconditions; import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.UnsupportedException; import java.util.Stack; @@ -36,7 +35,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { EvalNode result; if (evalNode instanceof UnaryEval) { - result = visitUnaryEval(context, stack, (UnaryEval) evalNode); + result = visitUnaryEval(context, (UnaryEval) evalNode, stack); } else if (evalNode instanceof BinaryEval) { result = visitBinaryEval(context, stack, (BinaryEval) evalNode); } else { @@ -50,7 +49,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { result = visitRowConstant(context, (RowConstantEval) evalNode, stack); break; case FIELD: - result = visitField(context, stack, (FieldEval) evalNode); + result = visitField(context, (FieldEval) evalNode, stack); break; @@ -88,7 +87,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { return result; } - protected EvalNode visitUnaryEval(CONTEXT context, Stack<EvalNode> stack, UnaryEval unaryEval) { + protected EvalNode visitUnaryEval(CONTEXT context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); visit(context, unaryEval.getChild(), stack); stack.pop(); @@ -126,7 +125,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { return evalNode; } - protected EvalNode visitField(CONTEXT context, Stack<EvalNode> stack, FieldEval evalNode) { + protected EvalNode visitField(CONTEXT context, FieldEval evalNode, Stack<EvalNode> stack) { return evalNode; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java index eb546d1..b3fc019 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java @@ -64,7 +64,7 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo } @Override - public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) { + public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); EvalNode child = visit(context, unaryEval.getChild(), stack); stack.pop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java index 6fe3b3e..24488bd 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java @@ -72,7 +72,7 @@ public class ConstantPropagation extends SimpleEvalNodeVisitor<LogicalPlanner.Pl } @Override - public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) { + public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); if (unaryEval.getChild().getType() == EvalType.FIELD) { http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java index 8e369d6..f5e6b78 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan.logical; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -37,6 +38,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod @Expose protected EvalNode qual; @Expose protected Target[] targets; @Expose protected boolean broadcastTable; + @Expose protected long limit = -1; // -1 is infinite protected ScanNode(int pid, NodeType nodeType) { super(pid, nodeType); @@ -150,6 +152,29 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod return this.targets; } + /** + * + * + * @return + */ + public boolean hasLimit() { + return limit > 0; + } + + /** + * How many rows will be retrieved? + * + * @return The number of rows to be retrieved + */ + public long getLimit() { + return limit; + } + + public void setLimit(long num) { + Preconditions.checkArgument(num > 0, "The number of fetch rows in limit is negative"); + this.limit = num; + } + public TableDesc getTableDesc() { return tableDesc; } @@ -249,9 +274,4 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod return planStr; } - - public static boolean isScanNode(LogicalNode node) { - return node.getType() == NodeType.SCAN || - node.getType() == NodeType.PARTITIONS_SCAN; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java index f66350a..5524256 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java @@ -111,7 +111,7 @@ public class CommonConditionReduceRule implements LogicalPlanRewriteRule { } @Override - protected EvalNode visitUnaryEval(Object context, Stack<EvalNode> stack, UnaryEval unaryEval) { + protected EvalNode visitUnaryEval(Object context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); EvalNode child = unaryEval.getChild(); visit(context, child, stack); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java index 7de0b05..fef1528 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java @@ -102,9 +102,9 @@ public class EvalNodeSerializer } @Override - public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, UnaryEval unary) { + public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, UnaryEval unary, Stack<EvalNode> stack) { // visiting and registering childs - super.visitUnaryEval(context, stack, unary); + super.visitUnaryEval(context, unary, stack); int [] childIds = registerGetChildIds(context, unary); // building itself @@ -183,7 +183,7 @@ public class EvalNodeSerializer } @Override - public EvalNode visitField(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, FieldEval field) { + public EvalNode visitField(EvalTreeProtoBuilderContext context, FieldEval field, Stack<EvalNode> stack) { PlanProto.EvalNode.Builder builder = createEvalBuilder(context, field); builder.setField(field.getColumnRef().getProto()); context.treeBuilder.addNodes(builder); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index a9dca4c..6dd5a7f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -560,20 +560,6 @@ public class PlannerUtil { } } - /** - * fill targets with FieldEvals from a given schema - * - * @param schema to be transformed to targets - * @param targets to be filled - */ - public static void schemaToTargets(Schema schema, Target[] targets) { - FieldEval eval; - for (int i = 0; i < schema.size(); i++) { - eval = new FieldEval(schema.getColumn(i)); - targets[i] = new Target(eval); - } - } - public static Target[] schemaToTargets(Schema schema) { Target[] targets = new Target[schema.size()]; @@ -585,17 +571,6 @@ public class PlannerUtil { return targets; } - public static Target[] schemaToTargetsWithGeneratedFields(Schema schema) { - List<Target> targets = TUtil.newList(); - - FieldEval eval; - for (int i = 0; i < schema.size(); i++) { - eval = new FieldEval(schema.getColumn(i)); - targets.add(new Target(eval)); - } - return targets.toArray(new Target[targets.size()]); - } - public static SortSpec[] schemaToSortSpecs(Schema schema) { return columnsToSortSpecs(schema.toArray()); } @@ -992,4 +967,14 @@ public class PlannerUtil { } return inSubqueries; } + + public static int [] getTargetIds(Schema inputSchema, Column...targets) { + int [] targetIds = new int[targets.length]; + for (int i = 0; i < targetIds.length; i++) { + targetIds[i] = inputSchema.getColumnId(targets[i].getQualifiedName()); + } + Arrays.sort(targetIds); + + return targetIds; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index da1e187..274e674 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -577,11 +577,12 @@ message EnforceProperty { SORTED_INPUT = 0; OUTPUT_DISTINCT = 1; GROUP_BY = 2; - JOIN = 3; - SORT = 4; - BROADCAST = 5; - COLUMN_PARTITION = 6; + JOIN = 3; + SORT = 4; + BROADCAST = 5; + COLUMN_PARTITION = 6; DISTINCT_GROUP_BY = 7; + STORAGE_PUSHDOWN = 8; } // Identifies which field is filled in. @@ -675,4 +676,8 @@ message DistinctGroupbyEnforcer { repeated SortSpecArray sortSpecArrays = 3; required bool isMultipleAggregation = 4 [default = false]; optional MultipleAggregationStage multipleAggregationStage = 5; +} + +message StoragePushdownEnforcer { + required int32 marker = 1; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java index c3dc959..cc8ad83 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java @@ -25,6 +25,7 @@ import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import java.io.IOException; @@ -49,6 +50,11 @@ public abstract class AbstractScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { throw new TajoRuntimeException(new NotImplementedException()); } @@ -69,6 +75,10 @@ public abstract class AbstractScanner implements Scanner { } @Override + public void setLimit(long num) { + } + + @Override public boolean isSplittable() { throw new TajoRuntimeException(new NotImplementedException()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java index db64992..6b633bd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; @@ -153,6 +154,11 @@ public class MergeScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return projectable; } @@ -173,6 +179,11 @@ public class MergeScanner implements Scanner { } @Override + public void setLimit(long num) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public Schema getSchema() { return schema; } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java index 9c025f7..a20adf7 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; @@ -73,6 +74,11 @@ public class NullScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return false; } @@ -93,6 +99,11 @@ public class NullScanner implements Scanner { } @Override + public void setLimit(long num) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isSplittable() { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java new file mode 100644 index 0000000..7500e9a --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/PlanPushable.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SchemaObject; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Scanner Interface + */ + +public interface PlanPushable { + void pushdown(LogicalNode pushed); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java index 2fcb2fd..f421515 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import java.io.Closeable; import java.io.IOException; @@ -58,6 +59,11 @@ public interface Scanner extends SchemaObject, Closeable { */ void close() throws IOException; + /** + * + * @param planPart + */ + void pushOperators(LogicalNode planPart); /** * It returns if the projection is executed in the underlying scanner layer. @@ -96,6 +102,15 @@ public interface Scanner extends SchemaObject, Closeable { */ void setFilter(EvalNode filter); + + /** + * This method does not guarantee that the scanner will retrieve the specified number of rows. + * This information is used for a hint. + * + * @param num The number of rows to be retrieved. + */ + void setLimit(long num); + /** * It returns if the file is splittable. * http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 83ae935..92d8b72 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -97,7 +97,7 @@ public abstract class Tablespace { return name + "=" + uri.toString(); } - public abstract long getTableVolume(URI uri) throws IOException; + public abstract long getTableVolume(URI uri) throws UnsupportedException; /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, @@ -150,7 +150,7 @@ public abstract class Tablespace { * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER. * In general Repartitioner determines the partition range using previous output statistics data. * In the special cases, such as HBase Repartitioner uses the result of this method. - * + *ã ã * @param queryContext The current query context which contains query properties. * @param tableDesc The table description for the target data. * @param inputSchema The input schema @@ -187,38 +187,18 @@ public abstract class Tablespace { * @param meta The table meta * @param schema The input schema * @param fragment The fragment for scanning - * @param target Columns which are selected. - * @return Scanner instance - * @throws java.io.IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @return Scanner instance - * @throws java.io.IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { - return getScanner(meta, schema, fragment, schema); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning * @param target The output schema * @return Scanner instance * @throws java.io.IOException */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + public Scanner getScanner(TableMeta meta, + Schema schema, + Fragment fragment, + @Nullable Schema target) throws IOException { + if (target == null) { + target = schema; + } + if (fragment.isEmpty()) { Scanner scanner = new NullScanner(conf, schema, meta, fragment); scanner.setTarget(target.toArray()); @@ -255,7 +235,7 @@ public abstract class Tablespace { */ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return (SeekableScanner)this.getScanner(meta, schema, fragment, target); + return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); } /** @@ -405,4 +385,9 @@ public abstract class Tablespace { public MetadataProvider getMetadataProvider() { throw new TajoRuntimeException(new UnsupportedException("Linked Metadata Provider for " + name)); } + + @SuppressWarnings("unused") + public int markAccetablePlanPart(LogicalPlan plan) { + throw new TajoRuntimeException(new UnsupportedException()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 17f03a0..e57dc3a 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,8 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.MetadataProvider; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.Pair; import org.apache.tajo.util.UriUtil; @@ -75,13 +76,14 @@ public class TablespaceManager implements StorageService { protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap(); protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap(); - public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class, JSONObject.class}; + public static final Class[] TABLESPACE_PARAM = new Class[]{String.class, URI.class, JSONObject.class}; public static final String TABLESPACE_SPEC_CONFIGS_KEY = "configs"; static { instance = new TablespaceManager(); } + /** * Singleton instance */ @@ -130,7 +132,7 @@ public class TablespaceManager implements StorageService { private JSONObject loadFromConfig(String fileName) { String json; try { - json = FileUtil.readTextFileFromResource(fileName); + json = JavaResourceUtil.readTextFromResource(fileName); } catch (IOException e) { throw new RuntimeException(e); } @@ -182,7 +184,7 @@ public class TablespaceManager implements StorageService { String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER); return new Pair<String, Class<? extends Tablespace>>( - storageType,(Class<? extends Tablespace>) Class.forName(handlerClass)); + storageType, (Class<? extends Tablespace>) Class.forName(handlerClass)); } private void loadTableSpaces(JSONObject json, boolean override) { @@ -339,7 +341,7 @@ public class TablespaceManager implements StorageService { // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific. - for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { + for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { if (uri.startsWith(entry.getKey().toString())) { lastOne = entry.getValue(); } @@ -401,6 +403,11 @@ public class TablespaceManager implements StorageService { return space.getTableUri(databaseName, tableName); } + @Override + public long getTableVolumn(URI tableUri) throws UnsupportedException { + return get(tableUri).get().getTableVolume(tableUri); + } + public static Iterable<Tablespace> getAllTablespaces() { return TABLE_SPACES.values(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java index 07720c7..bd46551 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.exception.TajoInternalError; import java.io.IOException; import java.lang.reflect.Constructor; @@ -72,8 +73,8 @@ public class FragmentConvertor { CONSTRUCTOR_CACHE.put(clazz, constructor); } result = constructor.newInstance(new Object[]{fragment.getContents()}); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Throwable e) { + throw new TajoInternalError(e); } return result; http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index dfdff85..35461fa 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -39,7 +39,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> + <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase,jdbc</value> </property> <!--- Fragment Class Configurations --> @@ -83,6 +83,10 @@ <name>tajo.storage.fragment.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseFragment</value> </property> + <property> + <name>tajo.storage.fragment.jdbc.class</name> + <value>org.apache.tajo.storage.jdbc.JdbcFragment</value> + </property> <!--- Scanner Handler --> <property> http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index f637da0..c57a67a 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -82,6 +82,10 @@ <name>tajo.storage.fragment.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseFragment</value> </property> + <property> + <name>tajo.storage.fragment.jdbc.class</name> + <value>org.apache.tajo.storage.jdbc.JdbcFragment</value> + </property> <!--- Scanner Handler --> <property> http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index 7fc6d2a..90f7aa0 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -38,6 +38,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.BytesUtils; @@ -415,6 +416,11 @@ public class HBaseScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return true; } @@ -438,6 +444,10 @@ public class HBaseScanner implements Scanner { } @Override + public void setLimit(long num) { + } + + @Override public boolean isSplittable() { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 330522b..0064be4 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -44,10 +44,7 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.exception.DataTypeMismatchException; -import org.apache.tajo.exception.InvalidTablePropertyException; -import org.apache.tajo.exception.MissingTablePropertyException; -import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.CreateTableNode; @@ -102,8 +99,8 @@ public class HBaseTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri) throws IOException { - return 0; + public long getTableVolume(URI uri) throws UnsupportedException { + throw new UnsupportedException(); } @Override @@ -738,8 +735,6 @@ public class HBaseTablespace extends Tablespace { public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual, Column[] indexableColumns) throws IOException { - Preconditions.checkNotNull(qual); - List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); // if a query statement has a search condition, try to find indexable predicates http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java index 8844fa5..33f1d04 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -29,6 +29,9 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; @@ -82,6 +85,11 @@ public abstract class FileScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public void setTarget(Column[] targets) { if (inited) { throw new IllegalStateException("Should be called before init()"); @@ -89,6 +97,10 @@ public abstract class FileScanner implements Scanner { this.targets = targets; } + @Override + public void setLimit(long num) { + } + public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException { String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME); FileSystem fs; http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index bcc0c88..9f29c34 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -35,6 +35,8 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; @@ -126,9 +128,14 @@ public class FileTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri) throws IOException { + public long getTableVolume(URI uri) throws UnsupportedException { Path path = new Path(uri); - ContentSummary summary = fs.getContentSummary(path); + ContentSummary summary; + try { + summary = fs.getContentSummary(path); + } catch (IOException e) { + throw new TajoInternalError(e); + } return summary.getLength(); } @@ -146,7 +153,7 @@ public class FileTablespace extends Tablespace { public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getScanner(meta, schema, fragment); + return getScanner(meta, schema, fragment, null); } public FileSystem getFileSystem() { http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index eabab22..76c9362 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FieldSerializerDeserializer; import org.apache.tajo.storage.Tuple; @@ -41,12 +42,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - - targetColumnIndexes = new int[projected.length]; - for (int i = 0; i < projected.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName()); - } - Arrays.sort(targetColumnIndexes); + targetColumnIndexes = PlannerUtil.getTargetIds(schema, projected); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 90bec65..5f8a4d1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -106,7 +106,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple; @@ -128,7 +128,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertNotNull(scanner.next()); @@ -150,7 +150,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); FileFragment fragment = getFileFragment("testErrorTolerance2.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); try { @@ -169,7 +169,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance3.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); try { @@ -185,7 +185,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "2"); FileFragment fragment = getFileFragment("testNormal.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); @@ -212,7 +212,7 @@ public class TestDelimitedTextFile { meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "1"); meta.putOption(StorageConstants.TEXT_DELIMITER, ","); FileFragment fragment = getFileFragment("testSkip.txt"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 21fff58..b800ed2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -38,6 +38,7 @@ import org.apache.tajo.storage.text.DelimitedLineReader; import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.junit.Test; import java.io.File; @@ -213,7 +214,7 @@ public class TestLineReader { @Test public void testByteBufLineReaderWithoutTerminating() throws IOException { - String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile(); + String path = JavaResourceUtil.getResourceURL("dataset/testLineText.txt").getFile(); File file = new File(path); String data = FileUtil.readTextFile(file); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index afe0f13..9c194b2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -42,7 +42,7 @@ import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.rcfile.RCFile; import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import org.junit.runner.RunWith; @@ -342,7 +342,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType, options); meta.setOptions(CatalogUtil.newDefaultProperty(storeType)); if (storeType.equalsIgnoreCase("AVRO")) { - String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); + String path = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc").toString(); meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); } @@ -379,7 +379,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = sm.getScanner(meta, schema, fragment); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -468,7 +468,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -543,7 +543,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -612,7 +612,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -681,7 +681,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -755,7 +755,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -800,7 +800,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -936,7 +936,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = sm.getScanner(meta, schema, fragment); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -997,7 +997,7 @@ public class TestStorages { inSchema.addColumn("col5", Type.INT8); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment, null); Schema target = new Schema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java index 341cc07..960448c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.NetUtils; import org.junit.Before; import org.junit.Test; @@ -46,7 +47,7 @@ public class TestAvroUtil { @Before public void setUp() throws Exception { - schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc"); + schemaUrl = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc"); assertNotNull(schemaUrl); File file = new File(schemaUrl.getPath()); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java index 88d7536..75e59da 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -69,7 +69,7 @@ public class TestJsonSerDe { FileSystem fs = FileSystem.getLocal(conf); FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple = scanner.next(); @@ -108,7 +108,7 @@ public class TestJsonSerDe { schema.addColumn("col1", TajoDataTypes.Type.TEXT); schema.addColumn("col2", TajoDataTypes.Type.TEXT); schema.addColumn("col3", TajoDataTypes.Type.TEXT); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple = scanner.next(); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java index 9a42b0d..57b0825 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java index 2fc42b7..e8a18fc 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java @@ -60,20 +60,8 @@ public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneab return inputSourceId; } - @Override - public CatalogProtos.FragmentProto getProto() { - JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder(); - builder.setInputSourceId(this.inputSourceId); - builder.setUri(this.uri); - if(hostNames != null) { - builder.addAllHosts(TUtil.newList(hostNames)); - } - - CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder(); - fragmentBuilder.setId(this.inputSourceId); - fragmentBuilder.setStoreType(BuiltinStorages.TEXT); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - return fragmentBuilder.build(); + public String getUri() { + return uri; } @Override @@ -97,6 +85,22 @@ public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneab } @Override + public CatalogProtos.FragmentProto getProto() { + JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder(); + builder.setInputSourceId(this.inputSourceId); + builder.setUri(this.uri); + if(hostNames != null) { + builder.addAllHosts(TUtil.newList(hostNames)); + } + + CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder(); + fragmentBuilder.setId(this.inputSourceId); + fragmentBuilder.setStoreType("JDBC"); + fragmentBuilder.setContents(builder.buildPartial().toByteString()); + return fragmentBuilder.build(); + } + + @Override public int compareTo(JdbcFragment o) { return this.uri.compareTo(o.uri); } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java index 7f63c66..f623f14 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.*; import org.apache.tajo.util.KeyValueSet; @@ -187,7 +188,7 @@ public abstract class JdbcMetadataProviderBase implements MetadataProvider { // get columns resultForColumns = connection.getMetaData().getColumns(databaseName, schemaName, tableName, null); - List<Pair<Integer, Column>> columns = Lists.newArrayList(); + final List<Pair<Integer, Column>> columns = Lists.newArrayList(); while(resultForColumns.next()) { final int ordinalPos = resultForColumns.getInt("ORDINAL_POSITION"); @@ -208,19 +209,27 @@ public abstract class JdbcMetadataProviderBase implements MetadataProvider { }); // transform the pair list into collection for columns - Schema schema = new Schema(Collections2.transform(columns, new Function<Pair<Integer,Column>, Column>() { + final Schema schema = new Schema(Collections2.transform(columns, new Function<Pair<Integer,Column>, Column>() { @Override public Column apply(@Nullable Pair<Integer, Column> columnPair) { return columnPair.getSecond(); } })); - return new TableDesc( + + // fill the table stats + final TableStats stats = new TableStats(); + stats.setNumRows(-1); // unknown + + final TableDesc table = new TableDesc( CatalogUtil.buildFQName(databaseName, name), schema, new TableMeta("rowstore", new KeyValueSet()), space.getTableUri(databaseName, name) ); + table.setStats(stats); + + return table; } catch (SQLException e) { throw new TajoInternalError(e); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java new file mode 100644 index 0000000..897f6c5 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.jdbc; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedDataTypeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.datetime.DateTimeUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.*; +import java.util.Iterator; + +public abstract class JdbcScanner implements Scanner { + private static final Log LOG = LogFactory.getLog(JdbcScanner.class); + + protected final DatabaseMetaData dbMetaData; + protected final String tableName; + protected final Schema schema; + protected final TableMeta tableMeta; + protected final JdbcFragment fragment; + protected final TableStats stats; + protected final SQLBuilder builder; + + protected Column [] targets; + protected EvalNode filter; + protected Long limit; + protected LogicalNode planPart; + protected VTuple outTuple; + protected String generatedSql; + protected ResultSetIterator iter; + + public JdbcScanner(final DatabaseMetaData dbMetaData, + final Schema tableSchema, + final TableMeta tableMeta, + final JdbcFragment fragment) { + + Preconditions.checkNotNull(dbMetaData); + Preconditions.checkNotNull(tableSchema); + Preconditions.checkNotNull(tableMeta); + Preconditions.checkNotNull(fragment); + + this.dbMetaData = dbMetaData; + this.tableName = ConnectionInfo.fromURI(fragment.getUri()).tableName; + this.schema = tableSchema; + this.tableMeta = tableMeta; + this.fragment = fragment; + this.stats = new TableStats(); + builder = getSQLBuilder(getSQLExprBuilder()); + } + + @Override + public void init() throws IOException { + if (targets == null) { + targets = schema.toArray(); + } + outTuple = new VTuple(targets.length); + + if (planPart == null) { + generatedSql = builder.build(tableName, targets, filter, limit); + } else { + generatedSql = builder.build(planPart); + } + } + + @Override + public Tuple next() throws IOException { + if (iter == null) { + iter = executeQueryAndGetIter(); + } + + if (iter.hasNext()) { + return iter.next(); + } else { + return null; + } + } + + @Override + public void reset() throws IOException { + if (iter != null) { + iter.rewind(); + } + } + + @Override + public void close() throws IOException { + if (iter != null) { + iter.close(); + } + } + + @Override + public void pushOperators(LogicalNode planPart) { + this.planPart = planPart; + } + + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public void setTarget(Column [] targets) { + this.targets = targets; + } + + @Override + public boolean isSelectable() { + return true; + } + + @Override + public void setFilter(EvalNode filter) { + this.filter = filter; + } + + @Override + public void setLimit(long num) { + this.limit = num; + } + + @Override + public boolean isSplittable() { + return false; + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public TableStats getInputStats() { + return stats; + } + + @Override + public Schema getSchema() { + return schema; + } + + protected SQLBuilder getSQLBuilder(SQLExpressionGenerator exprBuilder) { + return new SQLBuilder(dbMetaData, getSQLExprBuilder()); + } + protected SQLExpressionGenerator getSQLExprBuilder() { + return new SQLExpressionGenerator(dbMetaData); + } + + protected void convertTuple(ResultSet resultSet, VTuple tuple) { + try { + for (int column_idx = 0; column_idx < targets.length; column_idx++) { + final Column c = targets[column_idx]; + final int resultIdx = column_idx + 1; + + switch (c.getDataType().getType()) { + case INT1: + case INT2: + tuple.put(column_idx, DatumFactory.createInt2(resultSet.getShort(resultIdx))); + break; + case INT4: + tuple.put(column_idx, DatumFactory.createInt4(resultSet.getInt(resultIdx))); + break; + case INT8: + tuple.put(column_idx, DatumFactory.createInt8(resultSet.getLong(resultIdx))); + break; + case FLOAT4: + tuple.put(column_idx, DatumFactory.createFloat4(resultSet.getFloat(resultIdx))); + break; + case FLOAT8: + tuple.put(column_idx, DatumFactory.createFloat8(resultSet.getDouble(resultIdx))); + break; + case CHAR: + case VARCHAR: + case TEXT: + tuple.put(column_idx, DatumFactory.createText(resultSet.getString(resultIdx))); + break; + case DATE: + Date date = resultSet.getDate(resultIdx); + tuple.put(column_idx, DatumFactory.createDate(date.getYear(), date.getMonth(), date.getDay())); + break; + case TIME: + tuple.put(column_idx, DatumFactory.createTime(resultSet.getTime(resultIdx).getTime())); + break; + case TIMESTAMP: + tuple.put(column_idx, + DatumFactory.createTimestamp(DateTimeUtil.javaTimeToJulianTime(resultSet.getTime(resultIdx).getTime()))); + break; + case BINARY: + case VARBINARY: + case BLOB: + tuple.put(column_idx, + DatumFactory.createBlob(resultSet.getBytes(resultIdx))); + break; + default: + throw new TajoInternalError(new UnsupportedDataTypeException(c.getDataType().getType().name())); + } + } + } catch (SQLException s) { + throw new TajoInternalError(s); + } + } + + private ResultSetIterator executeQueryAndGetIter() { + try { + LOG.info("Executed SQL Statement: " + generatedSql); + Connection conn = DriverManager.getConnection(fragment.uri); + Statement statement = conn.createStatement(); + ResultSet resultset = statement.executeQuery(generatedSql); + return new ResultSetIterator((resultset)); + } catch (SQLException s) { + throw new TajoInternalError(s); + } + } + + public class ResultSetIterator implements Iterator<Tuple>, Closeable { + + private final ResultSet resultSet; + + private boolean didNext = false; + private boolean hasNext = false; + + public ResultSetIterator(ResultSet resultSet) { + this.resultSet = resultSet; + } + + @Override + public boolean hasNext() { + if (!didNext) { + + try { + hasNext = resultSet.next(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + didNext = true; + } + return hasNext; + } + + @Override + public Tuple next() { + if (!didNext) { + try { + resultSet.next(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + didNext = false; + convertTuple(resultSet, outTuple); + return outTuple; + } + + @Override + public void remove() { + throw new TajoRuntimeException(new UnsupportedException()); + } + + public void rewind() { + try { + resultSet.isBeforeFirst(); + } catch (SQLException e) { + throw new TajoInternalError(e); + } + } + + @Override + public void close() throws IOException { + try { + resultSet.close(); + } catch (SQLException e) { + LOG.warn(e);; + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index 04709b0..d3ec273 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -25,20 +25,22 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.*; import org.apache.tajo.exception.NotImplementedException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.storage.FormatProperty; -import org.apache.tajo.storage.StorageProperty; -import org.apache.tajo.storage.Tablespace; -import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.SQLException; import java.util.List; /** @@ -52,6 +54,7 @@ public abstract class JdbcTablespace extends Tablespace { static final StorageProperty STORAGE_PROPERTY = new StorageProperty("rowstore", false, true, false, true); static final FormatProperty FORMAT_PROPERTY = new FormatProperty(false, false, false); + private Connection conn; public JdbcTablespace(String name, URI uri, JSONObject config) { super(name, uri, config); @@ -59,11 +62,16 @@ public abstract class JdbcTablespace extends Tablespace { @Override protected void storageInit() throws IOException { + try { + this.conn = DriverManager.getConnection(uri.toASCIIString()); + } catch (SQLException e) { + throw new IOException(e); + } } @Override - public long getTableVolume(URI uri) throws IOException { - return 0; + public long getTableVolume(URI uri) throws UnsupportedException { + throw new UnsupportedException(); } @Override @@ -138,4 +146,18 @@ public abstract class JdbcTablespace extends Tablespace { } public abstract MetadataProvider getMetadataProvider(); + + @Override + public abstract Scanner getScanner(TableMeta meta, + Schema schema, + Fragment fragment, + @Nullable Schema target) throws IOException; + + public DatabaseMetaData getDatabaseMetaData() { + try { + return conn.getMetaData(); + } catch (SQLException e) { + throw new TajoInternalError(e); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java new file mode 100644 index 0000000..57a4bba --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.jdbc; + +import com.google.common.base.Function; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.Target; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.util.StringUtils; + +import javax.annotation.Nullable; +import java.sql.DatabaseMetaData; +import java.util.Stack; + +/** + * Generator to build a SQL statement from a plan fragment + */ +public class SQLBuilder { + @SuppressWarnings("unused") + private final DatabaseMetaData dbMetaData; + private final SQLExpressionGenerator sqlExprGen; + + public static class SQLBuilderContext { + StringBuilder sb; + } + + public SQLBuilder(DatabaseMetaData dbMetaData, SQLExpressionGenerator exprGen) { + this.dbMetaData = dbMetaData; + this.sqlExprGen = exprGen; + } + + public String build(String tableName, Column [] targets, @Nullable EvalNode filter, @Nullable Long limit) { + + StringBuilder selectClause = new StringBuilder("SELECT "); + selectClause.append(StringUtils.join(targets, ",", new Function<Column, String>() { + @Override + public String apply(@Nullable Column input) { + return input.getSimpleName(); + } + })).append(" "); + + StringBuilder fromClause = new StringBuilder("FROM "); + fromClause.append(tableName).append(" "); + + StringBuilder whereClause = null; + if (filter != null) { + whereClause = new StringBuilder("WHERE "); + whereClause.append(sqlExprGen.generate(filter)).append(" "); + } + + StringBuilder limitClause = null; + if (limit != null) { + limitClause = new StringBuilder("LIMIT "); + limitClause.append(limit).append(" "); + } + + return generateSelectStmt(selectClause, fromClause, whereClause, limitClause); + } + + public String generateSelectStmt(StringBuilder selectClause, + StringBuilder fromClause, + @Nullable StringBuilder whereClause, + @Nullable StringBuilder limitClause) { + return + selectClause.toString() + + fromClause.toString() + + (whereClause != null ? whereClause.toString() : "") + + (limitClause != null ? limitClause.toString() : ""); + } + + public String build(LogicalNode planPart) { + SQLBuilderContext context = new SQLBuilderContext(); + visit(context, planPart, new Stack<LogicalNode>()); + return context.sb.toString(); + } + + public void visit(SQLBuilderContext context, LogicalNode node, Stack<LogicalNode> stack) { + stack.push(node); + + switch (node.getType()) { + case SCAN: + visitScan(context, (ScanNode) node, stack); + break; + + case GROUP_BY: + visitGroupBy(context, (GroupbyNode) node, stack); + break; + + case SELECTION: + visitFilter(context, (SelectionNode) node, stack); + break; + + case PROJECTION: + visitProjection(context, (ProjectionNode) node, stack); + break; + + case TABLE_SUBQUERY: + visitDerivedSubquery(context, (TableSubQueryNode) node, stack); + break; + + default: + throw new TajoRuntimeException(new UnsupportedException("plan node '" + node.getType().name() + "'")); + } + + stack.pop(); + } + + public void visitDerivedSubquery(SQLBuilderContext ctx, TableSubQueryNode derivedSubquery, Stack<LogicalNode> stack) { + ctx.sb.append(" ("); + visit(ctx, derivedSubquery.getSubQuery(), stack); + ctx.sb.append(" ) ").append(derivedSubquery.getTableName()); + } + + public void visitProjection(SQLBuilderContext ctx, ProjectionNode projection, Stack<LogicalNode> stack) { + + visit(ctx, projection.getChild(), stack); + } + + public void visitGroupBy(SQLBuilderContext ctx, GroupbyNode groupby, Stack<LogicalNode> stack) { + visit(ctx, groupby.getChild(), stack); + ctx.sb.append("GROUP BY ").append(StringUtils.join(groupby.getGroupingColumns(), ",", 0)).append(" "); + } + + public void visitFilter(SQLBuilderContext ctx, SelectionNode filter, Stack<LogicalNode> stack) { + visit(ctx, filter.getChild(), stack); + ctx.sb.append("WHERE " + sqlExprGen.generate(filter.getQual())); + } + + public void visitScan(SQLBuilderContext ctx, ScanNode scan, Stack<LogicalNode> stack) { + + StringBuilder selectClause = new StringBuilder("SELECT "); + selectClause.append(StringUtils.join(scan.getTargets(), ",", new Function<Target, String>() { + @Override + public String apply(@Nullable Target t) { + StringBuilder sb = new StringBuilder(sqlExprGen.generate(t.getEvalTree())); + if (t.hasAlias()) { + sb.append(" AS ").append(t.getAlias()); + } + return sb.toString(); + } + })); + + ctx.sb.append("FROM ").append(scan.getTableName()).append(" "); + + if (scan.hasAlias()) { + ctx.sb.append("AS ").append(scan.getAlias()).append(" "); + } + + if (scan.hasQual()) { + ctx.sb.append("WHERE " + sqlExprGen.generate(scan.getQual())); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java new file mode 100644 index 0000000..76c687a --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.jdbc; + +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedDataTypeException; +import org.apache.tajo.plan.expr.*; + +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Stack; + +/** + * A generator to build a SQL representation from a sql expression + */ +public class SQLExpressionGenerator extends SimpleEvalNodeVisitor<SQLExpressionGenerator.Context> { + final private DatabaseMetaData dbMetaData; + + final private String DEFAULT_QUOTE_STR = "'"; + private String QUOTE_STR; + + public SQLExpressionGenerator(DatabaseMetaData dbMetaData) { + this.dbMetaData = dbMetaData; + initDatabaseDependentSQLRepr(); + } + + private void initDatabaseDependentSQLRepr() { + String quoteStr = null; + try { + quoteStr = dbMetaData.getIdentifierQuoteString(); + } catch (SQLException e) { + } + this.QUOTE_STR = quoteStr != null ? quoteStr : DEFAULT_QUOTE_STR; + } + + public String quote(String text) { + return QUOTE_STR + text + QUOTE_STR; + } + + public String generate(EvalNode node) { + Context context = new Context(); + visit(context, node, new Stack<EvalNode>()); + return context.sb.toString(); + } + + public static class Context { + StringBuilder sb = new StringBuilder(); + + public void put(String text) { + sb.append(" ").append(text).append(" "); + } + } + + protected EvalNode visitBinaryEval(Context context, Stack<EvalNode> stack, BinaryEval binaryEval) { + stack.push(binaryEval); + visit(context, binaryEval.getLeftExpr(), stack); + context.sb.append(convertBinOperatorToSQLRepr(binaryEval.getType())).append(" "); + visit(context, binaryEval.getRightExpr(), stack); + stack.pop(); + return binaryEval; + } + + protected EvalNode visitUnaryEval(Context context, UnaryEval unary, Stack<EvalNode> stack) { + + switch (unary.getType()) { + case NOT: + context.sb.append("NOT "); + super.visitUnaryEval(context, unary, stack); + break; + case SIGNED: + SignedEval signed = (SignedEval) unary; + if (signed.isNegative()) { + context.sb.append("-"); + } + super.visitUnaryEval(context, unary, stack); + break; + case IS_NULL: + super.visitUnaryEval(context, unary, stack); + + IsNullEval isNull = (IsNullEval) unary; + if (isNull.isNot()) { + context.sb.append("IS NOT NULL "); + } else { + context.sb.append("IS NULL "); + } + break; + + case CAST: + super.visitUnaryEval(context, unary, stack); + context.sb.append(" AS ").append(convertTajoTypeToSQLType(unary.getValueType())); + } + return unary; + } + + protected EvalNode visitRowConstant(Context context, RowConstantEval row, Stack<EvalNode> stack) { + StringBuilder sb = new StringBuilder("("); + + boolean first = true; + for (Datum d : row.getValues()) { + if (!first) { + sb.append(","); + first = false; + } + sb.append(convertLiteralToSQLRepr(d)); + } + + sb.append(")"); + + context.put(sb.toString()); + + return row; + } + + @Override + protected EvalNode visitField(Context context, FieldEval field, Stack<EvalNode> stack) { + context.put(field.getName()); + return field; + } + + @Override + protected EvalNode visitConst(Context context, ConstEval constant, Stack<EvalNode> stack) { + context.sb.append(convertLiteralToSQLRepr(constant.getValue())); + return constant; + } + + /** + * convert Tajo literal into SQL representation + * + * @param d Datum + */ + public String convertLiteralToSQLRepr(Datum d) { + switch (d.type()) { + case INT1: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case NUMERIC: + return d.asChars(); + + case TEXT: + case VARCHAR: + case CHAR: + return quote(d.asChars()); + + case DATE: + return "DATE " + quote(d.asChars()); + + case TIME: + return "TIME " + quote(d.asChars()); + + case TIMESTAMP: + return "TIMESTAMP " + quote(d.asChars()); + + case NULL_TYPE: + return "NULL"; + + default: + throw new TajoRuntimeException(new UnsupportedDataTypeException(d.type().name())); + } + } + + /** + * Convert Tajo DataType into SQL DataType + * + * @param dataType Tajo DataType + * @return SQL DataType + */ + public String convertTajoTypeToSQLType(DataType dataType) { + switch (dataType.getType()) { + case INT1: + return "TINYINT"; + case INT2: + return "SMALLINT"; + case INT4: + return "INTEGER"; + case INT8: + return "BIGINT"; + case FLOAT4: + return "FLOAT"; + case FLOAT8: + return "DOUBLE"; + default: + return dataType.getType().name(); + } + } + + /** + * Convert EvalType the operator notation into SQL notation + * + * @param op EvalType + * @return SQL representation + */ + public String convertBinOperatorToSQLRepr(EvalType op) { + return op.getOperatorName(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java deleted file mode 100644 index 9eee00a..0000000 --- a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/JdbcTablespaceTestBase.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.jdbc; - -public abstract class JdbcTablespaceTestBase { - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa819881/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java b/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java index d3a0c6a..4a2171e 100644 --- a/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java +++ b/tajo-storage/tajo-storage-mysql/src/main/java/org/apache/tajo/storage/mysql/MySQLTablespace.java @@ -18,11 +18,17 @@ package org.apache.tajo.storage.mysql; +import com.google.common.base.Preconditions; import net.minidev.json.JSONObject; import org.apache.tajo.catalog.*; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.jdbc.JdbcFragment; import org.apache.tajo.storage.jdbc.JdbcTablespace; +import javax.annotation.Nullable; +import java.io.IOException; import java.net.URI; /** @@ -42,4 +48,27 @@ public class MySQLTablespace extends JdbcTablespace { public MetadataProvider getMetadataProvider() { return new MySQLMetadataProvider(this, database); } + + @Override + public Scanner getScanner(TableMeta meta, + Schema schema, + Fragment fragment, + @Nullable Schema target) throws IOException { + if (!(fragment instanceof JdbcFragment)) { + throw new TajoInternalError("fragment must be JdbcFragment"); + } + + if (target == null) { + target = schema; + } + + if (fragment.isEmpty()) { + Scanner scanner = new NullScanner(conf, schema, meta, fragment); + scanner.setTarget(target.toArray()); + + return scanner; + } + + return new MySQLJdbcScanner(getDatabaseMetaData(), schema, meta, (JdbcFragment) fragment); + } }
