some comparison function fixes and updates to support ref engine sql testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d3b2f9f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d3b2f9f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d3b2f9f6 Branch: refs/heads/master Commit: d3b2f9f61e859df36681a3d27f58da0a8426ccb0 Parents: 4a603de Author: Jacques Nadeau <[email protected]> Authored: Sat Aug 31 22:46:12 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sun Sep 1 15:16:22 2013 -0700 ---------------------------------------------------------------------- .../drill/common/expression/FieldReference.java | 2 +- .../drill/common/logical/data/Project.java | 6 +- .../codegen/templates/ComparisonFunctions.java | 233 ++++++++++++++----- .../exec/expr/fn/DrillSimpleFuncHolder.java | 8 + .../expr/fn/FunctionImplementationRegistry.java | 21 +- .../physical/impl/filter/FilterTemplate.java | 1 - .../impl/project/ProjectRecordBatch.java | 16 +- .../org/apache/drill/exec/ref/ROPConverter.java | 2 +- .../org/apache/drill/jdbc/DrillHandler.java | 117 +++++----- .../java/org/apache/drill/jdbc/DrillTable.java | 108 ++++++--- .../apache/drill/optiq/DrillImplementor.java | 44 ++-- .../org/apache/drill/optiq/DrillJoinRel.java | 4 +- .../org/apache/drill/optiq/DrillProjectRel.java | 4 +- .../drill/optiq/EnumerableDrillFullEngine.java | 5 +- .../apache/drill/optiq/EnumerableDrillRel.java | 2 +- .../drill/sql/client/full/FileSystemSchema.java | 2 +- .../drill/sql/client/ref/DrillRefImpl.java | 145 ++++++------ .../org/apache/drill/jdbc/test/JdbcAssert.java | 2 +- .../org/apache/drill/jdbc/test/JdbcTest.java | 4 +- 19 files changed, 477 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java index 2d62a41..cb3f0b4 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FieldReference.java @@ -44,7 +44,7 @@ public class FieldReference extends SchemaPath { } - public FieldReference(String value, ExpressionPosition pos) { + public FieldReference(CharSequence value, ExpressionPosition pos) { super(value, pos); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java index 1fb5eb1..109d218 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Project.java @@ -40,9 +40,9 @@ public class Project extends SingleInputOperator { for (int i = 0; i < selections.length; i++) { PathSegment segment = selections[i].getRef().getRootSegment(); CharSequence path = segment.getNameSegment().getPath(); -// if (!segment.isNamed() || !path.equals("output")) -// throw new ExpressionParsingException(String.format( -// "Outputs for projections always have to start with named path of output. First segment was named '%s' or was named [%s]", path, segment.isNamed())); + if (!segment.isNamed() || !path.equals("output")) + throw new ExpressionParsingException(String.format( + "Outputs for projections always have to start with named path of output. First segment was named '%s' or was named [%s]", path, segment.isNamed())); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/java-exec/src/main/codegen/templates/ComparisonFunctions.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/templates/ComparisonFunctions.java b/sandbox/prototype/exec/java-exec/src/main/codegen/templates/ComparisonFunctions.java index a5b560d..d30783a 100644 --- a/sandbox/prototype/exec/java-exec/src/main/codegen/templates/ComparisonFunctions.java +++ b/sandbox/prototype/exec/java-exec/src/main/codegen/templates/ComparisonFunctions.java @@ -1,62 +1,70 @@ <@pp.dropOutputFile /> -<#macro compareBlock mode left right output> +<#macro compareBlock mode left right output nullCompare> outside:{ - <#if left?starts_with("Nullable")> - <#if right?starts_with("Nullable")> - <#-- Both are nullable. --> - if(left.isSet == 0){ - if(right.isSet == 0){ - ${output} = 0; - break outside; - }else{ - ${output} = 1; - } - }else if(right.isSet == 0){ - ${output} = -1; - break outside; - } - <#else> - <#-- Left is nullable but right is not. --> - if(left.isSet == 0){ - ${output} = 1; - break outside; - } - </#if> -<#elseif right?starts_with("Nullable")> - if(right.isSet == 0){ - ${output} = -1; - break outside; - } - </#if> - -<#if mode == "var"> - - for (int l = left.start, r = right.start; l < left.end && r < right.end; l++, r++) { - byte leftByte = left.buffer.getByte(l); - byte rightByte = right.buffer.getByte(r); - if (leftByte != rightByte) { - ${output} = ((leftByte & 0xFF) - (rightByte & 0xFF)) > 0 ? 1 : -1; + <#if nullCompare> + <#if left?starts_with("Nullable")> + <#if right?starts_with("Nullable")> + <#-- Both are nullable. --> + if(left.isSet == 0){ + if(right.isSet == 0){ + ${output} = 0; + break outside; + }else{ + ${output} = 1; + break outside; + } + }else if(right.isSet == 0){ + ${output} = -1; break outside; } - } - - int l = (left.end - left.start) - (right.end - right.start); - if (l > 0) { - ${output} = 1; - break outside; - } else if (l == 0) { - ${output} = 0; - break outside; - } else { + <#else> + <#-- Left is nullable but right is not. --> + if(left.isSet == 0){ + ${output} = 1; + break outside; + } + </#if> + <#elseif right?starts_with("Nullable")> + if(right.isSet == 0){ ${output} = -1; break outside; } -<#elseif mode == "fixed"> - ${output} = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1); -</#if> + </#if> + </#if> + + <#if mode == "var"> + + for (int l = left.start, r = right.start; l < left.end && r < right.end; l++, r++) { + byte leftByte = left.buffer.getByte(l); + byte rightByte = right.buffer.getByte(r); + if (leftByte != rightByte) { + ${output} = ((leftByte & 0xFF) - (rightByte & 0xFF)) > 0 ? 1 : -1; + break outside; + } + } + + int l = (left.end - left.start) - (right.end - right.start); + if (l > 0) { + ${output} = 1; + break outside; + } else if (l == 0) { + ${output} = 0; + break outside; + } else { + ${output} = -1; + break outside; + } + <#elseif mode == "fixed"> + ${output} = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1); + </#if> + + + + + } </#macro> @@ -89,7 +97,7 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { - <@compareBlock mode=type.mode left=left right=right output="out.value" /> + <@compareBlock mode=type.mode left=left right=right output="out.value" nullCompare=true /> } } @@ -103,9 +111,24 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + int cmp; - <@compareBlock mode=type.mode left=left right=right output="cmp" /> + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> out.value = cmp == -1 ? 1 : 0; + } } } @@ -119,10 +142,25 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + int cmp; - <@compareBlock mode=type.mode left=left right=right output="cmp" /> + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> out.value = cmp < 1 ? 1 : 0; - } + } + } } @FunctionTemplate(name = "greater than", scope = FunctionTemplate.FunctionScope.SIMPLE) @@ -135,10 +173,25 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + int cmp; - <@compareBlock mode=type.mode left=left right=right output="cmp" /> + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> out.value = cmp == 1 ? 1 : 0; - } + } + } } @FunctionTemplate(name = "greater than or equal to", scope = FunctionTemplate.FunctionScope.SIMPLE) @@ -151,13 +204,28 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + int cmp; - <@compareBlock mode=type.mode left=left right=right output="cmp" /> + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> out.value = cmp > -1 ? 1 : 0; + } } } - @FunctionTemplate(name = "equals", scope = FunctionTemplate.FunctionScope.SIMPLE) + @FunctionTemplate(name = "equal", scope = FunctionTemplate.FunctionScope.SIMPLE) public static class Equals${left}${right} implements DrillSimpleFunc { @Param ${left}Holder left; @@ -167,9 +235,56 @@ public class GCompare${left}${right}{ public void setup(RecordBatch b) {} public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + + int cmp; + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> + out.value = cmp == 0 ? 1 : 0; + } + } + } + + @FunctionTemplate(name = "not equal", scope = FunctionTemplate.FunctionScope.SIMPLE) + public static class NotEquals${left}${right} implements DrillSimpleFunc { + + @Param ${left}Holder left; + @Param ${right}Holder right; + @Output BitHolder out; + + public void setup(RecordBatch b) {} + + public void eval() { + sout: { + <#if left?starts_with("Nullable")> + if(left.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + <#if right?starts_with("Nullable")> + if(right.isSet ==0){ + out.value = 0; + break sout; + } + </#if> + int cmp; - <@compareBlock mode=type.mode left=left right=right output="cmp" /> - out.value = cmp == 0 ? 1 : 0; + <@compareBlock mode=type.mode left=left right=right output="cmp" nullCompare=false/> + out.value = cmp == 0 ? 0 : 1; + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java index 0a8e35f..f92c82f 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java @@ -1,5 +1,6 @@ package org.apache.drill.exec.expr.fn; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -99,5 +100,12 @@ class DrillSimpleFuncHolder extends DrillFuncHolder{ if (sub != topSub) sub.assign(internalOutput.ref("isSet"),JExpr.lit(1));// Assign null if NULL_IF_NULL mode return out; } + +@Override +public String toString() { + final int maxLen = 10; + return "DrillSimpleFuncHolder [, functionName=" + functionName + ", nullHandling=" + nullHandling + "parameters=" + + (parameters != null ? Arrays.asList(parameters).subList(0, Math.min(parameters.length, maxLen)) : null) + "]"; +} } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index 3b45446..b492610 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -43,8 +43,25 @@ public class FunctionImplementationRegistry { for(LogicalExpression e : call.args){ types.add(e.getMajorType()); } - - throw new UnsupportedOperationException(String.format("Unable to find matching function implementation for call %s with args %s", call.getDefinition().getName(), types)); + StringBuilder sb = new StringBuilder(); + sb.append("Missing function implementation: "); + sb.append("["); + sb.append(call.getDefinition().getName()); + sb.append("("); + boolean first = true; + for(MajorType mt : types){ + if(first){ + first = false; + }else{ + sb.append(", "); + } + sb.append(mt.getMinorType().name()); + sb.append("-"); + sb.append(mt.getMode().name()); + } + sb.append(")"); + sb.append("]"); + throw new UnsupportedOperationException(sb.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java index 5675475..a7e8f0c 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java @@ -73,7 +73,6 @@ public abstract class FilterTemplate implements Filterer{ for(char i =0; i < recordCount; i++){ if(doEval(i, 0)){ - logger.debug("Setting svIndex {} to {}", svIndex, (int) i); outgoingSelectionVector.setIndex(svIndex, i); svIndex++; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index b7874bd..14a4905 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -5,7 +5,9 @@ import java.util.List; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -57,6 +59,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } + /** hack to make ref and full work together... need to figure out if this is still necessary. **/ + private FieldReference getRef(NamedExpression e){ + FieldReference ref = e.getRef(); + PathSegment seg = ref.getRootSegment(); + if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){ + return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition()); + } + return ref; + } + @Override protected void setupNewSchema() throws SchemaChangeException{ this.allocationVectors = Lists.newArrayList(); @@ -70,7 +82,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ for(int i =0; i < exprs.size(); i++){ final NamedExpression namedExpression = exprs.get(i); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector); - final MaterializedField outputField = MaterializedField.create(namedExpression.getRef(), expr.getMajorType()); + final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType()); if(collector.hasErrors()){ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } @@ -81,7 +93,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector(); Preconditions.checkNotNull(incoming); - TransferPair tp = vvIn.getTransferPair(namedExpression.getRef()); + TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); transfers.add(tp); container.add(tp.getTo()); logger.debug("Added transfer."); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java index 384af2d..fb21a7f 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java @@ -106,7 +106,7 @@ class ROPConverter { private ReferenceStorageEngine getEngine(String name){ StorageEngineConfig config = plan.getStorageEngineConfig(name); - if(config == null) throw new SetupException(String.format("Unable to find define logical plan of name [%s].", name)); + if(config == null) throw new SetupException(String.format("Unable to find define storage engine of name [%s].", name)); ReferenceStorageEngine engine = engineRegistry.getEngine(config); return engine; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java index f2265f8..6cc25e0 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java @@ -13,6 +13,7 @@ import net.hydromatic.optiq.impl.java.JavaTypeFactory; import net.hydromatic.optiq.impl.java.MapSchema; import net.hydromatic.optiq.jdbc.HandlerImpl; import net.hydromatic.optiq.jdbc.OptiqConnection; +import net.hydromatic.optiq.model.ModelHandler; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.logical.StorageEngineConfig; @@ -40,58 +41,68 @@ public class DrillHandler extends HandlerImpl { public void onConnectionInit(OptiqConnection connection) throws SQLException { super.onConnectionInit(connection); - registry = new SchemaProviderRegistry(config); final Properties p = connection.getProperties(); - Preconditions.checkArgument(bit == null); - Preconditions.checkArgument(client == null); - Preconditions.checkArgument(coordinator == null); - // final String model = p.getProperty("model"); - // if (model != null) { - // if (model != null) { - // try { - // new ModelHandler(connection, model); - // } catch (IOException e) { - // throw new SQLException(e); - // } - // } - // } - - final String zk = connection.getProperties().getProperty("zk"); - - try { - String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8); - - StorageEngines engines = config.getMapper().readValue(enginesData, StorageEngines.class); - MutableSchema rootSchema = connection.getRootSchema(); - - for (Map.Entry<String, StorageEngineConfig> entry : engines) { - SchemaProvider provider = registry.getSchemaProvider(entry.getValue()); - FileSystemSchema schema = new FileSystemSchema(client, entry.getValue(), provider, rootSchema.getTypeFactory(), - rootSchema, entry.getKey(), rootSchema.getExpression(), rootSchema.getQueryProvider()); - rootSchema.addSchema(entry.getKey(), schema); + + if (p.getProperty("ref") != null) { + final String model = p.getProperty("model"); + if (model != null) { + if (model != null) { + try { + new ModelHandler(connection, model); + } catch (IOException e) { + throw new SQLException(e); + } + } } - - rootSchema.addSchema("--FAKE--", new FakeSchema(rootSchema, rootSchema.getQueryProvider(), rootSchema.getTypeFactory(), "fake", rootSchema.getExpression())); + } else { + + registry = new SchemaProviderRegistry(config); - if (zk != null) { - coordinator = new ZKClusterCoordinator(config, zk); - coordinator.start(10000); - DrillClient cl = new DrillClient(config, coordinator); - cl.connect(); - client = cl; - } else { - - RemoteServiceSet local = RemoteServiceSet.getLocalServiceSet(); - this.coordinator = local.getCoordinator(); - bit = new Drillbit(config, local); - bit.run(); - - DrillClient cl = new DrillClient(config, coordinator); - cl.connect(); - client = cl; + Preconditions.checkArgument(bit == null); + Preconditions.checkArgument(client == null); + Preconditions.checkArgument(coordinator == null); + + final String zk = connection.getProperties().getProperty("zk"); + + try { + String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8); + + StorageEngines engines = config.getMapper().readValue(enginesData, StorageEngines.class); + MutableSchema rootSchema = connection.getRootSchema(); + + for (Map.Entry<String, StorageEngineConfig> entry : engines) { + SchemaProvider provider = registry.getSchemaProvider(entry.getValue()); + FileSystemSchema schema = new FileSystemSchema(client, entry.getValue(), provider, + rootSchema.getTypeFactory(), rootSchema, entry.getKey(), rootSchema.getExpression(), + rootSchema.getQueryProvider()); + rootSchema.addSchema(entry.getKey(), schema); + } + + rootSchema.addSchema( + "--FAKE--", + new FakeSchema(rootSchema, rootSchema.getQueryProvider(), rootSchema.getTypeFactory(), "fake", rootSchema + .getExpression())); + + if (zk != null) { + coordinator = new ZKClusterCoordinator(config, zk); + coordinator.start(10000); + DrillClient cl = new DrillClient(config, coordinator); + cl.connect(); + client = cl; + } else { + + RemoteServiceSet local = RemoteServiceSet.getLocalServiceSet(); + this.coordinator = local.getCoordinator(); + bit = new Drillbit(config, local); + bit.run(); + + DrillClient cl = new DrillClient(config, coordinator); + cl.connect(); + client = cl; + } + } catch (Exception ex) { + throw new SQLException("Failure trying to connect to Drill.", ex); } - } catch (Exception ex) { - throw new SQLException("Failure trying to connect to Drill.", ex); } // The "schema" parameter currently gives a name to the schema. In future @@ -102,24 +113,24 @@ public class DrillHandler extends HandlerImpl { } } - - public class FakeSchema extends MapSchema{ + + public class FakeSchema extends MapSchema { public FakeSchema(Schema parentSchema, QueryProvider queryProvider, JavaTypeFactory typeFactory, String name, Expression expression) { super(parentSchema, queryProvider, typeFactory, name, expression); - + } + public DrillClient getClient() { return client; } } - + public DrillClient getClient() { return client; } - @Override public void onConnectionClose(OptiqConnection connection) throws RuntimeException { super.onConnectionClose(connection); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java index 6dfea48..da20756 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java @@ -18,7 +18,10 @@ package org.apache.drill.jdbc; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.Map; import net.hydromatic.linq4j.BaseQueryable; import net.hydromatic.linq4j.Enumerator; @@ -32,10 +35,14 @@ import net.hydromatic.optiq.MutableSchema; import net.hydromatic.optiq.Schema; import net.hydromatic.optiq.Statistic; import net.hydromatic.optiq.Statistics; +import net.hydromatic.optiq.TableFactory; import net.hydromatic.optiq.TranslatableTable; import org.apache.drill.common.logical.StorageEngineConfig; import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.ref.rops.DataWriter; +import org.apache.drill.exec.ref.rse.ClasspathRSE; +import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig; import org.apache.drill.optiq.DrillRel; import org.apache.drill.optiq.DrillScan; import org.eigenbase.rel.RelNode; @@ -56,22 +63,28 @@ public class DrillTable extends BaseQueryable<Object> implements TranslatableTab public DrillClient client; + public boolean isRefEngine(){ + return client == null; + } + /** Creates a DrillTable. */ public DrillTable(DrillClient client, Schema schema, Type elementType, Expression expression, RelDataType rowType, String name, + String storageEngineName, Object selection, StorageEngineConfig storageEngineConfig ) { super(schema.getQueryProvider(), elementType, expression); + this.client = client; this.schema = schema; this.name = name; this.rowType = rowType; this.selection = selection; this.storageEngineConfig = storageEngineConfig; - this.storageEngineName = schema.getName(); + this.storageEngineName = client == null ? storageEngineName : schema.getName(); } public String getName() { @@ -91,6 +104,7 @@ public class DrillTable extends BaseQueryable<Object> implements TranslatableTab RelDataTypeFactory typeFactory, Schema schema, String name, + String storageEngineName, StorageEngineConfig storageEngineConfig, Object selection ) { @@ -106,7 +120,7 @@ public class DrillTable extends BaseQueryable<Object> implements TranslatableTab typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY))), Collections.singletonList("_MAP")); - return new DrillTable(client, schema, Object.class, call, rowType, name, selection, storageEngineConfig); + return new DrillTable(client, schema, Object.class, call, rowType, name, storageEngineName, selection, storageEngineConfig); } @@ -148,38 +162,64 @@ public class DrillTable extends BaseQueryable<Object> implements TranslatableTab -// private static final List<String> DONUTS_TABLES = Arrays.asList( -// "DONUTS"); -// -// private static final List<String> HR_TABLES = Arrays.asList( -// "EMPLOYEES", "DEPARTMENTS"); + /** Factory for custom tables in Optiq schema. */ + @SuppressWarnings("UnusedDeclaration") + public static class Factory implements TableFactory<DrillTable> { + + private static final List<String> DONUTS_TABLES = Arrays.asList( + "DONUTS"); + + private static final List<String> HR_TABLES = Arrays.asList( + "EMPLOYEES", "DEPARTMENTS"); + + private static final List<String> FOODMART_TABLES = Arrays.asList( + "ACCOUNT", "CATEGORY", "CURRENCY", "CUSTOMER", "DAYS", "DEPARTMENT", + "EMPLOYEE_CLOSURE", "EMPLOYEE", "EXPENSE_FACT", "INVENTORY_FACT_1997", + "INVENTORY_FACT_1998", "POSITION", "PRODUCT_CLASS", "PRODUCT", + "PROMOTION", "REGION", "RESERVE_EMPLOYEE", "SALARY", "SALES_FACT_1997", + "SALES_FACT_1998", "SALES_FACT_DEC_1998", "STORE", "STORE_RAGGED", + "TIME_BY_DAY", "WAREHOUSE", "WAREHOUSE_CLASS"); + +// public DrillTable create( +// JavaTypeFactory typeFactory, +// Schema schema, +// String name, +// Map<String, Object> operand, +// RelDataType rowType) { +// final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig(); +// final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); +// assert DONUTS_TABLES.contains(name) +// || HR_TABLES.contains(name) +// || FOODMART_TABLES.contains(name) +// : name; +// inputConfig.path = "/" + name.toLowerCase() + ".json"; +// inputConfig.type = DataWriter.ConverterType.JSON; +// boolean useReferenceInterpreter; +// if (operand.get("useReferenceInterpreter") != null){ +// useReferenceInterpreter = operand.get("useReferenceInterpreter").equals("true") ? true : false; +// } +// else{ +// useReferenceInterpreter = false; +// } +// return createTable(typeFactory, (MutableSchema) schema, name, rseConfig, +// inputConfig, "donuts-json", useReferenceInterpreter); +// } // -// private static final List<String> FOODMART_TABLES = Arrays.asList( -// "ACCOUNT", "CATEGORY", "CURRENCY", "CUSTOMER", "DAYS", "DEPARTMENT", -// "EMPLOYEE_CLOSURE", "EMPLOYEE", "EXPENSE_FACT", "INVENTORY_FACT_1997", -// "INVENTORY_FACT_1998", "POSITION", "PRODUCT_CLASS", "PRODUCT", -// "PROMOTION", "REGION", "RESERVE_EMPLOYEE", "SALARY", "SALES_FACT_1997", -// "SALES_FACT_1998", "SALES_FACT_DEC_1998", "STORE", "STORE_RAGGED", -// "TIME_BY_DAY", "WAREHOUSE", "WAREHOUSE_CLASS"); -// -// public static DrillTable create(Schema schema, String name, Map<String, Object> operand, RelDataType rowType) { -// -// final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig(); -// final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); -// assert DONUTS_TABLES.contains(name) -// || HR_TABLES.contains(name) -// || FOODMART_TABLES.contains(name) -// : name; -// inputConfig.path = "/" + name.toLowerCase() + ".json"; -// inputConfig.type = DataWriter.ConverterType.JSON; -// boolean useReferenceInterpreter; -// if (operand.get("useReferenceInterpreter") != null){ -// useReferenceInterpreter = operand.get("useReferenceInterpreter").equals("true") ? true : false; -// }else{ -// useReferenceInterpreter = false; -// } -// return createTable(schema.getTypeFactory(), (MutableSchema) schema, name, rseConfig, -// inputConfig, "donuts-json", useReferenceInterpreter); -// } + @Override + public DrillTable create(Schema schema, String name, Map<String, Object> operand, RelDataType rowType) { + + final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig(); + final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); + assert DONUTS_TABLES.contains(name) + || HR_TABLES.contains(name) + || FOODMART_TABLES.contains(name) + : name; + inputConfig.path = "/" + name.toLowerCase() + ".json"; + inputConfig.type = DataWriter.ConverterType.JSON; + return createTable(null, schema.getTypeFactory(), (MutableSchema) schema, name, "donuts-json", rseConfig, inputConfig); + } + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java index 0d3ed68..27b9630 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java @@ -17,13 +17,16 @@ ******************************************************************************/ package org.apache.drill.optiq; +import java.io.IOException; import java.util.Set; import org.apache.drill.exec.ref.rse.QueueRSE.QueueOutputInfo; import org.apache.drill.jdbc.DrillTable; import org.eigenbase.rel.RelNode; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -48,8 +51,10 @@ public class DrillImplementor { private final ArrayNode operatorsNode; private final ObjectNode sourcesNode; private Set<DrillTable> tables = Sets.newHashSet(); + private final boolean isRef; - public DrillImplementor() { + public DrillImplementor(boolean isRef) { + this.isRef = isRef; final ObjectNode headNode = mapper.createObjectNode(); rootNode.put("head", headNode); headNode.put("type", "APACHE_DRILL_LOGICAL"); @@ -65,17 +70,19 @@ public class DrillImplementor { sourcesNode = mapper.createObjectNode(); rootNode.put("storage", sourcesNode); - // input file source -// { -// final ObjectNode sourceNode = mapper.createObjectNode(); -// sourceNode.put("type", "classpath"); -// sourcesNode.put("donuts-json", sourceNode); -// } -// { -// final ObjectNode sourceNode = mapper.createObjectNode(); -// sourceNode.put("type", "queue"); -// sourcesNode.put("queue", sourceNode); -// } + if(isRef){ + { + final ObjectNode sourceNode = mapper.createObjectNode(); + sourceNode.put("type", "classpath"); + sourcesNode.put("donuts-json", sourceNode); + } + { + final ObjectNode sourceNode = mapper.createObjectNode(); + sourceNode.put("type", "queue"); + sourcesNode.put("queue", sourceNode); + } + } + final ArrayNode queryNode = mapper.createArrayNode(); rootNode.put("query", queryNode); @@ -114,7 +121,18 @@ public class DrillImplementor { /** Returns the generated plan. */ public String getJsonString() { String s = rootNode.toString(); - if(logger.isDebugEnabled()) logger.debug("Optiq Generated Logical Plan: {}", s); + + if(logger.isDebugEnabled()){ + JsonNode node; + try { + ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT); + node = mapper.readValue(s, JsonNode.class); + logger.debug("Optiq Generated Logical Plan: {}", mapper.writeValueAsString(node)); + } catch (IOException e) { + logger.error("Failure while trying to parse logical plan string of {}", s, e); + } + + } return s; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillJoinRel.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillJoinRel.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillJoinRel.java index 82a0c43..f367e6d 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillJoinRel.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillJoinRel.java @@ -123,8 +123,8 @@ public class DrillJoinRel extends JoinRelBase implements DrillRel { final ObjectNode objectNode = implementor.mapper.createObjectNode(); transforms.add(objectNode); objectNode.put("expr", pair.left); -// objectNode.put("ref", "output." + pair.right); - objectNode.put("ref", pair.right); + objectNode.put("ref", "output." + pair.right); +// objectNode.put("ref", pair.right); } return implementor.add(project); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillProjectRel.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillProjectRel.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillProjectRel.java index 0470f5e..1c0b00b 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillProjectRel.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/DrillProjectRel.java @@ -74,8 +74,8 @@ public class DrillProjectRel extends ProjectRelBase implements DrillRel { transforms.add(objectNode); String expr = DrillOptiq.toDrill(getChild(), pair.left); objectNode.put("expr", expr); -// String ref = "output." + pair.right; - String ref = pair.right; + String ref = "output." + pair.right; +// String ref = pair.right; objectNode.put("ref", ref); } return implementor.add(project); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java index c6cb707..a71ab8c 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java @@ -69,13 +69,12 @@ public class EnumerableDrillFullEngine<E> extends AbstractEnumerable<E> implemen public static <E> EnumerableDrillFullEngine<E> of(String plan, final List<String> fieldNames, Class<E> clazz, DataContext context) { DrillConfig config = DrillConfig.create(); - return new EnumerableDrillFullEngine<>(config, plan, clazz, fieldNames, ((FakeSchema) context.getSubSchema("--FAKE--")).getClient()); + FakeSchema s = (FakeSchema) context.getSubSchema("--FAKE--"); + return new EnumerableDrillFullEngine<>(config, plan, clazz, fieldNames, s == null ? null : s.getClient()); } @Override public Enumerator<E> enumerator() { - - //DrillTable table = (DrillTable) drillConnectionDataContext.getSubSchema("DONUTS").getTable("DONUTS", Object.class); if (client == null) { DrillRefImpl<E> impl = new DrillRefImpl<E>(plan, config, fields, queue); return impl.enumerator(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java index b280f8a..2f9cd9b 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java @@ -102,7 +102,7 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel { public Result implement(EnumerableRelImplementor implementor, Prefer pref) { logger.debug("implementing enumerable"); - final DrillImplementor drillImplementor = new DrillImplementor(); + final DrillImplementor drillImplementor = new DrillImplementor(this.client == null); DrillRel input = (DrillRel) getChild(); drillImplementor.go(input); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/FileSystemSchema.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/FileSystemSchema.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/FileSystemSchema.java index 85a4125..f506ebd 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/FileSystemSchema.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/FileSystemSchema.java @@ -103,7 +103,7 @@ public class FileSystemSchema implements Schema{ Object selection = schemaProvider.getSelectionBaseOnName(name); if(selection == null) return null; - DrillTable table = DrillTable.createTable(client, typeFactory, this, name, config, selection); + DrillTable table = DrillTable.createTable(client, typeFactory, this, name, null, config, selection); info = new TableInfo(name, table); TableInfo oldInfo = (TableInfo) tables.putIfAbsent(name, info); if(oldInfo != null) return (Table<E>) oldInfo.table; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java index 3792ddf..a4256a1 100644 --- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java +++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -28,24 +27,23 @@ import org.apache.drill.exec.ref.ReferenceInterpreter; import org.apache.drill.exec.ref.RunOutcome; import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory; import org.apache.drill.exec.ref.rse.RSERegistry; -import org.apache.drill.jdbc.DrillTable; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; public class DrillRefImpl<E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRefImpl.class); private static final ObjectMapper mapper = createMapper(); - + private final String plan; final BlockingQueue<Object> queue; final DrillConfig config; private final List<String> fields; - public DrillRefImpl(String plan, DrillConfig config, List<String> fields, BlockingQueue<Object> queue) { super(); this.plan = plan; @@ -53,8 +51,7 @@ public class DrillRefImpl<E> { this.fields = fields; this.queue = queue; } - - + private static ObjectMapper createMapper() { return new ObjectMapper(); } @@ -67,17 +64,20 @@ public class DrillRefImpl<E> { private final String holder; private final List<String> fields; private Object current; + private Future<Collection<RunOutcome>> futureOutcomes; - public JsonEnumerator(BlockingQueue<Object> queue, List<String> fields) { + public JsonEnumerator(Future<Collection<RunOutcome>> futureOutcomes, BlockingQueue<Object> queue, + List<String> fields) { + this.futureOutcomes = futureOutcomes; this.queue = queue; this.holder = null; this.fields = fields; } - public void close(){ - + public void close() { + } - + public Object current() { return current; } @@ -87,13 +87,32 @@ public class DrillRefImpl<E> { Object o = queue.take(); if (o instanceof RunOutcome.OutcomeType) { switch ((RunOutcome.OutcomeType) o) { - case SUCCESS: - return false; // end of data - case CANCELED: - throw new RuntimeException("canceled"); - case FAILED: - default: - throw new RuntimeException("failed"); + case SUCCESS: + return false; // end of data + case CANCELED: + throw new RuntimeException("canceled"); + case FAILED: + default: + try { + Collection<RunOutcome> outcomes = this.futureOutcomes.get(); + List<RunOutcome> l = Lists.newArrayList(outcomes); + for (int i = 1; i < outcomes.size(); i++) { + RunOutcome out = l.get(i); + logger.error("Failure while running query", out, out.exception); + } + if (!outcomes.isEmpty()) { + RunOutcome out = outcomes.iterator().next(); + if (out.exception != null) { + throw new RuntimeException("Query Failed while running.", out.exception); + } else { + throw new RuntimeException("Query Failed while running. " + o); + } + } + } catch (Exception e) { + throw new RuntimeException("failed", e); + } + + throw new RuntimeException("failed"); } } else { Object o1 = parseJson((byte[]) o); @@ -127,45 +146,38 @@ public class DrillRefImpl<E> { } } - /** * Runs the plan as a background task. */ - Future<Collection<RunOutcome>> runRefInterpreterPlan( - CompletionService<Collection<RunOutcome>> service) { + Future<Collection<RunOutcome>> runRefInterpreterPlan(CompletionService<Collection<RunOutcome>> service) { LogicalPlan parsedPlan = LogicalPlan.parse(DrillConfig.create(), plan); IteratorRegistry ir = new IteratorRegistry(); DrillConfig config = DrillConfig.create(); config.setSinkQueues(0, queue); - final ReferenceInterpreter i = - new ReferenceInterpreter(parsedPlan, ir, new BasicEvaluatorFactory(ir), - new RSERegistry(config)); + final ReferenceInterpreter i = new ReferenceInterpreter(parsedPlan, ir, new BasicEvaluatorFactory(ir), + new RSERegistry(config)); try { i.setup(); } catch (IOException e) { throw new RuntimeException(e); } - return service.submit( - new Callable<Collection<RunOutcome>>() { - @Override - public Collection<RunOutcome> call() throws Exception { - Collection<RunOutcome> outcomes = i.run(); - - for (RunOutcome outcome : outcomes) { - System.out.println("============"); - System.out.println(outcome); - if (outcome.outcome == RunOutcome.OutcomeType.FAILED - && outcome.exception != null) { - outcome.exception.printStackTrace(); - } - } - return outcomes; + return service.submit(new Callable<Collection<RunOutcome>>() { + @Override + public Collection<RunOutcome> call() throws Exception { + Collection<RunOutcome> outcomes = i.run(); + + for (RunOutcome outcome : outcomes) { + System.out.println("============"); + System.out.println(outcome); + if (outcome.outcome == RunOutcome.OutcomeType.FAILED && outcome.exception != null) { + outcome.exception.printStackTrace(); } - }); + } + return outcomes; + } + }); } - - public Enumerator<E> enumerator() { // TODO: use a completion service from the container final ExecutorCompletionService<Collection<RunOutcome>> service = new ExecutorCompletionService<Collection<RunOutcome>>( @@ -177,13 +189,13 @@ public class DrillRefImpl<E> { // TODO: use the result of task, and check for exceptions final Future<Collection<RunOutcome>> task = runRefInterpreterPlan(service); - return new JsonEnumerator(queue, fields); + return new JsonEnumerator(task, queue, fields); } - + /** - * Converts a JSON document, represented as an array of bytes, into a Java - * object (consisting of Map, List, String, Integer, Double, Boolean). + * Converts a JSON document, represented as an array of bytes, into a Java object (consisting of Map, List, String, + * Integer, Double, Boolean). */ static Object parseJson(byte[] bytes) { try { @@ -192,33 +204,31 @@ public class DrillRefImpl<E> { throw new RuntimeException(e); } } - - /** - * Converts a JSON node to Java objects ({@link List}, {@link Map}, - * {@link String}, {@link Integer}, {@link Double}, {@link Boolean}. + * Converts a JSON node to Java objects ({@link List}, {@link Map}, {@link String}, {@link Integer}, {@link Double}, + * {@link Boolean}. */ static Object wrapper(JsonNode node) { switch (node.asToken()) { - case START_OBJECT: - return map((ObjectNode) node); - case START_ARRAY: - return array((ArrayNode) node); - case VALUE_STRING: - return node.asText(); - case VALUE_NUMBER_INT: - return node.asInt(); - case VALUE_NUMBER_FLOAT: - return node.asDouble(); - case VALUE_TRUE: - return Boolean.TRUE; - case VALUE_FALSE: - return Boolean.FALSE; - case VALUE_NULL: - return null; - default: - throw new AssertionError("unexpected: " + node + ": " + node.asToken()); + case START_OBJECT: + return map((ObjectNode) node); + case START_ARRAY: + return array((ArrayNode) node); + case VALUE_STRING: + return node.asText(); + case VALUE_NUMBER_INT: + return node.asInt(); + case VALUE_NUMBER_FLOAT: + return node.asDouble(); + case VALUE_TRUE: + return Boolean.TRUE; + case VALUE_FALSE: + return Boolean.FALSE; + case VALUE_NULL: + return null; + default: + throw new AssertionError("unexpected: " + node + ": " + node.asToken()); } } @@ -241,5 +251,4 @@ public class DrillRefImpl<E> { return Collections.unmodifiableSortedMap(map); } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java index 52dbcfb..d3927f4 100644 --- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java +++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java @@ -113,7 +113,7 @@ public class JdbcAssert { this.connectionFactory = new ConnectionFactory() { public Connection createConnection() throws Exception { Class.forName("org.apache.drill.jdbc.Driver"); - return DriverManager.getConnection("jdbc:drill:", ModelAndSchema.this.info); + return DriverManager.getConnection("jdbc:drill:ref=true", ModelAndSchema.this.info); } }; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3b2f9f6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java index ccab8fd..d9dd11a 100644 --- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java +++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java @@ -32,7 +32,7 @@ import com.google.common.base.Function; import com.google.common.io.Resources; /** Unit tests for Drill's JDBC driver. */ -@Ignore + public class JdbcTest { private static String MODEL; private static String EXPECTED; @@ -65,7 +65,7 @@ public class JdbcTest { @Test public void testConnect() throws Exception { Class.forName("org.apache.drill.jdbc.Driver"); - final Connection connection = DriverManager.getConnection("jdbc:drill:schema=DONUTS"); + final Connection connection = DriverManager.getConnection("jdbc:drill:ref=true&schema=DONUTS"); connection.close(); }
