Repository: incubator-gearpump Updated Branches: refs/heads/master f75fb19c7 -> 995c8cc0c
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java new file mode 100644 index 0000000..ca525d6 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import java.util.List; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; +import org.apache.gearpump.sql.rel.GearIOSinkRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearIOSinkRule extends ConverterRule { + + public static final GearIOSinkRule INSTANCE = new GearIOSinkRule(); + + private GearIOSinkRule() { + super(LogicalTableModify.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearIOSinkRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableModify tableModify = (TableModify) rel; + final RelNode input = tableModify.getInput(); + + final RelOptCluster cluster = tableModify.getCluster(); + final RelTraitSet traitSet = tableModify.getTraitSet().replace(GearLogicalConvention.INSTANCE); + final RelOptTable relOptTable = tableModify.getTable(); + final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader(); + final RelNode convertedInput = convert(input, + input.getTraitSet().replace(GearLogicalConvention.INSTANCE)); + final TableModify.Operation operation = tableModify.getOperation(); + final List<String> updateColumnList = tableModify.getUpdateColumnList(); + final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList(); + final boolean flattened = tableModify.isFlattened(); + + final Table table = tableModify.getTable().unwrap(Table.class); + + switch (table.getJdbcTableType()) { + case TABLE: + case STREAM: + if (operation != TableModify.Operation.INSERT) { + throw new UnsupportedOperationException( + String.format("Streams doesn't support %s modify operation", operation)); + } + return new GearIOSinkRel(cluster, traitSet, + relOptTable, catalogReader, convertedInput, operation, updateColumnList, + sourceExpressionList, flattened); + default: + throw new IllegalArgumentException( + String.format("Unsupported table type: %s", table.getJdbcTableType())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java new file mode 100644 index 0000000..a725b1a --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.gearpump.sql.rel.GearIOSourceRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearIOSourceRule extends ConverterRule { + + public static final GearIOSourceRule INSTANCE = new GearIOSourceRule(); + + private GearIOSourceRule() { + super(LogicalTableScan.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearIOSourceRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final TableScan scan = (TableScan) rel; + + return new GearIOSourceRel(scan.getCluster(), + scan.getTraitSet().replace(GearLogicalConvention.INSTANCE), scan.getTable()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java new file mode 100644 index 0000000..eb149f3 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.gearpump.sql.rel.GearIntersectRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +import java.util.List; + +public class GearIntersectRule extends ConverterRule { + + public static final GearIntersectRule INSTANCE = new GearIntersectRule(); + + private GearIntersectRule() { + super(LogicalIntersect.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearIntersectRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Intersect intersect = (Intersect) rel; + final List<RelNode> inputs = intersect.getInputs(); + return new GearIntersectRel( + intersect.getCluster(), + intersect.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(inputs, GearLogicalConvention.INSTANCE), + intersect.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java new file mode 100644 index 0000000..e86db06 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.gearpump.sql.rel.GearJoinRel; +import org.apache.gearpump.sql.rel.GearLogicalConvention; + +public class GearJoinRule extends ConverterRule { + + public static final GearJoinRule INSTANCE = new GearJoinRule(); + + private GearJoinRule() { + super(LogicalJoin.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearJoinRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Join join = (Join) rel; + return new GearJoinRel( + join.getCluster(), + join.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(join.getLeft(), + join.getLeft().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + convert(join.getRight(), + join.getRight().getTraitSet().replace(GearLogicalConvention.INSTANCE)), + join.getCondition(), + join.getVariablesSet(), + join.getJoinType() + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java new file mode 100644 index 0000000..103a29d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearMinusRel; + +import java.util.List; + +public class GearMinusRule extends ConverterRule { + + public static final GearMinusRule INSTANCE = new GearMinusRule(); + + private GearMinusRule() { + super(LogicalMinus.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearMinusRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Minus minus = (Minus) rel; + final List<RelNode> inputs = minus.getInputs(); + return new GearMinusRel( + minus.getCluster(), + minus.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(inputs, GearLogicalConvention.INSTANCE), + minus.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java new file mode 100644 index 0000000..6b09550 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearProjectRel; + +public class GearProjectRule extends ConverterRule { + + public static final GearProjectRule INSTANCE = new GearProjectRule(); + + private GearProjectRule() { + super(LogicalProject.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearProjectRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final Project project = (Project) rel; + final RelNode input = project.getInput(); + + return new GearProjectRel(project.getCluster(), + project.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + project.getProjects(), project.getRowType()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java new file mode 100644 index 0000000..0a0d9e4 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearSortRel; + +public class GearSortRule extends ConverterRule { + + public static final GearSortRule INSTANCE = new GearSortRule(); + + private GearSortRule() { + super(LogicalSort.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearSortRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Sort sort = (Sort) rel; + final RelNode input = sort.getInput(); + return new GearSortRel( + sort.getCluster(), + sort.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)), + sort.getCollation(), + sort.offset, + sort.fetch + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java new file mode 100644 index 0000000..7a17a46 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearUnionRel; + +public class GearUnionRule extends ConverterRule { + + public static final GearUnionRule INSTANCE = new GearUnionRule(); + + private GearUnionRule() { + super(LogicalUnion.class, Convention.NONE, GearLogicalConvention.INSTANCE, + "GearUnionRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Union union = (Union) rel; + + return new GearUnionRel( + union.getCluster(), + union.getTraitSet().replace(GearLogicalConvention.INSTANCE), + convertList(union.getInputs(), GearLogicalConvention.INSTANCE), + union.all + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java new file mode 100644 index 0000000..b04eec2 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.rule; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rel.GearValuesRel; + +public class GearValuesRule extends ConverterRule { + + public static final GearValuesRule INSTANCE = new GearValuesRule(); + + private GearValuesRule() { + super(LogicalValues.class, Convention.NONE, + GearLogicalConvention.INSTANCE, "GearValuesRule"); + } + + @Override + public RelNode convert(RelNode rel) { + Values values = (Values) rel; + return new GearValuesRel( + values.getCluster(), + values.getRowType(), + values.getTuples(), + values.getTraitSet().replace(GearLogicalConvention.INSTANCE) + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java new file mode 100644 index 0000000..7ecba21 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + +public class SampleString { + + public static JavaStream<String> WORDS; + + public static class Stream { + public static final Message[] KV = {new Message("001", "This is a good start, bingo!! bingo!!")}; + + public static String getKV() { + return KV[0].WORD; + } + } + + public static class Message { + public final String ID; + public final String WORD; + + public Message(String ID, String WORD) { + this.ID = ID; + this.WORD = WORD; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java new file mode 100644 index 0000000..4aa20e0 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +public class SampleTransactions { + + public static class Transactions { + + public final Order[] ORDERS = { + new Order("001", 3), + new Order("002", 5), + new Order("003", 8), + new Order("004", 15), + }; + + public final Product[] PRODUCTS = { + new Product("001", "Book"), + new Product("002", "Pen"), + new Product("003", "Pencil"), + new Product("004", "Ruler"), + }; + } + + public static class Order { + public final String ID; + public final int QUANTITY; + + public Order(String ID, int QUANTITY) { + this.ID = ID; + this.QUANTITY = QUANTITY; + } + } + + public static class Product { + public final String ID; + public final String NAME; + + public Product(String ID, String NAME) { + this.ID = ID; + this.NAME = NAME; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java new file mode 100644 index 0000000..4ff9efd --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.table; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.*; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.Map; + +public class TransactionsTableFactory implements TableFactory<Table> { + + @Override + public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { + final Object[][] rows = { + {100, "I001", "item1", 3}, + {101, "I002", "item2", 5}, + {102, "I003", "item3", 8}, + {103, "I004", "item4", 33}, + {104, "I005", "item5", 23} + }; + + return new TransactionsTable(ImmutableList.copyOf(rows)); + } + + public static class TransactionsTable implements ScannableTable { + + protected final RelProtoDataType protoRowType = new RelProtoDataType() { + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("timeStamp", SqlTypeName.TIMESTAMP) + .add("id", SqlTypeName.VARCHAR, 10) + .add("item", SqlTypeName.VARCHAR, 50) + .add("quantity", SqlTypeName.INTEGER) + .build(); + } + }; + + private final ImmutableList<Object[]> rows; + + public TransactionsTable(ImmutableList<Object[]> rows) { + this.rows = rows; + } + + public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java new file mode 100644 index 0000000..a63036d --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.utils; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RuleSets; + +import java.util.ArrayList; +import java.util.List; + +public class CalciteFrameworkConfiguration { + + public static FrameworkConfig getDefaultconfig(SchemaPlus schema) { + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + .setLex(Lex.JAVA) + .build()) + .defaultSchema(schema) + .traitDefs(traitDefs) + .context(Contexts.EMPTY_CONTEXT) + .ruleSets(RuleSets.ofList()) + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + + return frameworkConfiguration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java new file mode 100644 index 0000000..03b2a47 --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.utils; + +import com.typesafe.config.Config; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; + +public class GearConfiguration { + + private Config akkaConf; + private ClientContext context; + public static JavaStreamApp app; + + public void defaultConfiguration() { + setAkkaConf(ClusterConfig.defaultConfig()); + setContext(ClientContext.apply(akkaConf)); + } + + public void ConfigJavaStreamApp() { + app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); + } + + public void setAkkaConf(Config akkaConf) { + this.akkaConf = akkaConf; + } + + public void setContext(ClientContext context) { + this.context = context; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java new file mode 100644 index 0000000..d3d723f --- /dev/null +++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.validator; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +public class CalciteSqlValidator extends SqlValidatorImpl { + public CalciteSqlValidator(SqlOperatorTable opTab, + CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, + SqlConformance conformance) { + super(opTab, catalogReader, typeFactory, conformance); + } + + @Override + protected RelDataType getLogicalSourceRowType( + RelDataType sourceRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalSourceRowType(sourceRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } + + @Override + protected RelDataType getLogicalTargetRowType( + RelDataType targetRowType, SqlInsert insert) { + final RelDataType superType = + super.getLogicalTargetRowType(targetRowType, insert); + return ((JavaTypeFactory) typeFactory).toSql(superType); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/experiments/sql/src/main/resources/log4j.properties b/experiments/sql/src/main/resources/log4j.properties new file mode 100644 index 0000000..98dcb80 --- /dev/null +++ b/experiments/sql/src/main/resources/log4j.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# For the general syntax of property based configuration files see the +# documenation of org.apache.log4j.PropertyConfigurator. +# The root category uses the appender called A1. Since no priority is +# specified, the root category assumes the default priority for root +# which is DEBUG in log4j. The root category is the only category that +# has a default priority. All other categories need not be assigned a +# priority in which case they inherit their priority from the +# hierarchy. +#log4j.rootLogger=debug, console +log4j.rootLogger=info, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +#log4j.appender.console.layout.ConversionPattern=[%t] %-5p %c %x - %m%n +log4j.appender.console.layout.ConversionPattern=[%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java new file mode 100644 index 0000000..64c5af7 --- /dev/null +++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.example; + +import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.*; +import org.apache.gearpump.sql.rel.GearLogicalConvention; +import org.apache.gearpump.sql.rule.GearAggregationRule; +import org.apache.gearpump.sql.rule.GearFlatMapRule; +import org.apache.gearpump.sql.table.SampleString; +import org.apache.gearpump.sql.utils.GearConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; + +public class SqlWordCountTest { + + private static final Logger LOG = LoggerFactory.getLogger(SqlWordCountTest.class); + + private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) { + try { + return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs); + } catch (ClassNotFoundException e) { + LOG.error(e.getMessage()); + } catch (SQLException e) { + LOG.error(e.getMessage()); + } + return null; + } + + private Planner getPlanner(List<RelTraitDef> traitDefs, + SqlParser.Config parserConfig, + Program... programs) throws ClassNotFoundException, SQLException { + + Class.forName("org.apache.calcite.jdbc.Driver"); + java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:"); + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + rootSchema.add("STR", new ReflectiveSchema(new SampleString.Stream())); + + final FrameworkConfig config = Frameworks.newConfigBuilder() + .parserConfig(parserConfig) + .defaultSchema(rootSchema) + .traitDefs(traitDefs) + .programs(programs) + .build(); + return Frameworks.getPlanner(config); + } + + void wordCountTest(GearConfiguration gearConfig) throws SqlParseException, + ValidationException, RelConversionException { + + RuleSet ruleSet = RuleSets.ofList( + GearFlatMapRule.INSTANCE, + GearAggregationRule.INSTANCE); + + Planner planner = getPlanner(null, Programs.of(ruleSet)); + + String sql = "SELECT COUNT(*) FROM str.kv GROUP BY str.kv.word"; + System.out.println("SQL Query:-\t" + sql + "\n"); + + SqlNode parse = planner.parse(sql); + System.out.println("SQL Parse Tree:- \n" + parse.toString() + "\n\n"); + + SqlNode validate = planner.validate(parse); + RelNode convert = planner.rel(validate).project(); + System.out.println("Relational Expression:- \n" + RelOptUtil.toString(convert) + "\n"); + + gearConfig.defaultConfiguration(); + gearConfig.ConfigJavaStreamApp(); + + RelTraitSet traitSet = convert.getTraitSet().replace(GearLogicalConvention.INSTANCE); + try { + RelNode transform = planner.transform(0, traitSet, convert); + System.out.println(RelOptUtil.toString(transform)); + } catch (Exception e) { + } + + } + + + public static void main(String[] args) throws ClassNotFoundException, + SQLException, SqlParseException { + + SqlWordCountTest gearSqlWordCount = new SqlWordCountTest(); + + try { + GearConfiguration gearConfig = new GearConfiguration(); + gearSqlWordCount.wordCountTest(gearConfig); + } catch (ValidationException e) { + LOG.error(e.getMessage()); + } catch (RelConversionException e) { + LOG.error(e.getMessage()); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java new file mode 100644 index 0000000..2f21531 --- /dev/null +++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRules; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.*; +import org.apache.calcite.plan.RelOptTable.ViewExpander; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.rules.LoptOptimizeJoinRule; +import org.apache.calcite.rel.rules.SortRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexExecutor; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.sql2rel.RelDecorrelator; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.tools.*; +import org.apache.calcite.util.Util; +import org.apache.gearpump.sql.rule.GearFilterRule; +import org.apache.gearpump.sql.table.SampleTransactions; +import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration; +import org.apache.gearpump.sql.validator.CalciteSqlValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; + +public class CalciteTest { + + private static final Logger LOG = LoggerFactory.getLogger(CalciteTest.class); + + private SqlOperatorTable operatorTable; + private FrameworkConfig config; + private ImmutableList<RelTraitDef> traitDefs; + private SqlParser.Config parserConfig; + private SqlRexConvertletTable convertletTable; + private State state; + private SchemaPlus defaultSchema; + private JavaTypeFactory typeFactory; + private RelOptPlanner planner; + private RexExecutor executor; + private RelRoot root; + + public CalciteTest(FrameworkConfig config) { + this.config = config; + this.defaultSchema = config.getDefaultSchema(); + this.operatorTable = config.getOperatorTable(); + this.parserConfig = config.getParserConfig(); + this.state = State.STATE_0_CLOSED; + this.traitDefs = config.getTraitDefs(); + this.convertletTable = config.getConvertletTable(); + this.executor = config.getExecutor(); + reset(); + } + + public CalciteTest() { + } + + private void ensure(State state) { + if (state == this.state) { + return; + } + if (state.ordinal() < this.state.ordinal()) { + throw new IllegalArgumentException("cannot move to " + state + " from " + + this.state); + } + state.from(this); + } + + public void close() { + typeFactory = null; + state = State.STATE_0_CLOSED; + } + + public void reset() { + ensure(State.STATE_0_CLOSED); + state = State.STATE_1_RESET; + } + + private void ready() { + switch (state) { + case STATE_0_CLOSED: + reset(); + } + ensure(State.STATE_1_RESET); + Frameworks.withPlanner( + new Frameworks.PlannerAction<Void>() { + public Void apply(RelOptCluster cluster, RelOptSchema relOptSchema, + SchemaPlus rootSchema) { + Util.discard(rootSchema); // use our own defaultSchema + typeFactory = (JavaTypeFactory) cluster.getTypeFactory(); + planner = cluster.getPlanner(); + planner.setExecutor(executor); + return null; + } + }, + config); + + state = State.STATE_2_READY; + + // If user specify own traitDef, instead of default default trait, + // first, clear the default trait def registered with planner + // then, register the trait def specified in traitDefs. + if (this.traitDefs != null) { + planner.clearRelTraitDefs(); + for (RelTraitDef def : this.traitDefs) { + planner.addRelTraitDef(def); + } + } + } + + private static SchemaPlus rootSchema(SchemaPlus schema) { + for (; ; ) { + if (schema.getParentSchema() == null) { + return schema; + } + schema = schema.getParentSchema(); + } + } + + private CalciteCatalogReader createCatalogReader() { + SchemaPlus rootSchema = rootSchema(defaultSchema); + return new CalciteCatalogReader( + CalciteSchema.from(rootSchema), + parserConfig.caseSensitive(), + CalciteSchema.from(defaultSchema).path(null), + typeFactory); + } + + private RexBuilder createRexBuilder() { + return new RexBuilder(typeFactory); + } + + private SqlConformance conformance() { + final Context context = config.getContext(); + if (context != null) { + final CalciteConnectionConfig connectionConfig = + context.unwrap(CalciteConnectionConfig.class); + if (connectionConfig != null) { + return connectionConfig.conformance(); + } + } + return SqlConformanceEnum.DEFAULT; + } + + /** + * Implements {@link org.apache.calcite.plan.RelOptTable.ViewExpander} + * interface for {@link org.apache.calcite.tools.Planner}. + */ + public class ViewExpanderImpl implements ViewExpander { + @Override + public RelRoot expandView(RelDataType rowType, String queryString, + List<String> schemaPath, List<String> viewPath) { + SqlParser parser = SqlParser.create(queryString, parserConfig); + SqlNode sqlNode; + try { + sqlNode = parser.parseQuery(); + } catch (SqlParseException e) { + throw new RuntimeException("parse failed", e); + } + + final SqlConformance conformance = conformance(); + final CalciteCatalogReader catalogReader = + createCatalogReader().withSchemaPath(schemaPath); + final SqlValidator validator = + new CalciteSqlValidator(operatorTable, catalogReader, typeFactory, + conformance); + validator.setIdentifierExpansion(true); + final SqlNode validatedSqlNode = validator.validate(sqlNode); + + final RexBuilder rexBuilder = createRexBuilder(); + final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder() + .withTrimUnusedFields(false).withConvertTableAccess(false).build(); + final SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter(new ViewExpanderImpl(), validator, + catalogReader, cluster, convertletTable, config); + + root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false); + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); + root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)); + + return CalciteTest.this.root; + } + } + + private enum State { + STATE_0_CLOSED { + @Override + void from(CalciteTest planner) { + planner.close(); + } + }, + STATE_1_RESET { + @Override + void from(CalciteTest planner) { + planner.ensure(STATE_0_CLOSED); + planner.reset(); + } + }, + STATE_2_READY { + @Override + void from(CalciteTest planner) { + STATE_1_RESET.from(planner); + planner.ready(); + } + }, + STATE_3_PARSED, + STATE_4_VALIDATED, + STATE_5_CONVERTED; + + /** + * Moves planner's state to this state. This must be a higher state. + */ + void from(CalciteTest planner) { + throw new IllegalArgumentException("cannot move from " + planner.state + + " to " + this); + } + } + + void calTest() throws SqlParseException { + +// String sql = "select t.orders.id from t.orders"; +// +// String sql = "select t.products.id " +// + "from t.orders, t.products " +// + "where t.orders.id = t.products.id and quantity>2 "; + + String sql = "SELECT t.products.id AS product_id, t.products.name " + + "AS product_name, t.orders.id AS order_id " + + "FROM t.products JOIN t.orders ON t.products.id = t.orders.id WHERE quantity > 2"; + + final SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL).build(); + + // Parse the query + SqlParser parser = SqlParser.create(sql, parserConfig); + SqlNode sqlNode = parser.parseStmt(); + + // Validate the query + CalciteCatalogReader catalogReader = createCatalogReader(); + SqlValidator validator = SqlValidatorUtil.newValidator( + SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlConformance.DEFAULT); + SqlNode validatedSqlNode = validator.validate(sqlNode); + + // Convert SqlNode to RelNode + RexBuilder rexBuilder = createRexBuilder(); + RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + SqlToRelConverter sqlToRelConverter = new SqlToRelConverter( + new ViewExpanderImpl(), + validator, + createCatalogReader(), + cluster, + convertletTable); + RelRoot root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true); + System.out.println(RelOptUtil.toString(root.rel)); + + // Optimize the plan + RelOptPlanner planner = new VolcanoPlanner(); + + // Create a set of rules to apply + Program program = Programs.ofRules( +// FilterProjectTransposeRule.INSTANCE, +// ProjectMergeRule.INSTANCE, +// FilterMergeRule.INSTANCE, +// FilterJoinRule.JOIN, + GearFilterRule.INSTANCE, + LoptOptimizeJoinRule.INSTANCE); + + RelTraitSet traitSet = planner.emptyTraitSet().replace(EnumerableConvention.INSTANCE); + + // Execute the program +// RelNode optimized = program.run(planner, root.rel, traitSet, +// ImmutableList.<RelOptMaterialization>of(), ImmutableList.<RelOptLattice>of()); +// logger.info(RelOptUtil.toString(optimized)); + + } + + // new test ------------------------- + private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) { + try { + return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs); + } catch (ClassNotFoundException e) { + LOG.error(e.getMessage()); + } catch (SQLException e) { + LOG.error(e.getMessage()); + } + return null; + } + + private Planner getPlanner(List<RelTraitDef> traitDefs, + SqlParser.Config parserConfig, + Program... programs) throws ClassNotFoundException, SQLException { + + Class.forName("org.apache.calcite.jdbc.Driver"); + java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:"); + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + rootSchema.add("T", new ReflectiveSchema(new SampleTransactions.Transactions())); + + final FrameworkConfig config = Frameworks.newConfigBuilder() + .parserConfig(parserConfig) + .defaultSchema(rootSchema) + .traitDefs(traitDefs) + .programs(programs) + .build(); + return Frameworks.getPlanner(config); + } + + void calTest2() throws SqlParseException, ValidationException, RelConversionException { + + RuleSet ruleSet = RuleSets.ofList( + SortRemoveRule.INSTANCE, + EnumerableRules.ENUMERABLE_PROJECT_RULE, + EnumerableRules.ENUMERABLE_SORT_RULE); + + Planner planner = getPlanner(null, Programs.of(ruleSet)); + + String sql = "SELECT * FROM t.products ORDER BY t.products.id"; + + SqlNode parse = planner.parse(sql); + System.out.println("SQL Parse Tree:- \n" + parse.toString()); + + SqlNode validate = planner.validate(parse); + RelNode convert = planner.rel(validate).project(); + RelTraitSet traitSet = convert.getTraitSet().replace(EnumerableConvention.INSTANCE); + RelNode transform = planner.transform(0, traitSet, convert); + System.out.println("\n\nRelational Expression:- \n" + RelOptUtil.toString(transform)); + + } + + + public static void main(String[] args) throws ClassNotFoundException, + SQLException, SqlParseException { + + // calTest() +// Class.forName("org.apache.calcite.jdbc.Driver"); +// java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:"); +// CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); +// SchemaPlus rootSchema = calciteConnection.getRootSchema(); +// rootSchema.add("t", new ReflectiveSchema(new StreamQueryPlanner.Transactions())); +// +// FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema); +// CalciteTest ct = new CalciteTest(frameworkConfig); +// ct.ready(); +// ct.calTest(); + + // calTest2() + CalciteTest calTest = new CalciteTest(); + try { + calTest.calTest2(); + } catch (ValidationException e) { + LOG.error(e.getMessage()); + } catch (RelConversionException e) { + LOG.error(e.getMessage()); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java ---------------------------------------------------------------------- diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java new file mode 100644 index 0000000..551beff --- /dev/null +++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.sql.planner; + +import com.google.common.io.Resources; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.sql.SQLException; + +public class QueryTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueryTest.class); + + @Test + public void testLogicalPlan() { + + try { + CalciteConnection connection = new Connection(); + String salesSchema = Resources.toString(Query.class.getResource("/model.json"), + Charset.defaultCharset()); + new ModelHandler(connection, "inline:" + salesSchema); + + Query queryPlanner = new Query(connection.getRootSchema().getSubSchema(connection.getSchema())); + RelNode logicalPlan = queryPlanner.getLogicalPlan("SELECT item FROM transactions"); + + System.out.println("Getting Logical Plan...\n" + RelOptUtil.toString(logicalPlan)); + + } catch (IOException e) { + LOG.error(e.getMessage()); + } catch (RelConversionException e) { + LOG.error(e.getMessage()); + } catch (ValidationException e) { + LOG.error(e.getMessage()); + } catch (SQLException e) { + LOG.error(e.getMessage()); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/resources/model.json ---------------------------------------------------------------------- diff --git a/experiments/sql/src/test/resources/model.json b/experiments/sql/src/test/resources/model.json new file mode 100644 index 0000000..bfb62ed --- /dev/null +++ b/experiments/sql/src/test/resources/model.json @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * A JSON model of a simple Calcite schema. + */ + +{ + "version": "1.0", + "defaultSchema": "SALES", + "schemas": [ + { + "name": "SALES", + "tables": [ + { + "name": "Transactions", + "type": "custom", + "factory": "org.apache.gearpump.sql.table.TransactionsTableFactory", + "operand": { + "file": "resources/sales/Transactions.csv", + "flavor": "scannable" + } + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/resources/sales/Transactions.csv ---------------------------------------------------------------------- diff --git a/experiments/sql/src/test/resources/sales/Transactions.csv b/experiments/sql/src/test/resources/sales/Transactions.csv new file mode 100644 index 0000000..f974d97 --- /dev/null +++ b/experiments/sql/src/test/resources/sales/Transactions.csv @@ -0,0 +1,6 @@ +timeStamp:int,id:string,item:string,quantity:int +100,"I001","item1",3 +101,"I002","item2",5 +102,"I003","item3",8 +103,"I004","item4",33 +104,"I005","item5",23 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/project/BuildExperiments.scala ---------------------------------------------------------------------- diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala index 59c95f7..84c80f0 100644 --- a/project/BuildExperiments.scala +++ b/project/BuildExperiments.scala @@ -133,4 +133,16 @@ object BuildExperiments extends sbt.Build { )) .dependsOn(core % "provided", streaming % "test->test; provided") .disablePlugins(sbtassembly.AssemblyPlugin) + + lazy val sql = Project( + id = "gearpump-experiments-sql", + base = file("experiments/sql"), + settings = commonSettings ++ noPublish ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.calcite" % "calcite-core" % calciteVersion + ) + )) + .dependsOn(core % "provided", streaming % "test->test; provided") + .disablePlugins(sbtassembly.AssemblyPlugin) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 90b57d3..b146c08 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -54,6 +54,7 @@ object Dependencies { val chillVersion = "0.6.0" val jedisVersion = "2.9.0" val rabbitmqVersion = "3.5.3" + val calciteVersion = "1.12.0" val annotationDependencies = Seq( // work around for compiler warnings like
