This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-25 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 55e972c4966d4e060e545d41b609217b2acf2d36 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Thu Feb 24 15:52:57 2022 +0100 [WAYANG-25] SQL API seed Signed-off-by: bertty <[email protected]> --- wayang-api/pom.xml | 1 + wayang-api/wayang-api-sql/pom.xml | 43 +++ .../java/org/apache/wayang/api/sql/WayangSQL.java | 22 ++ .../sql/converter/WayangToEnumerableConverter.java | 81 +++++ .../converter/WayangToEnumerableConverterRule.java | 40 +++ .../org/apache/wayang/api/sql/executor/Client.java | 53 +++ .../wayang/api/sql/executor/CustomEnumerator.java | 79 +++++ .../wayang/api/sql/executor/CustomSchema.java | 49 +++ .../api/sql/executor/CustomSchemaFactory.java | 43 +++ .../wayang/api/sql/executor/CustomTable.java | 65 ++++ .../wayang/api/sql/parser/CalciteParser.java | 136 ++++++++ .../org/apache/wayang/api/sql/parser/Main.java | 107 ++++++ .../api/sql/parser/SimpleCalciteConnection.java | 360 +++++++++++++++++++++ .../wayang/api/sql/relation/node/WayangFilter.java | 51 +++ .../api/sql/relation/node/WayangProject.java | 58 ++++ .../wayang/api/sql/relation/node/WayangRel.java | 78 +++++ .../api/sql/relation/node/WayangRelFactories.java | 81 +++++ .../wayang/api/sql/relation/node/WayangValues.java | 43 +++ .../apache/wayang/api/sql/rule/WayangRules.java | 90 ++++++ .../apache/wayang/api/sql/schema/WayangSchema.java | 40 +++ .../wayang/api/sql/schema/WayangSchemaFactory.java | 38 +++ .../apache/wayang/api/sql/table/WayangTable.java | 53 +++ .../wayang/api/sql/table/WayangTableFactory.java | 51 +++ .../src/main/resources/WayangModel.json | 22 ++ .../wayang-api-sql/src/main/resources/data.csv | 3 + .../src/main/resources/log4j.properties | 23 ++ .../wayang-api-sql/src/main/resources/model.json | 11 + 27 files changed, 1721 insertions(+) diff --git a/wayang-api/pom.xml b/wayang-api/pom.xml index e31751b..47b8027 100644 --- a/wayang-api/pom.xml +++ b/wayang-api/pom.xml @@ -38,6 +38,7 @@ <modules> <module>wayang-api-scala-java</module> <module>wayang-api-python</module> + <module>wayang-api-sql</module> </modules> </project> diff --git a/wayang-api/wayang-api-sql/pom.xml b/wayang-api/wayang-api-sql/pom.xml new file mode 100644 index 0000000..7755cb0 --- /dev/null +++ b/wayang-api/wayang-api-sql/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>wayang-api</artifactId> + <groupId>org.apache.wayang</groupId> + <version>0.6.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>wayang-api-sql</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>1.29.0</version> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + <version>1.29.0</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/WayangSQL.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/WayangSQL.java new file mode 100644 index 0000000..f71c600 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/WayangSQL.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql; + +public class WayangSQL { + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverter.java new file mode 100644 index 0000000..987e428 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.converter; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.util.BuiltInMethod; + +import java.util.List; +import org.apache.wayang.api.sql.relation.node.WayangRel; + +public class WayangToEnumerableConverter extends ConverterImpl + implements EnumerableRel { + /** Creates a PigToEnumerableConverter. */ + protected WayangToEnumerableConverter( + RelOptCluster cluster, + RelTraitSet traits, + RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new WayangToEnumerableConverter( + getCluster(), traitSet, sole(inputs)); + } + + /** + * {@inheritDoc} + * + * <p>This implementation does not actually execute the associated Pig Latin + * script and return results. Instead it returns an empty + * {@link org.apache.calcite.adapter.enumerable.EnumerableRel.Result} + * in order to allow for testing and verification of every step of query + * processing up to actual physical execution and result verification. + * + * <p>Next step is to invoke Pig from here, likely in local mode, have it + * store results in a predefined file so they can be read here and returned as + * a {@code Result} object. + */ + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + final BlockBuilder list = new BlockBuilder(); + final PhysType physType = + PhysTypeImpl.of(implementor.getTypeFactory(), rowType, + pref.prefer(JavaRowFormat.ARRAY)); + WayangRel.Implementor impl = new WayangRel.Implementor(); + impl.visitChild(0, getInput()); + Hook.QUERY_PLAN.run(impl.getStatements().count()); // for script validation in tests + list.add( + Expressions.return_(null, + Expressions.call( + BuiltInMethod.EMPTY_ENUMERABLE.method))); + return implementor.result(physType, list.toBlock()); + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverterRule.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverterRule.java new file mode 100644 index 0000000..153b6f9 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/converter/WayangToEnumerableConverterRule.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.converter; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.wayang.api.sql.relation.node.WayangRel; + +public class WayangToEnumerableConverterRule extends ConverterRule { + public static final ConverterRule INSTANCE = + new WayangToEnumerableConverterRule(); + + private WayangToEnumerableConverterRule() { + super(RelNode.class, WayangRel.CONVENTION, EnumerableConvention.INSTANCE, + "WayangToEnumerableConverterRule"); + } + + @Override public RelNode convert(RelNode rel) { + RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention()); + return new WayangToEnumerableConverter(rel.getCluster(), newTraitSet, rel); + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/Client.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/Client.java new file mode 100644 index 0000000..edf5629 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/Client.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.executor; + +import java.net.URL; +import java.net.URLDecoder; +import java.sql.*; +import java.util.Properties; + +public class Client { + + public static void main(String[] args) { + try { + /** + * Using files + * */ + URL url = Client.class.getResource("/WayangModel.json"); + String str = URLDecoder.decode(url.toString(), "UTF-8"); + System.out.println(str); + Properties info = new Properties(); + info.put("model", str.replace("file:", "")); + Connection connection = DriverManager.getConnection("jdbc:calcite:", info); + + + Statement statement = connection.createStatement(); + //ResultSet resultSet = statement.executeQuery("select count(*) from (values (1), (2))"); + //ResultSet resultSet = statement.executeQuery("select count(*) from (values (1), (2))"); + ResultSet resultSet = statement.executeQuery("select * from (values (1), (2)) where EXPR$0 > 1"); + System.out.println(resultSet.getMetaData().getColumnName(1)); + while (resultSet.next()) { + System.out.println("data => "); + System.out.println(resultSet.getObject(1)); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomEnumerator.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomEnumerator.java new file mode 100644 index 0000000..3245ca6 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomEnumerator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.executor; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.Source; + +import java.io.BufferedReader; +import java.io.IOException; + +/** + * Data output + */ +public class CustomEnumerator<E> implements Enumerator<E> { + + private E current; + + private BufferedReader br; + + public CustomEnumerator(Source source) { + try { + this.br = new BufferedReader(source.reader()); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + @Override + public E current() { + return current; + } + + @Override + public boolean moveNext() { + try { + String line = br.readLine(); + if(line == null){ + return false; + } + current = (E)new Object[]{line}; // If there are multiple columns, here are multiple values + } catch (IOException e) { + e.printStackTrace(); + return false; + } + return true; + } + + /** + * Anomalies go here + * */ + @Override + public void reset() { + System.out.println("Reported a wrong brother, does not support this operation"); + } + + /** + * InputStream stream is closed here + * */ + @Override + public void close() { + + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchema.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchema.java new file mode 100644 index 0000000..6e2761a --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchema.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.executor; + +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.schema.*; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.util.Source; +import org.apache.calcite.util.Sources; + +import java.net.URL; +import java.util.Map; + +/** + * Similar to the database, Schema represents the database + * */ + +public class CustomSchema extends AbstractSchema { + private Map<String, Table> tableMap; + + @Override + protected Map<String, Table> getTableMap() { + URL url = CustomSchema.class.getResource("/data.csv"); + Source source = Sources.of(url); + if (tableMap == null) { + final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); + // A database has multiple table names, initialized here, the case should be noted, TEST01 is the table name. + builder.put("TEST01", new CustomTable(source)); + tableMap = builder.build(); + } + return tableMap; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchemaFactory.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchemaFactory.java new file mode 100644 index 0000000..20e3b66 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomSchemaFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.executor; + +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +import java.util.Map; + +/** + * Custom schemaFacoty entry + * Configure factory class from configuration file + * ModelHandler will tune this factory class + * */ +public class CustomSchemaFactory implements SchemaFactory { + + /** + * parentSchema his parent node, usually root + * name the name of the database, which is defined in the model + * operand is also defined in mode, is a Map type, used to pass in custom parameters. + * */ + @Override + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + return new CustomSchema(); + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomTable.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomTable.java new file mode 100644 index 0000000..8836849 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/executor/CustomTable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.executor; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Source; + +import java.util.ArrayList; +import java.util.List; + +public class CustomTable extends AbstractTable implements ScannableTable { + private Source source; + + public CustomTable(Source source) { + this.source = source; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { + JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory; + + List<String> names = new ArrayList<>(); + names.add("lala"); + List<RelDataType> types = new ArrayList<>(); + types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR)); + + return typeFactory.createStructType(Pair.zip(names,types)); + } + + @Override + public Enumerable<Object[]> scan(DataContext dataContext) { + return new AbstractEnumerable<Object[]>() { + @Override + public Enumerator<Object[]> enumerator() { + return new CustomEnumerator<>(source); + } + }; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/CalciteParser.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/CalciteParser.java new file mode 100644 index 0000000..8d1fba8 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/CalciteParser.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.parser; + +import com.google.common.io.Resources; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.*; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.wayang.api.sql.relation.node.WayangRelFactories; +import org.apache.wayang.api.sql.rule.WayangRules; + + +public class CalciteParser { + public Planner planner; + + public CalciteParser(SchemaPlus schema) { + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + + FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + // Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower + // case when they are read, and whether identifiers are matched case-sensitively. + .setLex(Lex.MYSQL) + .build()) + // Sets the schema to use by the planner + .defaultSchema(schema) + .traitDefs(traitDefs) + // Context provides a way to store data within the planner session that can be accessed in planner rules. + .context(WayangRelFactories.ALL_RHEEM_REL_FACTORIES) + // Rule sets to use in transformation phases. Each transformation phase can use a different set of rules. + //.ruleSets(RuleSets.ofList()) + // Custom cost factory to use during optimization + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + + this.planner = Frameworks.getPlanner(calciteFrameworkConfig); + + } + + public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException { + SqlNode sqlNode; + + try { + sqlNode = planner.parse(query); + } catch (SqlParseException e) { + throw new RuntimeException("Query parsing error.", e); + } + + + System.out.println(planner.validateAndGetType(sqlNode)); + + return planner.rel(sqlNode).project(); + } + + public RelNode toPhisicalPlan(RelNode logicalPlan) throws RelConversionException { + final RelBuilderFactory builderFactory = + RelBuilder.proto(WayangRelFactories.ALL_RHEEM_REL_FACTORIES); + final RelOptPlanner planner = logicalPlan.getCluster().getPlanner(); // VolcanoPlanner + List<RelOptRule> rm = planner.getRules(); + for (RelOptRule r : rm){ + planner.removeRule(r); + } + + for (RelOptRule r : WayangRules.ALL_WAYANG_OPT_RULES) { + planner.addRule(r); + } + planner.setRoot(logicalPlan); + System.out.println(planner.getContext().toString()); + System.out.println(planner.getRules()); + //return planner.findBestExp(); + return planner.getRoot(); + } + + public static void main(String[] args) throws IOException, SQLException, ValidationException, RelConversionException, SqlParseException { + // Simple connection implementation for loading schema from sales.json + CalciteConnection connection = new SimpleCalciteConnection(); + String salesSchema = Resources.toString(CalciteParser.class.getResource("/WayangModel.json"), Charset.defaultCharset()); + // ModelHandler reads the sales schema and load the schema to connection's root schema and sets the default schema + new ModelHandler(connection, "inline:" + salesSchema); + + // Create the query planner with sales schema. conneciton.getSchema returns default schema name specified in sales.json + CalciteParser queryPlanner = new CalciteParser(connection.getRootSchema().getSubSchema(connection.getSchema())); + RelNode logicalPlan = queryPlanner + .getLogicalPlan( + "select EXPR$0 from (values (1), (2)) where EXPR$0 > 1" + //"select t.tc0 from t where t.tc0 = 'hello'" + ); + + + + System.out.println(RelOptUtil.toString(logicalPlan)); + + RelNode tmp = queryPlanner.toPhisicalPlan(logicalPlan); + + System.out.println(RelOptUtil.toString(tmp)); + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/Main.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/Main.java new file mode 100644 index 0000000..c195982 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/Main.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.parser; + +import com.google.common.io.Resources; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.wayang.api.sql.relation.node.WayangRel; +import org.apache.wayang.api.sql.relation.node.WayangRelFactories; + +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.EQUALS; + +public class Main { + + public static void main(String... args) throws IOException, SQLException { + CalciteConnection connection = new SimpleCalciteConnection(); + String salesSchema = Resources.toString(Main.class.getResource("/WayangModel.json"), Charset.defaultCharset()); + // ModelHandler reads the sales schema and load the schema to connection's root schema and sets the default schema + new ModelHandler(connection, "inline:" + salesSchema); + + // Create the query planner with sales schema. conneciton.getSchema returns default schema name specified in sales.json + SchemaPlus schema = connection.getRootSchema().getSubSchema(connection.getSchema()); + + + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + + + FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder() + // Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower + // case when they are read, and whether identifiers are matched case-sensitively. + .setLex(Lex.MYSQL) + .build()) + // Sets the schema to use by the planner + .defaultSchema(schema) + .traitDefs(traitDefs) + // Context provides a way to store data within the planner session that can be accessed in planner rules. + .context(WayangRelFactories.ALL_RHEEM_REL_FACTORIES) + // Rule sets to use in transformation phases. Each transformation phase can use a different set of rules. + //.ruleSets(RuleSets.ofList()) + // Custom cost factory to use during optimization + .costFactory(null) + .typeSystem(RelDataTypeSystem.DEFAULT) + .build(); + + RelBuilder builder = RelBuilder.create(calciteFrameworkConfig); + RelNode root = builder.values(new String[]{"tc0"}, 1, 2, 3) + .filter( + builder.call(EQUALS, builder.field("tc0"), builder.literal(2)) + ).build(); + + System.out.println(RelOptUtil.toString(root)); + + final RelBuilderFactory builderFactory = + RelBuilder.proto(WayangRelFactories.ALL_RHEEM_REL_FACTORIES); +// final RelOptPlanner rel_planner = root.getCluster().getPlanner(); // VolcanoPlanner +// for (RelOptRule r : WayangRules.ALL_RHEEM_OPT_RULES) { +// rel_planner.addRule(r); +// } +// rel_planner.setRoot(root); +// +// RelNode tmp = rel_planner.findBestExp(); +// + WayangRel.Implementor implementor = new WayangRel.Implementor(); + + implementor.visitChild(0, root); + implementor.getStatements() + .forEach(System.out::println); + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/SimpleCalciteConnection.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/SimpleCalciteConnection.java new file mode 100644 index 0000000..3b23b4f --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/parser/SimpleCalciteConnection.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.parser; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; + +import java.lang.reflect.Type; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public class SimpleCalciteConnection implements CalciteConnection { + private final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + private String schema; + + @Override + public SchemaPlus getRootSchema() { + return rootSchema; + } + + @Override + public JavaTypeFactory getTypeFactory() { + return null; + } + + @Override + public Properties getProperties() { + return null; + } + + @Override + public Statement createStatement() throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void commit() throws SQLException { + + } + + @Override + public void rollback() throws SQLException { + + } + + @Override + public void close() throws SQLException { + + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public void setCatalog(String catalog) throws SQLException { + + } + + @Override + public String getCatalog() throws SQLException { + return null; + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + + } + + @Override + public int getTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + + } + + @Override + public void setHoldability(int holdability) throws SQLException { + + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return null; + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + @Override + public Clob createClob() throws SQLException { + return null; + } + + @Override + public Blob createBlob() throws SQLException { + return null; + } + + @Override + public NClob createNClob() throws SQLException { + return null; + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return null; + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return false; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return null; + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + @Override + public void setSchema(String schema) throws SQLException { + this.schema = schema; + } + + @Override + public String getSchema() throws SQLException { + return schema; + } + + @Override + public void abort(Executor executor) throws SQLException { + + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public CalciteConnectionConfig config() { + return null; + } + + @Override + public CalcitePrepare.Context createPrepareContext() { + return null; + } + + @Override + public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) { + return null; + } + + @Override + public <T> Queryable<T> createQuery(Expression expression, Type rowType) { + return null; + } + + @Override + public <T> T execute(Expression expression, Class<T> type) { + return null; + } + + @Override + public <T> T execute(Expression expression, Type type) { + return null; + } + + @Override + public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { + return null; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangFilter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangFilter.java new file mode 100644 index 0000000..02e3923 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangFilter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.relation.node; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rex.RexNode; + +import java.util.stream.Stream; + +public class WayangFilter extends Filter implements WayangRel { + + public WayangFilter(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override + public void implement(Implementor implementor) { + implementor.visitChild(0, getInput()); + implementor.addStatement(getWayangFilterStatement(implementor)); + } + + private Stream<String> getWayangFilterStatement(Implementor implementor) { + System.out.println(condition); + //TODO HERE the RHEEM CODE + return implementor.getStatements().filter(ele -> Integer.parseInt(ele) == 2); + } + + @Override + public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return null; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangProject.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangProject.java new file mode 100644 index 0000000..f25868f --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangProject.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.relation.node; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptCostImpl; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; + +public class WayangProject extends Project implements WayangRel { + public WayangProject(RelOptCluster cluster, RelTraitSet traits, List<RelHint> hints, RelNode input, List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, hints, input, projects, rowType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + RelOptCost tmp = planner.getCostFactory().makeCost(10, 10, 10); + System.out.println("heree "+ tmp.isInfinite() + " "+ tmp.toString() ); + return tmp; + } + + @Override + public void implement(Implementor implementor) { + System.out.println("project :D"); + } + + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) { + return new WayangProject(input.getCluster(), traitSet, Collections.emptyList(), input, projects, rowType); + } + + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRel.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRel.java new file mode 100644 index 0000000..ed1c0cb --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRel.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.relation.node; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; + +import java.util.List; +import java.util.stream.Stream; + +/** + * Relational expression that uses the Wayang calling convention. + */ +public interface WayangRel extends RelNode { + /** + * Converts this node to a Pig Latin statement. + */ + void implement(Implementor implementor); + + /** Calling convention for relational operations that occur in Wayang. */ + Convention CONVENTION = new Convention.Impl("Wayang", WayangRel.class); + +// @Override +// default RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { +// return planner.getCostFactory().makeZeroCost(); +// } + + /** + * Callback for the implementation process that converts a tree of + * {@link WayangRel} nodes into complete Wayang Plan. + */ + class Implementor { + + private Stream<String> statements; + + public String getTableName(RelNode input) { + final List<String> qualifiedName = input.getTable().getQualifiedName(); + return qualifiedName.get(qualifiedName.size() - 1); + } + + public String getWayangRelationAlias(RelNode input) { + return getTableName(input); + } + + public String getFieldName(RelNode input, int index) { + return input.getRowType().getFieldList().get(index).getName(); + } + + public void addStatement(Stream<String> statement) { + statements = statement; + } + + public void visitChild(int ordinal, RelNode input) { + assert ordinal == 0; + ((WayangRel) input).implement(this); + } + + public Stream<String> getStatements() { + return statements; + } + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRelFactories.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRelFactories.java new file mode 100644 index 0000000..3349c7a --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangRelFactories.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.relation.node; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Set; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class WayangRelFactories { + + public static final Context ALL_RHEEM_REL_FACTORIES = + Contexts.of( + WayangFilterFactory.INSTANCE, + WayangValuesFactory.INSTANCE, + WayangProjectFactory.INSTANCE + ); + + private WayangRelFactories() { + } + + + public static class WayangFilterFactory implements RelFactories.FilterFactory { + + public static final WayangFilterFactory INSTANCE = new WayangFilterFactory(); + + @Override + public RelNode createFilter(RelNode input, RexNode condition, Set<CorrelationId> variablesSet) { + return new WayangFilter(input.getCluster(), input.getTraitSet().replace(WayangRel.CONVENTION), input, condition); + } + } + + public static class WayangValuesFactory implements RelFactories.ValuesFactory { + + public static final WayangValuesFactory INSTANCE = new WayangValuesFactory(); + + @Override + public RelNode createValues(RelOptCluster cluster, RelDataType rowType, List<ImmutableList<RexLiteral>> tuples) { + return new WayangValues(cluster, rowType, (ImmutableList<ImmutableList<RexLiteral>>) tuples, cluster.traitSet()); + } + } + + public static class WayangProjectFactory implements RelFactories.ProjectFactory { + public static final WayangProjectFactory INSTANCE = new WayangProjectFactory(); + + @Override + public RelNode createProject( + RelNode input, + List<RelHint> hints, + List<? extends RexNode> childExprs, + @Nullable List<? extends String> fieldNames) { + return new WayangProject(input.getCluster(), input.getTraitSet(), hints, input, childExprs, input.getRowType()); + } + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangValues.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangValues.java new file mode 100644 index 0000000..4eb8973 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/relation/node/WayangValues.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.relation.node; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; + +public class WayangValues extends Values implements WayangRel { + + public WayangValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + } + + @Override + public void implement(Implementor implementor) { + //TODO HERE the RHEEM CODE + implementor.addStatement( + tuples.parallelStream() + .flatMap(list -> list.stream()) + .map(ele -> ele.toString()) + ); + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/rule/WayangRules.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/rule/WayangRules.java new file mode 100644 index 0000000..8ae2d30 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/rule/WayangRules.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.rule; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalValues; + +import java.util.Collections; +import java.util.List; +import org.apache.wayang.api.sql.relation.node.WayangFilter; +import org.apache.wayang.api.sql.relation.node.WayangProject; +import org.apache.wayang.api.sql.relation.node.WayangRel; +import org.apache.wayang.api.sql.relation.node.WayangValues; + +public class WayangRules { + public static final List<ConverterRule> ALL_WAYANG_OPT_RULES = + ImmutableList.of( + WayangFilterRule.INSTANCE, + WayangProjectRule.INSTANCE, + WayangValuesRule.INSTANCE + ) + ; + + private static class WayangFilterRule extends ConverterRule { + private static final WayangFilterRule INSTANCE = new WayangFilterRule(); + + private WayangFilterRule() { + super(LogicalFilter.class, Convention.NONE, WayangRel.CONVENTION, "WayangFilterRule"); + } + + public RelNode convert(RelNode rel) { + final LogicalFilter filter = (LogicalFilter) rel; + final RelTraitSet traitSet = filter.getTraitSet().replace(0, WayangRel.CONVENTION); + return new WayangFilter(rel.getCluster(), traitSet, + convert(filter.getInput(), WayangRel.CONVENTION), filter.getCondition()); + } + } + + private static class WayangProjectRule extends ConverterRule { + private static final WayangProjectRule INSTANCE = new WayangProjectRule(); + + private WayangProjectRule() { + super(LogicalProject.class, Convention.NONE, WayangRel.CONVENTION, "WayangProjectRule"); + } + + public RelNode convert(RelNode rel) { + final LogicalProject project = (LogicalProject) rel; + final RelTraitSet traitSet = project.getTraitSet().replace(0, WayangRel.CONVENTION); + + + return new WayangProject(project.getCluster(), traitSet, Collections.emptyList(), convert(project, WayangRel.CONVENTION), + project.getProjects(), project.getRowType()); + } + } + + private static class WayangValuesRule extends ConverterRule { + private static final WayangValuesRule INSTANCE = new WayangValuesRule(); + private WayangValuesRule() { + super(LogicalValues.class, Convention.NONE, WayangRel.CONVENTION, "WayangValueRule"); + } + + @Override + public RelNode convert(RelNode rel) { + final LogicalValues values = (LogicalValues) rel; + final RelTraitSet traitSet = values.getTraitSet().replace(0, WayangRel.CONVENTION); + return new WayangValues(values.getCluster(), values.getRowType(), values.getTuples(), traitSet); + } + } +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchema.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchema.java new file mode 100644 index 0000000..4e3b8d8 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchema.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.schema; + + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; + +import java.util.HashMap; +import java.util.Map; +import org.apache.wayang.api.sql.table.WayangTable; + +public class WayangSchema extends AbstractSchema { + + protected final Map<String, Table> tableMap = new HashMap<>(); + + @Override protected Map<String, Table> getTableMap() { + return tableMap; + } + + public void registerTable(String name, WayangTable table) { + tableMap.put(name, table); + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchemaFactory.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchemaFactory.java new file mode 100644 index 0000000..d5358e4 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/schema/WayangSchemaFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.schema; + +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +import java.util.Map; + +public class WayangSchemaFactory implements SchemaFactory { + + /** Public singleton, per factory contract. */ + public static final WayangSchemaFactory INSTANCE = new WayangSchemaFactory(); + + private WayangSchemaFactory() { + } + + public Schema create(SchemaPlus parentSchema, String name, + Map<String, Object> operand) { + return new WayangSchema(); + } +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTable.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTable.java new file mode 100644 index 0000000..b88ed5a --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTable.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.table; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +public class WayangTable extends AbstractTable { + + private final String filePath; + private final String[] fieldNames; + + /** Creates a PigTable. */ + public WayangTable(String filePath, String[] fieldNames) { + this.filePath = filePath; + this.fieldNames = fieldNames; + } + + @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { + final RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (String fieldName : fieldNames) { + // only supports CHARARRAY types for now + final RelDataType relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + + final RelDataType nullableRelDataType = typeFactory + .createTypeWithNullability(relDataType, true); + builder.add(fieldName, nullableRelDataType); + } + return builder.build(); + } + + public String getFilePath() { + return filePath; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTableFactory.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTableFactory.java new file mode 100644 index 0000000..e2e091c --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/table/WayangTableFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.sql.table; + +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TableFactory; + +import java.io.File; +import java.util.List; +import java.util.Map; +import org.apache.wayang.api.sql.schema.WayangSchema; + +public class WayangTableFactory implements TableFactory<WayangTable> { + // public constructor, per factory contract + public WayangTableFactory() { + } + + @SuppressWarnings("unchecked") + public WayangTable create(SchemaPlus schema, String name, + Map<String, Object> operand, RelDataType rowType) { + String fileName = (String) operand.get("file"); + File file = new File(fileName); + final File base = + (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); + if (base != null && !file.isAbsolute()) { + file = new File(base, fileName); + } + final List<String> fieldNames = (List<String>) operand.get("columns"); + final WayangTable result = new WayangTable(file.getAbsolutePath(), fieldNames.toArray(new String[0])); + schema.unwrap(WayangSchema.class).registerTable(name, result); + return result; + } + +} diff --git a/wayang-api/wayang-api-sql/src/main/resources/WayangModel.json b/wayang-api/wayang-api-sql/src/main/resources/WayangModel.json new file mode 100644 index 0000000..5f822e4 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/resources/WayangModel.json @@ -0,0 +1,22 @@ +{ + "version": "1.0", + "defaultSchema": "TEST", + "schemas": [ + { + "name": "TEST", + "type": "custom", + "factory": "org.apache.wayang.api.sql.schema.WayangSchemaFactory", + "tables": [ + { + "name": "t", + "type": "custom", + "factory": "org.apache.wayang.api.sql.table.WayangTableFactory", + "operand": { + "file": "/data.csv", + "columns": ["tc0", "tc1"] + } + } + ] + } + ] +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/resources/data.csv b/wayang-api/wayang-api-sql/src/main/resources/data.csv new file mode 100644 index 0000000..f595ec7 --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/resources/data.csv @@ -0,0 +1,3 @@ +hello,world +hola,como +wordsssss,sooooooooooollloooooonnnggggg \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/resources/log4j.properties b/wayang-api/wayang-api-sql/src/main/resources/log4j.properties new file mode 100644 index 0000000..ee7519f --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Change rootLogger level to WARN +log4j.rootLogger=WARN, A1 +# Increase level to DEBUG for RelOptPlanner +log4j.logger.org.apache.calcite.plan.RelOptPlanner=DEBUG +# Increase level to TRACE for HepPlanner +log4j.logger.org.apache.calcite.plan.hep.HepPlanner=TRACE \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/resources/model.json b/wayang-api/wayang-api-sql/src/main/resources/model.json new file mode 100644 index 0000000..de66ebf --- /dev/null +++ b/wayang-api/wayang-api-sql/src/main/resources/model.json @@ -0,0 +1,11 @@ +{ + "version": "1.0", + "defaultSchema": "TEST", + "schemas": [{ + "name": "TEST", + "type": "custom", + "factory": "org.apache.wayang.api.sql.executor.CustomSchemaFactory", + "operand": {} + } + ] +} \ No newline at end of file
