http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 4e74862..504c625 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -1339,7 +1339,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex QueryBlock block = context.queryBlock; ScanNode scanNode = block.getNodeFromExpr(expr); - updatePhysicalInfo(context, scanNode.getTableDesc()); + updatePhysicalInfo(scanNode.getTableDesc()); // Find expression which can be evaluated at this relation node. // Except for column references, additional expressions used in select list, where clause, order-by clauses @@ -1398,24 +1398,18 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex return targets; } - private void updatePhysicalInfo(PlanContext planContext, TableDesc desc) { - if (desc.getUri() != null && - !desc.getMeta().getStoreType().equals("SYSTEM") && - !desc.getMeta().getStoreType().equals("FAKEFILE") && // FAKEFILE is used for test - PlannerUtil.isFileStorageType(desc.getMeta().getStoreType())) { + private void updatePhysicalInfo(TableDesc desc) { + + // FAKEFILE is used for test + if (!desc.getMeta().getStoreType().equals("SYSTEM") && !desc.getMeta().getStoreType().equals("FAKEFILE")) { try { - Path path = new Path(desc.getUri()); - FileSystem fs = path.getFileSystem(planContext.queryContext.getConf()); - FileStatus status = fs.getFileStatus(path); - if (desc.getStats() != null && (status.isDirectory() || status.isFile())) { - ContentSummary summary = fs.getContentSummary(path); - if (summary != null) { - long volume = summary.getLength(); - desc.getStats().setNumBytes(volume); - } + if (desc.getStats() != null) { + desc.getStats().setNumBytes(storage.getTableVolumn(desc.getUri())); } - } catch (Throwable t) { - LOG.warn(t, t); + } catch (UnsupportedException t) { + LOG.warn(desc.getName() + " does not support Tablespace::getTableVolume()"); + // -1 means unknown volume size. + desc.getStats().setNumBytes(-1); } } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java index d06c1d3..19d5d16 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java @@ -156,7 +156,7 @@ public class AlgebraicUtil { } @Override - public EvalNode visitUnaryEval(Object context, Stack<EvalNode> stack, UnaryEval unaryEval) { + public EvalNode visitUnaryEval(Object context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); EvalNode child = visit(context, unaryEval.getChild(), stack); stack.pop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalType.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalType.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalType.java index 2c2a52f..35390a5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalType.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalType.java @@ -21,10 +21,11 @@ package org.apache.tajo.plan.expr; public enum EvalType { // Unary expression NOT(NotEval.class, "!"), + SIGNED(SignedEval.class), // Binary expression - AND(BinaryEval.class), - OR(BinaryEval.class), + AND(BinaryEval.class, "AND"), + OR(BinaryEval.class, "OR"), EQUAL(BinaryEval.class, "="), IS_NULL(IsNullEval.class), NOT_EQUAL(BinaryEval.class, "<>"), @@ -49,19 +50,18 @@ public enum EvalType { FUNCTION(GeneralFunctionEval.class), // String operator or pattern matching predicates - LIKE(LikePredicateEval.class), - SIMILAR_TO(SimilarToPredicateEval.class), - REGEX(RegexPredicateEval.class), + LIKE(LikePredicateEval.class, "LIKE"), + SIMILAR_TO(SimilarToPredicateEval.class, "SIMILAR TO"), + REGEX(RegexPredicateEval.class, "REGEX"), CONCATENATE(BinaryEval.class, "||"), // Other predicates BETWEEN(BetweenPredicateEval.class), CASE(CaseWhenEval.class), IF_THEN(CaseWhenEval.IfThenEval.class), - IN(InEval.class), + IN(InEval.class, "IN"), // Value or Reference - SIGNED(SignedEval.class), CAST(CastEval.class), ROW_CONSTANT(RowConstantEval.class), FIELD(FieldEval.class), http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java index 9515fe8..abaf5fe 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/SimpleEvalNodeVisitor.java @@ -20,7 +20,6 @@ package org.apache.tajo.plan.expr; import com.google.common.base.Preconditions; import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.UnsupportedException; import java.util.Stack; @@ -36,7 +35,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { EvalNode result; if (evalNode instanceof UnaryEval) { - result = visitUnaryEval(context, stack, (UnaryEval) evalNode); + result = visitUnaryEval(context, (UnaryEval) evalNode, stack); } else if (evalNode instanceof BinaryEval) { result = visitBinaryEval(context, stack, (BinaryEval) evalNode); } else { @@ -50,7 +49,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { result = visitRowConstant(context, (RowConstantEval) evalNode, stack); break; case FIELD: - result = visitField(context, stack, (FieldEval) evalNode); + result = visitField(context, (FieldEval) evalNode, stack); break; @@ -88,7 +87,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { return result; } - protected EvalNode visitUnaryEval(CONTEXT context, Stack<EvalNode> stack, UnaryEval unaryEval) { + protected EvalNode visitUnaryEval(CONTEXT context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); visit(context, unaryEval.getChild(), stack); stack.pop(); @@ -126,7 +125,7 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { return evalNode; } - protected EvalNode visitField(CONTEXT context, Stack<EvalNode> stack, FieldEval evalNode) { + protected EvalNode visitField(CONTEXT context, FieldEval evalNode, Stack<EvalNode> stack) { return evalNode; } @@ -163,10 +162,6 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> { return evalNode; } - protected EvalNode visitInPredicate(CONTEXT context, InEval evalNode, Stack<EvalNode> stack) { - return visitBinaryEval(context, stack, evalNode); - } - /////////////////////////////////////////////////////////////////////////////////////////////// // Functions /////////////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java index 95f4c76..37b77fd 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java @@ -64,7 +64,7 @@ public class ConstantFolding extends SimpleEvalNodeVisitor<LogicalPlanner.PlanCo } @Override - public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) { + public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); EvalNode child = visit(context, unaryEval.getChild(), stack); stack.pop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java index 6fe3b3e..24488bd 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantPropagation.java @@ -72,7 +72,7 @@ public class ConstantPropagation extends SimpleEvalNodeVisitor<LogicalPlanner.Pl } @Override - public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, Stack<EvalNode> stack, UnaryEval unaryEval) { + public EvalNode visitUnaryEval(LogicalPlanner.PlanContext context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); if (unaryEval.getChild().getType() == EvalType.FIELD) { http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java index 8e369d6..4b8a9e9 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan.logical; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -37,6 +38,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod @Expose protected EvalNode qual; @Expose protected Target[] targets; @Expose protected boolean broadcastTable; + @Expose protected long limit = -1; // -1 means no set protected ScanNode(int pid, NodeType nodeType) { super(pid, nodeType); @@ -150,6 +152,29 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod return this.targets; } + /** + * + * + * @return + */ + public boolean hasLimit() { + return limit > 0; + } + + /** + * How many rows will be retrieved? + * + * @return The number of rows to be retrieved + */ + public long getLimit() { + return limit; + } + + public void setLimit(long num) { + Preconditions.checkArgument(num > 0, "The number of fetch rows in limit is negative"); + this.limit = num; + } + public TableDesc getTableDesc() { return tableDesc; } @@ -249,9 +274,4 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod return planStr; } - - public static boolean isScanNode(LogicalNode node) { - return node.getType() == NodeType.SCAN || - node.getType() == NodeType.PARTITIONS_SCAN; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java index f66350a..5524256 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/CommonConditionReduceRule.java @@ -111,7 +111,7 @@ public class CommonConditionReduceRule implements LogicalPlanRewriteRule { } @Override - protected EvalNode visitUnaryEval(Object context, Stack<EvalNode> stack, UnaryEval unaryEval) { + protected EvalNode visitUnaryEval(Object context, UnaryEval unaryEval, Stack<EvalNode> stack) { stack.push(unaryEval); EvalNode child = unaryEval.getChild(); visit(context, child, stack); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java index 7de0b05..fef1528 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java @@ -102,9 +102,9 @@ public class EvalNodeSerializer } @Override - public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, UnaryEval unary) { + public EvalNode visitUnaryEval(EvalTreeProtoBuilderContext context, UnaryEval unary, Stack<EvalNode> stack) { // visiting and registering childs - super.visitUnaryEval(context, stack, unary); + super.visitUnaryEval(context, unary, stack); int [] childIds = registerGetChildIds(context, unary); // building itself @@ -183,7 +183,7 @@ public class EvalNodeSerializer } @Override - public EvalNode visitField(EvalTreeProtoBuilderContext context, Stack<EvalNode> stack, FieldEval field) { + public EvalNode visitField(EvalTreeProtoBuilderContext context, FieldEval field, Stack<EvalNode> stack) { PlanProto.EvalNode.Builder builder = createEvalBuilder(context, field); builder.setField(field.getColumnRef().getProto()); context.treeBuilder.addNodes(builder); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index d9fb218..0210865 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -560,20 +560,6 @@ public class PlannerUtil { } } - /** - * fill targets with FieldEvals from a given schema - * - * @param schema to be transformed to targets - * @param targets to be filled - */ - public static void schemaToTargets(Schema schema, Target[] targets) { - FieldEval eval; - for (int i = 0; i < schema.size(); i++) { - eval = new FieldEval(schema.getColumn(i)); - targets[i] = new Target(eval); - } - } - public static Target[] schemaToTargets(Schema schema) { Target[] targets = new Target[schema.size()]; @@ -585,17 +571,6 @@ public class PlannerUtil { return targets; } - public static Target[] schemaToTargetsWithGeneratedFields(Schema schema) { - List<Target> targets = TUtil.newList(); - - FieldEval eval; - for (int i = 0; i < schema.size(); i++) { - eval = new FieldEval(schema.getColumn(i)); - targets.add(new Target(eval)); - } - return targets.toArray(new Target[targets.size()]); - } - public static SortSpec[] schemaToSortSpecs(Schema schema) { return columnsToSortSpecs(schema.toArray()); } @@ -993,6 +968,25 @@ public class PlannerUtil { return inSubqueries; } + /** + * Return a list of integers, maps input schema and projected columns. + * Each integer value means a column index of input schema corresponding to each project column + * + * @param inputSchema Input Schema + * @param targets Columns to be projected + * @return A list of integers, each of which is an index number of input schema corresponding + * to each projected column. + */ + public static int [] getTargetIds(Schema inputSchema, Column...targets) { + int [] targetIds = new int[targets.length]; + for (int i = 0; i < targetIds.length; i++) { + targetIds[i] = inputSchema.getColumnId(targets[i].getQualifiedName()); + } + Arrays.sort(targetIds); + + return targetIds; + } + public static List<EvalNode> getAllEqualEvals(EvalNode qual) { EvalTreeUtil.EvalFinder finder = new EvalTreeUtil.EvalFinder(EvalType.EQUAL); finder.visit(null, qual, new Stack<EvalNode>()); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 0bfac0d..7815cac 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -577,10 +577,10 @@ message EnforceProperty { SORTED_INPUT = 0; OUTPUT_DISTINCT = 1; GROUP_BY = 2; - JOIN = 3; - SORT = 4; - BROADCAST = 5; - COLUMN_PARTITION = 6; + JOIN = 3; + SORT = 4; + BROADCAST = 5; + COLUMN_PARTITION = 6; DISTINCT_GROUP_BY = 7; } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 903976f..52c6ef9 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -786,6 +786,17 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-jdbc</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-jdbc</artifactId> + <version>${tajo.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> <artifactId>tajo-pullserver</artifactId> <version>${tajo.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index 913e719..5ee4d17 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -37,6 +37,8 @@ <module>tajo-storage-common</module> <module>tajo-storage-hdfs</module> <module>tajo-storage-hbase</module> + <module>tajo-storage-jdbc</module> + <module>tajo-storage-pgsql</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java index c3dc959..cc8ad83 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java @@ -25,6 +25,7 @@ import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import java.io.IOException; @@ -49,6 +50,11 @@ public abstract class AbstractScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { throw new TajoRuntimeException(new NotImplementedException()); } @@ -69,6 +75,10 @@ public abstract class AbstractScanner implements Scanner { } @Override + public void setLimit(long num) { + } + + @Override public boolean isSplittable() { throw new TajoRuntimeException(new NotImplementedException()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java index db64992..b3b1edd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; @@ -153,6 +154,11 @@ public class MergeScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return projectable; } @@ -173,6 +179,10 @@ public class MergeScanner implements Scanner { } @Override + public void setLimit(long num) { + } + + @Override public Schema getSchema() { return schema; } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java index 9c025f7..a20adf7 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; @@ -73,6 +74,11 @@ public class NullScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return false; } @@ -93,6 +99,11 @@ public class NullScanner implements Scanner { } @Override + public void setLimit(long num) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isSplittable() { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java index d12c6bd..2e00bc9 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java @@ -161,7 +161,7 @@ public class OldStorageManager { constructor.setAccessible(true); CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); } - manager = constructor.newInstance(new Object[]{"noname", uri}); + manager = constructor.newInstance(new Object[]{"noname", uri, null}); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java index 2fcb2fd..e95f318 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java @@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import java.io.Closeable; import java.io.IOException; @@ -58,6 +59,12 @@ public interface Scanner extends SchemaObject, Closeable { */ void close() throws IOException; + /** + * Push a plan part into scanner. It will be used in future issues. + * + * @param planPart + */ + void pushOperators(LogicalNode planPart); /** * It returns if the projection is executed in the underlying scanner layer. @@ -96,6 +103,15 @@ public interface Scanner extends SchemaObject, Closeable { */ void setFilter(EvalNode filter); + + /** + * This method does not guarantee that the scanner will retrieve the specified number of rows. + * This information is used for a hint. + * + * @param num The number of rows to be retrieved. + */ + void setLimit(long num); + /** * It returns if the file is splittable. * http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java index c1db34e..41ecd38 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java @@ -30,16 +30,20 @@ public class StorageProperty { private final boolean writable; /** if this storage allows use of artibrary paths */ private final boolean absolutePathAllowed; + /** if this storage provides metadata provider */ + private final boolean metadataProvided; public StorageProperty(String defaultFormat, boolean movable, boolean writable, - boolean absolutePathAllowed) { + boolean absolutePathAllowed, + boolean metadataProvided) { this.defaultFormat = defaultFormat; this.movable = movable; this.writable = writable; this.absolutePathAllowed = absolutePathAllowed; + this.metadataProvided = metadataProvided; } /** @@ -76,4 +80,13 @@ public class StorageProperty { public boolean isArbitraryPathAllowed() { return this.absolutePathAllowed; } + + /** + * Is metadata provided? + * + * @return True if this storage provides linked metadata. + */ + public boolean isMetadataProvided() { + return this.metadataProvided; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index d131d9e..1c6f433 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -18,30 +18,28 @@ package org.apache.tajo.storage; +import net.minidev.json.JSONObject; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; /** @@ -54,14 +52,20 @@ public abstract class Tablespace { protected final String name; protected final URI uri; + protected final JSONObject config; /** this space is visible or not. */ protected boolean visible = true; protected TajoConf conf; - public Tablespace(String name, URI uri) { + public Tablespace(String name, URI uri, JSONObject config) { this.name = name; this.uri = uri; + this.config = config; + } + + public JSONObject getConfig() { + return config; } public void setVisible(boolean visible) { @@ -90,15 +94,11 @@ public abstract class Tablespace { return visible; } - public abstract void setConfig(String name, String value); - - public abstract void setConfigs(Map<String, String> configs); - public String toString() { return name + "=" + uri.toString(); } - public abstract long getTableVolume(URI uri) throws IOException; + public abstract long getTableVolume(URI uri) throws UnsupportedException; /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, @@ -114,22 +114,24 @@ public abstract class Tablespace { /** * Get Table URI * - * @param tableName - * @return + * @param databaseName Database name + * @param tableName Table name + * @return Table URI */ public abstract URI getTableUri(String databaseName, String tableName); /** * Returns the splits that will serve as input for the scan tasks. The * number of splits matches the number of regions in a table. - * @param fragmentId The table name or previous ExecutionBlockId + * @param inputSourceId Input source identifier, which can be either relation name or execution block id * @param tableDesc The table description for the target data. - * @param scanNode The logical node for scanning. + * @param filterCondition filter condition which can prune splits if possible * @return The list of input fragments. * @throws java.io.IOException */ - public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, - ScanNode scanNode) throws IOException, TajoException; + public abstract List<Fragment> getSplits(String inputSourceId, + TableDesc tableDesc, + @Nullable EvalNode filterCondition) throws IOException, TajoException; /** * It returns the storage property. @@ -181,46 +183,6 @@ public abstract class Tablespace { } /** - * Returns the splits that will serve as input for the scan tasks. The - * number of splits matches the number of regions in a table. - * - * @param fragmentId The table name or previous ExecutionBlockId - * @param tableDesc The table description for the target data. - * @return The list of input fragments. - * @throws java.io.IOException - */ - public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException, TajoException { - return getSplits(fragmentId, tableDesc, null); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target Columns which are selected. - * @return Scanner instance - * @throws java.io.IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @return Scanner instance - * @throws java.io.IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { - return getScanner(meta, schema, fragment, schema); - } - - /** * Returns Scanner instance. * * @param meta The table meta @@ -230,7 +192,14 @@ public abstract class Tablespace { * @return Scanner instance * @throws java.io.IOException */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + public Scanner getScanner(TableMeta meta, + Schema schema, + Fragment fragment, + @Nullable Schema target) throws IOException { + if (target == null) { + target = schema; + } + if (fragment.isEmpty()) { Scanner scanner = new NullScanner(conf, schema, meta, fragment); scanner.setTarget(target.toArray()); @@ -267,7 +236,7 @@ public abstract class Tablespace { */ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return (SeekableScanner)this.getScanner(meta, schema, fragment, target); + return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); } /** @@ -413,4 +382,13 @@ public abstract class Tablespace { TableMeta meta) throws IOException { throw new IOException("Staging the output result is not supported in this storage"); } + + public MetadataProvider getMetadataProvider() { + throw new TajoRuntimeException(new UnsupportedException("Linked Metadata Provider for " + name)); + } + + @SuppressWarnings("unused") + public int markAccetablePlanPart(LogicalPlan plan) { + throw new TajoRuntimeException(new UnsupportedException()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index ef71ada..a1fa857 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -19,8 +19,10 @@ package org.apache.tajo.storage; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.common.collect.Maps; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; @@ -29,16 +31,20 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.MetadataProvider; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.UriUtil; import javax.annotation.Nullable; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; import java.net.URI; +import java.util.Collection; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -71,11 +77,14 @@ public class TablespaceManager implements StorageService { protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap(); protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap(); - public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class}; + public static final Class[] TABLESPACE_PARAM = new Class[]{String.class, URI.class, JSONObject.class}; + + public static final String TABLESPACE_SPEC_CONFIGS_KEY = "configs"; static { instance = new TablespaceManager(); } + /** * Singleton instance */ @@ -94,7 +103,7 @@ public class TablespaceManager implements StorageService { } private void addLocalFsTablespace() { - if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) { + if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null && TABLE_SPACE_HANDLERS.containsKey("file")) { String tmpName = UUID.randomUUID().toString(); registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false); } @@ -124,9 +133,9 @@ public class TablespaceManager implements StorageService { private JSONObject loadFromConfig(String fileName) { String json; try { - json = FileUtil.readTextFileFromResource(fileName); + json = JavaResourceUtil.readTextFromResource(fileName); } catch (FileNotFoundException fnfe) { - return null; + return null; } catch (IOException e) { throw new RuntimeException(e); } @@ -174,7 +183,7 @@ public class TablespaceManager implements StorageService { String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER); return new Pair<String, Class<? extends Tablespace>>( - storageType,(Class<? extends Tablespace>) Class.forName(handlerClass)); + storageType, (Class<? extends Tablespace>) Class.forName(handlerClass)); } private void loadTableSpaces(JSONObject json, boolean override) { @@ -182,24 +191,29 @@ public class TablespaceManager implements StorageService { if (spaces != null) { for (Map.Entry<String, Object> entry : spaces.entrySet()) { - AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override); + JSONObject spaceDetail = (JSONObject) entry.getValue(); + AddTableSpace( + entry.getKey(), + URI.create(spaceDetail.getAsString("uri")), + Boolean.parseBoolean(spaceDetail.getAsString("default")), + (JSONObject) spaceDetail.get(TABLESPACE_SPEC_CONFIGS_KEY), + override); } } } - public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) { - boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default")); - URI spaceUri = URI.create(spaceDesc.getAsString("uri")); + public static void AddTableSpace(String spaceName, URI uri, boolean isDefault, JSONObject configs, boolean override) { - if (defaultSpace) { - registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override); + + if (isDefault) { + registerTableSpace(DEFAULT_TABLESPACE_NAME, uri, configs, true, override); } - registerTableSpace(spaceName, spaceUri, spaceDesc, true, override); + registerTableSpace(spaceName, uri, configs, true, override); } private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, boolean visible, boolean override) { - Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible); + Tablespace tableSpace = initializeTableSpace(spaceName, uri, spaceDesc); tableSpace.setVisible(visible); try { @@ -261,12 +275,11 @@ public class TablespaceManager implements StorageService { public static final String KEY_SPACES = "spaces"; - private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) { - Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri); - Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme()); + private static Tablespace initializeTableSpace(String spaceName, URI uri, JSONObject spaceDesc) { + Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(UriUtil.getScheme(uri)); if (clazz == null) { - throw new RuntimeException("There is no tablespace for " + uri.toString()); + throw new RuntimeException("Not found Tablespace handler for " + uri.toString()); } try { @@ -279,7 +292,7 @@ public class TablespaceManager implements StorageService { CONSTRUCTORS.put(clazz, constructor); } - return constructor.newInstance(new Object[]{spaceName, uri}); + return constructor.newInstance(new Object[]{spaceName, uri, spaceDesc}); } catch (Exception e) { throw new RuntimeException(e); } @@ -289,12 +302,18 @@ public class TablespaceManager implements StorageService { public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) { Tablespace existing; synchronized (SPACES_URIS_MAP) { + + String scheme = UriUtil.getScheme(space.getUri()); + if (!TABLE_SPACE_HANDLERS.containsKey(scheme)) { + TABLE_SPACE_HANDLERS.put(scheme, space.getClass()); + } + // Remove existing one SPACES_URIS_MAP.remove(space.getName()); existing = TABLE_SPACES.remove(space.getUri()); // Add anotherone for test - registerTableSpace(space.name, space.uri, null, true, true); + registerTableSpace(space.name, space.uri, space.getConfig(), true, true); } // if there is an existing one, return it. return Optional.fromNullable(existing); @@ -321,7 +340,7 @@ public class TablespaceManager implements StorageService { // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific. - for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { + for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { if (uri.startsWith(entry.getKey().toString())) { lastOne = entry.getValue(); } @@ -383,7 +402,28 @@ public class TablespaceManager implements StorageService { return space.getTableUri(databaseName, tableName); } + @Override + public long getTableVolumn(URI tableUri) throws UnsupportedException { + return get(tableUri).get().getTableVolume(tableUri); + } + public static Iterable<Tablespace> getAllTablespaces() { return TABLE_SPACES.values(); } + + public static Collection<MetadataProvider> getMetadataProviders() { + Collection<Tablespace> filteredSpace = Collections2.filter(TABLE_SPACES.values(), new Predicate<Tablespace>() { + @Override + public boolean apply(@Nullable Tablespace space) { + return space.getProperty().isMetadataProvided(); + } + }); + + return Collections2.transform(filteredSpace, new Function<Tablespace, MetadataProvider>() { + @Override + public MetadataProvider apply(@Nullable Tablespace space) { + return space.getMetadataProvider(); + } + }); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java index 07720c7..bd46551 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.exception.TajoInternalError; import java.io.IOException; import java.lang.reflect.Constructor; @@ -72,8 +73,8 @@ public class FragmentConvertor { CONSTRUCTOR_CACHE.put(clazz, constructor); } result = constructor.newInstance(new Object[]{fragment.getContents()}); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Throwable e) { + throw new TajoInternalError(e); } return result; http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json index 40e17f4..3ede2d4 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json @@ -12,9 +12,13 @@ "handler": "org.apache.tajo.storage.FileTablespace", "default-format": "text" }, - "hbase": { + "hbase:zk": { "handler": "org.apache.tajo.storage.hbase.HBaseTablespace", "default-format": "hbase" + }, + "jdbc:postgresql": { + "handler": "org.apache.tajo.storage.pgsql.PgSQLTablespace", + "default-format": "rowstore" } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 6b10b0b..7f4661b 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -87,6 +87,10 @@ <name>tajo.storage.fragment.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseFragment</value> </property> + <property> + <name>tajo.storage.fragment.jdbc.class</name> + <value>org.apache.tajo.storage.jdbc.JdbcFragment</value> + </property> <!--- Scanner Handler --> <property> @@ -143,7 +147,7 @@ <name>tajo.storage.scanner-handler.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseScanner</value> </property> - + <!--- Appender Handler --> <property> <name>tajo.storage.appender-handler</name> @@ -231,4 +235,4 @@ <value>131072</value> <description>128KB write buffer</description> </property> -</configuration> \ No newline at end of file +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index 139133b..934dd01 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -86,6 +86,10 @@ <name>tajo.storage.fragment.hbase.class</name> <value>org.apache.tajo.storage.hbase.HBaseFragment</value> </property> + <property> + <name>tajo.storage.fragment.jdbc.class</name> + <value>org.apache.tajo.storage.jdbc.JdbcFragment</value> + </property> <!--- Scanner Handler --> <property> http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index 7fc6d2a..90f7aa0 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -38,6 +38,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.exception.*; import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.BytesUtils; @@ -415,6 +416,11 @@ public class HBaseScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public boolean isProjectable() { return true; } @@ -438,6 +444,10 @@ public class HBaseScanner implements Scanner { } @Override + public void setLimit(long num) { + } + + @Override public boolean isSplittable() { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 2204923..0064be4 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -18,7 +18,9 @@ package org.apache.tajo.storage.hbase; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import net.minidev.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,10 +44,7 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.exception.DataTypeMismatchException; -import org.apache.tajo.exception.InvalidTablePropertyException; -import org.apache.tajo.exception.MissingTablePropertyException; -import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.CreateTableNode; @@ -58,6 +57,7 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.*; +import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -70,7 +70,8 @@ import java.util.*; public class HBaseTablespace extends Tablespace { private static final Log LOG = LogFactory.getLog(HBaseTablespace.class); - public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty("hbase", false, true, false); + public static final StorageProperty HBASE_STORAGE_PROPERTIES = + new StorageProperty("hbase", false, true, false, false); public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true); public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false); @@ -80,8 +81,8 @@ public class HBaseTablespace extends Tablespace { private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>(); - public HBaseTablespace(String spaceName, URI uri) { - super(spaceName, uri); + public HBaseTablespace(String spaceName, URI uri, JSONObject config) { + super(spaceName, uri, config); } @Override @@ -93,21 +94,13 @@ public class HBaseTablespace extends Tablespace { hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, splits[1]); } - @Override - public void setConfig(String name, String value) { - } - - @Override - public void setConfigs(Map<String, String> configs) { - } - public Configuration getHbaseConf() { return hbaseConf; } @Override - public long getTableVolume(URI uri) throws IOException { - return 0; + public long getTableVolume(URI uri) throws UnsupportedException { + throw new UnsupportedException(); } @Override @@ -424,12 +417,14 @@ public class HBaseTablespace extends Tablespace { } @Override - public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) + public List<Fragment> getSplits(String inputSourceId, + TableDesc tableDesc, + @Nullable EvalNode filterCondition) throws IOException, TajoException { ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions()); - List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode); + List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition); HTable htable = null; HBaseAdmin hAdmin = null; @@ -445,7 +440,7 @@ public class HBaseTablespace extends Tablespace { List<Fragment> fragments = new ArrayList<Fragment>(1); Fragment fragment = new HBaseFragment( tableDesc.getUri(), - fragmentId, htable.getName().getNameAsString(), + inputSourceId, htable.getName().getNameAsString(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); @@ -523,7 +518,7 @@ public class HBaseTablespace extends Tablespace { } } else { HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(), - fragmentId, + inputSourceId, htable.getName().getNameAsString(), fragmentStart, fragmentStop, @@ -713,14 +708,15 @@ public class HBaseTablespace extends Tablespace { } public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping, - TableDesc tableDesc, ScanNode scanNode) + TableDesc tableDesc, + @Nullable EvalNode filterCondition) throws IOException, MissingTablePropertyException, InvalidTablePropertyException { List<IndexPredication> indexPredications = new ArrayList<IndexPredication>(); Column[] indexableColumns = getIndexableColumns(tableDesc); if (indexableColumns != null && indexableColumns.length == 1) { // Currently supports only single index column. - List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns); + List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(filterCondition, indexableColumns); for (Set<EvalNode> eachEvalSet: indexablePredicateList) { Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet); if (indexPredicationValues != null) { @@ -737,12 +733,13 @@ public class HBaseTablespace extends Tablespace { return indexPredications; } - public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException { + public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual, + Column[] indexableColumns) throws IOException { List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); // if a query statement has a search condition, try to find indexable predicates - if (indexableColumns != null && scanNode.getQual() != null) { - EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual()); + if (indexableColumns != null && qual != null) { + EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(qual); // add qualifier to schema for qual for (Column column : indexableColumns) { http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java index f0c8f15..56ca9be 100644 --- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java +++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java @@ -43,7 +43,7 @@ public class TestHBaseTableSpace { @BeforeClass public static void setUp() throws IOException { String tableSpaceUri = "hbase:zk://host1:2171"; - HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri)); + HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri), null); hBaseTablespace.init(new TajoConf()); TablespaceManager.addTableSpaceForTest(hBaseTablespace); } @@ -74,7 +74,8 @@ public class TestHBaseTableSpace { scanNode.setQual(evalNodeA); HBaseTablespace storageManager = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); - List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + List<Set<EvalNode>> indexEvals = + storageManager.findIndexablePredicateSet(scanNode.getQual(), new Column[]{rowkeyColumn}); assertNotNull(indexEvals); assertEquals(1, indexEvals.size()); Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); @@ -85,7 +86,7 @@ public class TestHBaseTableSpace { EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075"))); EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3); scanNode.setQual(evalNodeB); - indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + indexEvals = storageManager.findIndexablePredicateSet(scanNode.getQual(), new Column[]{rowkeyColumn}); assertEquals(2, indexEvals.size()); indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); assertEquals("020", indexPredicateValue.getFirst().asChars()); @@ -101,7 +102,7 @@ public class TestHBaseTableSpace { EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); scanNode.setQual(evalNodeD); - indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + indexEvals = storageManager.findIndexablePredicateSet(scanNode.getQual(), new Column[]{rowkeyColumn}); assertEquals(2, indexEvals.size()); indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); @@ -120,7 +121,7 @@ public class TestHBaseTableSpace { evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6); EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD); scanNode.setQual(evalNodeE); - indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + indexEvals = storageManager.findIndexablePredicateSet(scanNode.getQual(), new Column[]{rowkeyColumn}); assertEquals(2, indexEvals.size()); indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java index 8844fa5..33f1d04 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -29,6 +29,9 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; @@ -82,6 +85,11 @@ public abstract class FileScanner implements Scanner { } @Override + public void pushOperators(LogicalNode planPart) { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override public void setTarget(Column[] targets) { if (inited) { throw new IllegalStateException("Should be called before init()"); @@ -89,6 +97,10 @@ public abstract class FileScanner implements Scanner { this.targets = targets; } + @Override + public void setLimit(long num) { + } + public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException { String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME); FileSystem fs; http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index f4c78b6..af981bb 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import net.minidev.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,15 +35,18 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.Bytes; import org.apache.tajo.util.TUtil; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; import java.text.NumberFormat; @@ -94,7 +98,7 @@ public class FileTablespace extends Tablespace { } }; - private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true); + private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false); private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true); protected FileSystem fs; @@ -103,8 +107,8 @@ public class FileTablespace extends Tablespace { protected boolean blocksMetadataEnabled; private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); - public FileTablespace(String spaceName, URI uri) { - super(spaceName, uri); + public FileTablespace(String spaceName, URI uri, JSONObject config) { + super(spaceName, uri, config); } @Override @@ -123,21 +127,14 @@ public class FileTablespace extends Tablespace { } @Override - public void setConfig(String name, String value) { - conf.set(name, value); - } - - @Override - public void setConfigs(Map<String, String> configs) { - for (Map.Entry<String, String> c : configs.entrySet()) { - conf.set(c.getKey(), c.getValue()); - } - } - - @Override - public long getTableVolume(URI uri) throws IOException { + public long getTableVolume(URI uri) throws UnsupportedException { Path path = new Path(uri); - ContentSummary summary = fs.getContentSummary(path); + ContentSummary summary; + try { + summary = fs.getContentSummary(path); + } catch (IOException e) { + throw new TajoInternalError(e); + } return summary.getLength(); } @@ -155,7 +152,7 @@ public class FileTablespace extends Tablespace { public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getScanner(meta, schema, fragment); + return getScanner(meta, schema, fragment, null); } public FileSystem getFileSystem() { @@ -627,8 +624,10 @@ public class FileTablespace extends Tablespace { } @Override - public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException { - return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getUri())); + public List<Fragment> getSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException { + return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri())); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index eabab22..76c9362 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FieldSerializerDeserializer; import org.apache.tajo.storage.Tuple; @@ -41,12 +42,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - - targetColumnIndexes = new int[projected.length]; - for (int i = 0; i < projected.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName()); - } - Arrays.sort(targetColumnIndexes); + targetColumnIndexes = PlannerUtil.getTargetIds(schema, projected); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 90bec65..5f8a4d1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -106,7 +106,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple; @@ -128,7 +128,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertNotNull(scanner.next()); @@ -150,7 +150,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); FileFragment fragment = getFileFragment("testErrorTolerance2.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); try { @@ -169,7 +169,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance3.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); try { @@ -185,7 +185,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "2"); FileFragment fragment = getFileFragment("testNormal.json"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); @@ -212,7 +212,7 @@ public class TestDelimitedTextFile { meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "1"); meta.putOption(StorageConstants.TEXT_DELIMITER, ","); FileFragment fragment = getFileFragment("testSkip.txt"); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index cecd4dd..f536514 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import net.minidev.json.JSONObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -128,7 +129,7 @@ public class TestFileTablespace { } assertTrue(fs.exists(tablePath)); - FileTablespace space = new FileTablespace("testGetSplit", fs.getUri()); + FileTablespace space = new FileTablespace("testGetSplit", fs.getUri(), null); space.init(new TajoConf(conf)); assertEquals(fs.getUri(), space.getUri()); @@ -186,7 +187,7 @@ public class TestFileTablespace { } assertTrue(fs.exists(tablePath)); - FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri()); + FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri(), new JSONObject()); space.init(new TajoConf(conf)); assertEquals(fs.getUri(), space.getUri()); @@ -233,7 +234,7 @@ public class TestFileTablespace { } assertTrue(fs.exists(tablePath)); - FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri()); + FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri(), null); sm.init(new TajoConf(conf)); assertEquals(fs.getUri(), sm.getUri()); @@ -276,7 +277,7 @@ public class TestFileTablespace { FileTablespace space = TablespaceManager.getLocalFs(); assertEquals(localFs.getUri(), space.getFileSystem().getUri()); - FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri); + FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri, null); distTablespace.init(conf); existingTs = TablespaceManager.addTableSpaceForTest(distTablespace); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 21fff58..b800ed2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -38,6 +38,7 @@ import org.apache.tajo.storage.text.DelimitedLineReader; import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.junit.Test; import java.io.File; @@ -213,7 +214,7 @@ public class TestLineReader { @Test public void testByteBufLineReaderWithoutTerminating() throws IOException { - String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile(); + String path = JavaResourceUtil.getResourceURL("dataset/testLineText.txt").getFile(); File file = new File(path); String data = FileUtil.readTextFile(file); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 02472eb..26e16ea 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -44,7 +44,7 @@ import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.RCFile; import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,36 +64,36 @@ public class TestStorages { private static String TEST_PROJECTION_AVRO_SCHEMA = "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testProjection\",\n" + - " \"fields\": [\n" + - " { \"name\": \"id\", \"type\": \"int\" },\n" + - " { \"name\": \"age\", \"type\": \"long\" },\n" + - " { \"name\": \"score\", \"type\": \"float\" }\n" + - " ]\n" + - "}\n"; + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", \"type\": \"float\" }\n" + + " ]\n" + + "}\n"; private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testNullHandlingTypes\",\n" + - " \"fields\": [\n" + - " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + - " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + - " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + - " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + - " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + - " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + - " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col11\", \"type\": \"null\" },\n" + - " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + - " ]\n" + - "}\n"; + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col11\", \"type\": \"null\" },\n" + + " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; private static String TEST_MAX_VALUE_AVRO_SCHEMA = "{\n" + @@ -392,7 +392,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType, options); meta.setOptions(CatalogUtil.newDefaultProperty(storeType)); if (storeType.equalsIgnoreCase(BuiltinStorages.AVRO)) { - String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); + String path = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc").toString(); meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); } @@ -429,7 +429,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = sm.getScanner(meta, schema, fragment); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -518,7 +518,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -593,7 +593,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -662,7 +662,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -731,7 +731,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -805,7 +805,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -850,7 +850,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -986,7 +986,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = sm.getScanner(meta, schema, fragment); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); scanner.init(); Tuple retrieved; @@ -1048,7 +1048,7 @@ public class TestStorages { inSchema.addColumn("col5", Type.INT8); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment, null); Schema target = new Schema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java index 341cc07..960448c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -25,6 +25,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.NetUtils; import org.junit.Before; import org.junit.Test; @@ -46,7 +47,7 @@ public class TestAvroUtil { @Before public void setUp() throws Exception { - schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc"); + schemaUrl = JavaResourceUtil.getResourceURL("dataset/testVariousTypes.avsc"); assertNotNull(schemaUrl); File file = new File(schemaUrl.getPath()); http://git-wip-us.apache.org/repos/asf/tajo/blob/144a02e3/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java index 88d7536..75e59da 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -69,7 +69,7 @@ public class TestJsonSerDe { FileSystem fs = FileSystem.getLocal(conf); FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple = scanner.next(); @@ -108,7 +108,7 @@ public class TestJsonSerDe { schema.addColumn("col1", TajoDataTypes.Type.TEXT); schema.addColumn("col2", TajoDataTypes.Type.TEXT); schema.addColumn("col3", TajoDataTypes.Type.TEXT); - Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); scanner.init(); Tuple tuple = scanner.next();
