This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch calcite-framework in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit f5d31125302b340efb1aec330d5bd7bad77597f0 Author: Bertil Chapuis <[email protected]> AuthorDate: Fri Feb 7 09:56:27 2025 +0100 Add the server ddl class and implement materialized views --- .../apache/baremaps/calcite/DataTableFactory.java | 21 +- .../calcite/baremaps/BaremapsMaterializedView.java | 58 ++ .../calcite/baremaps/BaremapsMutableTable.java | 282 ++++++++ .../calcite/baremaps/ServerDdlExecutor.java | 770 +++++++++++++++++++++ .../apache/baremaps/calcite/BaremapsContext.java | 73 ++ .../apache/baremaps/calcite/CalciteDdlTest.java | 206 ++++++ .../org/apache/baremaps/calcite/CalciteTest.java | 287 +++----- .../data/memory/MemoryMappedDirectory.java | 8 + 8 files changed, 1489 insertions(+), 216 deletions(-) diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/DataTableFactory.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/DataTableFactory.java index 569984662..ce0ea913a 100644 --- a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/DataTableFactory.java +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/DataTableFactory.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.file.Paths; import java.util.Map; -import org.apache.baremaps.calcite.baremaps.BaremapsDataTable; import org.apache.baremaps.calcite.csv.CsvDataTable; import org.apache.baremaps.data.collection.AppendOnlyLog; import org.apache.baremaps.data.collection.DataCollection; @@ -35,16 +34,17 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.schema.TableFactory; -public class DataTableFactory implements TableFactory<DataTableAdapter> { +public class DataTableFactory implements TableFactory<Table> { public DataTableFactory() { } @Override - public DataTableAdapter create( 
+ public Table create( SchemaPlus schema, String name, Map<String, Object> operand, @@ -52,16 +52,15 @@ public class DataTableFactory implements TableFactory<DataTableAdapter> { final RelProtoDataType protoRowType = rowType != null ? RelDataTypeImpl.proto(rowType) : null; String format = (String) operand.get("format"); - DataTable dataTable = switch (format) { - case "baremaps" -> createMMapTable(schema, name, operand, rowType); + return switch (format) { + case "baremaps" -> createDataTable(schema, name, operand, rowType); case "csv" -> createCsvTable(schema, name, operand, rowType); default -> throw new RuntimeException("Unsupported format"); }; - return new DataTableAdapter(dataTable, protoRowType); } - private DataTable createMMapTable(SchemaPlus schema, String name, Map<String, Object> operand, - RelDataType rowType) { + private Table createDataTable(SchemaPlus schema, String name, Map<String, Object> operand, + RelDataType rowType) { String directory = (String) operand.get("directory"); if (directory == null) { throw new RuntimeException("A directory should be specified"); @@ -76,20 +75,20 @@ public class DataTableFactory implements TableFactory<DataTableAdapter> { DataSchema dataSchema = DataSchema.read(new ByteArrayInputStream(bytes)); DataType<DataRow> dataType = new DataRowType(dataSchema); DataCollection<DataRow> dataCollection = new AppendOnlyLog<>(dataType, memory); - return new BaremapsDataTable(dataSchema, dataCollection); + return null; //new BaremapsTable(dataSchema, dataCollection); } catch (IOException e) { throw new RuntimeException(e); } } - private DataTable createCsvTable(SchemaPlus schema, String name, Map<String, Object> operand, + private Table createCsvTable(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { String file = (String) operand.get("file"); if (file == null) { throw new RuntimeException("A file should be specified"); } try { - return new CsvDataTable(name, new File(file), ',', true); + return 
new DataTableAdapter(new CsvDataTable(name, new File(file), ',', true)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMaterializedView.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMaterializedView.java new file mode 100644 index 000000000..2e24553be --- /dev/null +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMaterializedView.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.calcite.baremaps; + + +import org.apache.calcite.materialize.MaterializationKey; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.sql2rel.InitializerExpressionFactory; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A table that implements a materialized view. */ +class BaremapsMaterializedView extends BaremapsMutableTable { + /** + * The key with which this was stored in the materialization service, or null if not (yet) + * materialized. 
+ */ + @Nullable + MaterializationKey key; + + BaremapsMaterializedView( + String name, + RelProtoDataType protoRowType, + InitializerExpressionFactory initializerExpressionFactory, + RelDataTypeFactory typeFactory) { + super(name, protoRowType, initializerExpressionFactory, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.MATERIALIZED_VIEW; + } + + @Override + public <C extends Object> @Nullable C unwrap(Class<C> aClass) { + if (MaterializationKey.class.isAssignableFrom(aClass) + && aClass.isInstance(key)) { + return aClass.cast(key); + } + return super.unwrap(aClass); + } +} diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMutableTable.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMutableTable.java new file mode 100644 index 000000000..d54a84bf3 --- /dev/null +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/BaremapsMutableTable.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.baremaps.calcite.baremaps; + +import org.apache.baremaps.calcite.*; +import org.apache.baremaps.calcite.DataColumn.Cardinality; +import org.apache.baremaps.data.collection.AppendOnlyLog; +import org.apache.baremaps.data.collection.DataCollection; +import org.apache.baremaps.data.memory.Memory; +import org.apache.baremaps.data.memory.MemoryMappedDirectory; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ModifiableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Wrapper; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.schema.impl.AbstractTableQueryable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql2rel.InitializerExpressionFactory; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Type; +import java.nio.MappedByteBuffer; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static java.util.Objects.requireNonNull; + + +class BaremapsMutableTable extends AbstractTable implements ModifiableTable, Wrapper { + + private final String name; + + private final InitializerExpressionFactory 
initializerExpressionFactory; + + private final RelProtoDataType protoRowType; + + private final RelDataType rowType; + + private final DataSchema schema; + + final DataCollection<DataRow> rows; + + BaremapsMutableTable(String name, + RelProtoDataType protoRowType, + InitializerExpressionFactory initializerExpressionFactory, + RelDataTypeFactory typeFactory) { + super(); + this.name = requireNonNull(name, "name"); + this.initializerExpressionFactory = + requireNonNull(initializerExpressionFactory, "initializerExpressionFactory"); + this.protoRowType = requireNonNull(protoRowType, "protoRowType"); + this.rowType = this.protoRowType.apply(typeFactory); + + // Create the schema + List<DataColumn> columns = rowType.getFieldList().stream().map(field -> { + String columnName = field.getName(); + RelDataType relDataType = field.getType(); + DataColumn.Cardinality columnCardinality = cardinalityFromRelDataType(relDataType); + DataColumn.Type columnType = typeFromRelDataType(relDataType); + return (DataColumn) new DataColumnFixed(columnName, columnCardinality, columnType); + }).toList(); + this.schema = new DataSchema(name, columns); + + // Create the collection + DataRowType dataRowType = new DataRowType(schema); + Memory<MappedByteBuffer> memory = new MemoryMappedDirectory(Paths.get(this.name)); + this.rows = new AppendOnlyLog<>(dataRowType, memory); + } + + @Override + public TableModify toModificationRel( + RelOptCluster cluster, + RelOptTable table, + Prepare.CatalogReader catalogReader, + RelNode child, + TableModify.Operation operation, + @Nullable List<String> updateColumnList, + @Nullable List<RexNode> sourceExpressionList, + boolean flattened) { + return LogicalTableModify.create(table, catalogReader, child, operation, + updateColumnList, sourceExpressionList, flattened); + } + + private DataColumn.Cardinality cardinalityFromRelDataType(RelDataType columnType) { + if (columnType.getSqlTypeName() == SqlTypeName.ARRAY) { + return DataColumn.Cardinality.REPEATED; + } 
else if (columnType.isNullable()) { + return Cardinality.OPTIONAL; + } else { + return Cardinality.REQUIRED; + } + } + + public static DataColumn.Type typeFromRelDataType(RelDataType relDataType) { + SqlTypeName sqlTypeName = relDataType.getSqlTypeName(); + switch (sqlTypeName) { + case BOOLEAN: + return DataColumn.Type.BOOLEAN; + case TINYINT: + return DataColumn.Type.BYTE; + case SMALLINT: + return DataColumn.Type.SHORT; + case INTEGER: + return DataColumn.Type.INTEGER; + case BIGINT: + return DataColumn.Type.LONG; + case FLOAT: + case REAL: + return DataColumn.Type.FLOAT; + case DOUBLE: + case DECIMAL: + return DataColumn.Type.DOUBLE; + case CHAR: + case VARCHAR: + return DataColumn.Type.STRING; + case BINARY: + case VARBINARY: + return DataColumn.Type.BINARY; + case DATE: + return DataColumn.Type.LOCAL_DATE; + case TIME: + return DataColumn.Type.LOCAL_TIME; + case TIMESTAMP: + return DataColumn.Type.LOCAL_DATE_TIME; + case MAP: + return DataColumn.Type.NESTED; + case GEOMETRY: + return DataColumn.Type.GEOMETRY; + case ARRAY: + RelDataType componentType = requireNonNull(relDataType.getComponentType()); + return typeFromRelDataType(componentType); + default: + throw new IllegalArgumentException("Unsupported Calcite type: " + sqlTypeName); + } + } + + @Override + public Collection getModifiableCollection() { + return new CollectionAdapter(); + } + + @Override + public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, + String tableName) { + return new AbstractTableQueryable<T>(queryProvider, schema, this, + tableName) { + @Override + public Enumerator<T> enumerator() { + return (Enumerator<T>) Linq4j.enumerator(new CollectionAdapter()); + } + }; + } + + @Override + public Type getElementType() { + return Object[].class; + } + + @Override + public Expression getExpression(SchemaPlus schema, String tableName, Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + @Override + public RelDataType 
getRowType(final RelDataTypeFactory typeFactory) { + return rowType; + } + + @Override + public <C extends Object> @Nullable C unwrap(Class<C> aClass) { + if (aClass.isInstance(initializerExpressionFactory)) { + return aClass.cast(initializerExpressionFactory); + } + return super.unwrap(aClass); + } + + private class CollectionAdapter implements Collection<Object[]> { + + private final int size; + + public CollectionAdapter() { + this.size = (int) Math.min(rows.size(), Integer.MAX_VALUE); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return rows.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return rows.contains(o); + } + + @Override + public Iterator<Object[]> iterator() { + return rows.stream().map(row -> row.values().toArray()).iterator(); + } + + @Override + public Object[] toArray() { + return rows.stream().map(row -> row.values().toArray()).toArray(); + } + + @Override + public <T> T[] toArray(T[] a) { + return (T[]) rows.stream().map(row -> row.values().toArray()).toArray(); + } + + @Override + public boolean add(Object[] objects) { + return rows.add(new DataRow(schema, List.of(objects))); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection<?> c) { + return rows.containsAll(c); + } + + @Override + public boolean addAll(Collection<? 
extends Object[]> c) { + return rows.addAll(c.stream().map(objects -> new DataRow(schema, List.of(objects))).toList()); + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + rows.clear(); + } + } +} diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/ServerDdlExecutor.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/ServerDdlExecutor.java new file mode 100644 index 000000000..0c2650f63 --- /dev/null +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/baremaps/ServerDdlExecutor.java @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.baremaps.calcite.baremaps; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; +import static org.apache.calcite.util.Static.RESOURCE; + +import com.google.common.collect.ImmutableList; +import java.io.Reader; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.jdbc.JdbcSchema; +import org.apache.calcite.avatica.AvaticaUtils; +import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.ContextSqlValidator; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.materialize.MaterializationKey; +import org.apache.calcite.materialize.MaterializationService; +import org.apache.calcite.model.JsonSchema; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ColumnStrategy; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.Wrapper; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.calcite.schema.impl.ViewTableMacro; +import org.apache.calcite.server.DdlExecutor; +import 
org.apache.calcite.server.DdlExecutorImpl; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.SqlWriterConfig; +import org.apache.calcite.sql.ddl.SqlAttributeDefinition; +import org.apache.calcite.sql.ddl.SqlColumnDeclaration; +import org.apache.calcite.sql.ddl.SqlCreateForeignSchema; +import org.apache.calcite.sql.ddl.SqlCreateFunction; +import org.apache.calcite.sql.ddl.SqlCreateMaterializedView; +import org.apache.calcite.sql.ddl.SqlCreateSchema; +import org.apache.calcite.sql.ddl.SqlCreateTable; +import org.apache.calcite.sql.ddl.SqlCreateTableLike; +import org.apache.calcite.sql.ddl.SqlCreateType; +import org.apache.calcite.sql.ddl.SqlCreateView; +import org.apache.calcite.sql.ddl.SqlDropObject; +import org.apache.calcite.sql.ddl.SqlDropSchema; +import org.apache.calcite.sql.ddl.SqlTruncateTable; +import org.apache.calcite.sql.dialect.CalciteSqlDialect; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlAbstractParserImpl; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParserImplFactory; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl; +import org.apache.calcite.sql.pretty.SqlPrettyWriter; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.InitializerContext; +import org.apache.calcite.sql2rel.InitializerExpressionFactory; +import org.apache.calcite.sql2rel.NullInitializerExpressionFactory; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import 
org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Executes DDL commands. + * + * <p> + * Given a DDL command that is a sub-class of {@link SqlNode}, dispatches the command to an + * appropriate {@code execute} method. For example, "CREATE TABLE" ({@link SqlCreateTable}) is + * dispatched to {@link #execute(SqlCreateTable, CalcitePrepare.Context)}. + */ +public class ServerDdlExecutor extends DdlExecutorImpl { + /** Singleton instance. */ + public static final ServerDdlExecutor INSTANCE = new ServerDdlExecutor(); + + /** Parser factory. */ + @SuppressWarnings("unused") // used via reflection + public static final SqlParserImplFactory PARSER_FACTORY = + new SqlParserImplFactory() { + @Override + public SqlAbstractParserImpl getParser(Reader stream) { + return SqlDdlParserImpl.FACTORY.getParser(stream); + } + + @Override + public DdlExecutor getDdlExecutor() { + return ServerDdlExecutor.INSTANCE; + } + }; + + /** + * Creates a ServerDdlExecutor. Protected only to allow sub-classing; use {@link #INSTANCE} where + * possible. + */ + protected ServerDdlExecutor() {} + + /** + * Returns the schema in which to create an object; the left part is null if the schema does not + * exist. + */ + static Pair<@Nullable CalciteSchema, String> schema( + CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) { + final String name; + final List<String> path; + if (id.isSimple()) { + path = context.getDefaultSchemaPath(); + name = id.getSimple(); + } else { + path = Util.skipLast(id.names); + name = Util.last(id.names); + } + CalciteSchema schema = + mutable ? 
context.getMutableRootSchema() + : context.getRootSchema(); + for (String p : path) { + @Nullable + CalciteSchema subSchema = schema.getSubSchema(p, true); + if (subSchema == null) { + return Pair.of(null, name); + } + schema = subSchema; + } + return Pair.of(schema, name); + } + + /** + * Returns the SqlValidator with the given {@code context} schema and type factory. + */ + static SqlValidator validator(CalcitePrepare.Context context, + boolean mutable) { + return new ContextSqlValidator(context, mutable); + } + + /** + * Wraps a query to rename its columns. Used by CREATE VIEW and CREATE MATERIALIZED VIEW. + */ + static SqlNode renameColumns(@Nullable SqlNodeList columnList, + SqlNode query) { + if (columnList == null) { + return query; + } + final SqlParserPos p = query.getParserPosition(); + final SqlNodeList selectList = SqlNodeList.SINGLETON_STAR; + final SqlCall from = + SqlStdOperatorTable.AS.createCall(p, + ImmutableList.<SqlNode>builder() + .add(query) + .add(new SqlIdentifier("_", p)) + .addAll(columnList) + .build()); + return new SqlSelect(p, null, selectList, from, null, null, null, null, + null, null, null, null, null); + } + + /** Erases the table data that calcite-server created. */ + static void erase(SqlIdentifier name, CalcitePrepare.Context context) { + // Directly clearing data is more efficient than executing SQL + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, true, name); + final CalciteSchema calciteSchema = requireNonNull(pair.left); + final String tblName = pair.right; + final CalciteSchema.TableEntry tableEntry = + calciteSchema.getTable(tblName, context.config().caseSensitive()); + final Table table = requireNonNull(tableEntry, "tableEntry").getTable(); + if (table instanceof BaremapsMutableTable) { + BaremapsMutableTable mutableArrayTable = (BaremapsMutableTable) table; + mutableArrayTable.rows.clear(); + } else { + // Not created by calcite-server, so truncate is not supported.
+ throw new UnsupportedOperationException("Only MutableArrayTable support truncate"); + } + } + + /** Populates the table called {@code name} by executing {@code query}. */ + static void populate(SqlIdentifier name, SqlNode query, + CalcitePrepare.Context context) { + // Generate, prepare and execute an "INSERT INTO table query" statement. + // (It's a bit inefficient that we convert from SqlNode to SQL and back + // again.) + final FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(context.getRootSchema().plus()) + .build(); + final Planner planner = Frameworks.getPlanner(config); + try { + final StringBuilder buf = new StringBuilder(); + final SqlWriterConfig writerConfig = + SqlPrettyWriter.config().withAlwaysUseParentheses(false); + final SqlPrettyWriter w = new SqlPrettyWriter(writerConfig, buf); + buf.append("INSERT INTO "); + name.unparse(w, 0, 0); + buf.append(' '); + query.unparse(w, 0, 0); + final String sql = buf.toString(); + final SqlNode query1 = planner.parse(sql); + final SqlNode query2 = planner.validate(query1); + final RelRoot r = planner.rel(query2); + final PreparedStatement prepare = + context.getRelRunner().prepareStatement(r.rel); + int rowCount = prepare.executeUpdate(); + Util.discard(rowCount); + prepare.close(); + } catch (SqlParseException | ValidationException + | RelConversionException | SQLException e) { + throw Util.throwAsRuntime(e); + } + } + + /** + * Returns the value of a literal, converting {@link NlsString} into String. + */ + @SuppressWarnings("rawtypes") + static @Nullable Comparable value(SqlNode node) { + final Comparable v = SqlLiteral.value(node); + return v instanceof NlsString ? ((NlsString) v).getValue() : v; + } + + /** Executes a {@code CREATE FOREIGN SCHEMA} command. 
*/ + public void execute(SqlCreateForeignSchema create, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, true, create.name); + requireNonNull(pair.left); // TODO: should not assume parent schema exists + if (pair.left.plus().getSubSchema(pair.right) != null) { + if (!create.getReplace() && !create.ifNotExists) { + throw SqlUtil.newContextException(create.name.getParserPosition(), + RESOURCE.schemaExists(pair.right)); + } + } + final Schema subSchema; + final String libraryName; + if (create.type != null) { + checkArgument(create.library == null); + final String typeName = (String) requireNonNull(value(create.type)); + final JsonSchema.Type type = + Util.enumVal(JsonSchema.Type.class, + typeName.toUpperCase(Locale.ROOT)); + if (type != null) { + switch (type) { + case JDBC: + libraryName = JdbcSchema.Factory.class.getName(); + break; + default: + libraryName = null; + } + } else { + libraryName = null; + } + if (libraryName == null) { + throw SqlUtil.newContextException(create.type.getParserPosition(), + RESOURCE.schemaInvalidType(typeName, + Arrays.toString(JsonSchema.Type.values()))); + } + } else { + libraryName = + requireNonNull((String) value(requireNonNull(create.library))); + } + final SchemaFactory schemaFactory = + AvaticaUtils.instantiatePlugin(SchemaFactory.class, libraryName); + final Map<String, Object> operandMap = new LinkedHashMap<>(); + for (Pair<SqlIdentifier, SqlNode> option : create.options()) { + operandMap.put(option.left.getSimple(), + requireNonNull(value(option.right))); + } + subSchema = + schemaFactory.create(pair.left.plus(), pair.right, operandMap); + pair.left.add(pair.right, subSchema); + } + + /** Executes a {@code CREATE FUNCTION} command. 
*/ + public void execute(SqlCreateFunction create, + CalcitePrepare.Context context) { + throw new UnsupportedOperationException("CREATE FUNCTION is not supported"); + } + + /** + * Executes {@code DROP FUNCTION}, {@code DROP TABLE}, {@code DROP MATERIALIZED VIEW}, + * {@code DROP TYPE}, {@code DROP VIEW} commands. + */ + public void execute(SqlDropObject drop, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, false, drop.name); + final @Nullable CalciteSchema schema = + pair.left; // null if schema does not exist + final String objectName = pair.right; + + boolean existed; + switch (drop.getKind()) { + case DROP_TABLE: + case DROP_MATERIALIZED_VIEW: + Table materializedView = + schema != null + && drop.getKind() == SqlKind.DROP_MATERIALIZED_VIEW + ? schema.plus().getTable(objectName) + : null; + + existed = schema != null && schema.removeTable(objectName); + if (existed) { + if (materializedView instanceof Wrapper) { + ((Wrapper) materializedView).maybeUnwrap(MaterializationKey.class) + .ifPresent(materializationKey -> MaterializationService.instance() + .removeMaterialization(materializationKey)); + } + } else if (!drop.ifExists) { + throw SqlUtil.newContextException(drop.name.getParserPosition(), + RESOURCE.tableNotFound(objectName)); + } + break; + case DROP_VIEW: + // Not quite right: removes any other functions with the same name + existed = schema != null && schema.removeFunction(objectName); + if (!existed && !drop.ifExists) { + throw SqlUtil.newContextException(drop.name.getParserPosition(), + RESOURCE.viewNotFound(objectName)); + } + break; + case DROP_TYPE: + existed = schema != null && schema.removeType(objectName); + if (!existed && !drop.ifExists) { + throw SqlUtil.newContextException(drop.name.getParserPosition(), + RESOURCE.typeNotFound(objectName)); + } + break; + case DROP_FUNCTION: + existed = schema != null && schema.removeFunction(objectName); + if (!existed && !drop.ifExists) { + throw 
SqlUtil.newContextException(drop.name.getParserPosition(), + RESOURCE.functionNotFound(objectName)); + } + break; + case OTHER_DDL: + default: + throw new AssertionError(drop.getKind()); + } + } + + /** + * Executes a {@code TRUNCATE TABLE} command. + */ + public void execute(SqlTruncateTable truncate, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, true, truncate.name); + if (pair.left == null + || pair.left.plus().getTable(pair.right) == null) { + throw SqlUtil.newContextException(truncate.name.getParserPosition(), + RESOURCE.tableNotFound(pair.right)); + } + + if (!truncate.continueIdentify) { + // Calcite does not support RESTART IDENTIFY + throw new UnsupportedOperationException("RESTART IDENTIFY is not supported"); + } + + erase(truncate.name, context); + } + + /** Executes a {@code CREATE MATERIALIZED VIEW} command. */ + public void execute(SqlCreateMaterializedView create, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, true, create.name); + if (pair.left != null + && pair.left.plus().getTable(pair.right) != null) { + // Materialized view exists. + if (!create.ifNotExists) { + // They did not specify IF NOT EXISTS, so give error. + throw SqlUtil.newContextException(create.name.getParserPosition(), + RESOURCE.tableExists(pair.right)); + } + return; + } + final SqlNode q = renameColumns(create.columnList, create.query); + final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql(); + requireNonNull(pair.left); // TODO: should not assume parent schema exists + final List<String> schemaPath = pair.left.path(null); + final ViewTableMacro viewTableMacro = + ViewTable.viewMacro(pair.left.plus(), sql, schemaPath, + context.getObjectPath(), false); + final TranslatableTable x = viewTableMacro.apply(ImmutableList.of()); + final RelDataType rowType = x.getRowType(context.getTypeFactory()); + + // Table does not exist. Create it.
+ final BaremapsMaterializedView table = + new BaremapsMaterializedView(pair.right, RelDataTypeImpl.proto(rowType), + NullInitializerExpressionFactory.INSTANCE, context.getTypeFactory()); + pair.left.add(pair.right, table); + populate(create.name, create.query, context); + table.key = + MaterializationService.instance().defineMaterialization(pair.left, null, + sql, schemaPath, pair.right, true, true); + } + + /** Executes a {@code CREATE SCHEMA} command. */ + public void execute(SqlCreateSchema create, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, true, create.name); + requireNonNull(pair.left); // TODO: should not assume parent schema exists + if (pair.left.plus().getSubSchema(pair.right) != null) { + if (create.ifNotExists) { + return; + } + if (!create.getReplace()) { + throw SqlUtil.newContextException(create.name.getParserPosition(), + RESOURCE.schemaExists(pair.right)); + } + } + final Schema subSchema = new AbstractSchema(); + pair.left.add(pair.right, subSchema); + } + + /** Executes a {@code DROP SCHEMA} command. */ + public void execute(SqlDropSchema drop, + CalcitePrepare.Context context) { + final Pair<@Nullable CalciteSchema, String> pair = + schema(context, false, drop.name); + final String name = pair.right; + final boolean existed = pair.left != null + && pair.left.removeSubSchema(name); + if (!existed && !drop.ifExists) { + throw SqlUtil.newContextException(drop.name.getParserPosition(), + RESOURCE.schemaNotFound(name)); + } + } + + /** Executes a {@code CREATE TABLE} command. 
*/
+  public void execute(SqlCreateTable create,
+      CalcitePrepare.Context context) {
+    final Pair<@Nullable CalciteSchema, String> resolved =
+        schema(context, true, create.name);
+    final CalciteSchema owner =
+        requireNonNull(resolved.left); // TODO: should not assume parent schema exists
+    final String tableName = resolved.right;
+    final JavaTypeFactory typeFactory = context.getTypeFactory();
+
+    // Row type of the "AS query" part; null when the statement has no query.
+    final RelDataType queryRowType;
+    if (create.query == null) {
+      queryRowType = null;
+    } else {
+      // A bit of a hack: pretend it's a view, to get its row type
+      final String sql =
+          create.query.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
+      final ViewTableMacro viewTableMacro =
+          ViewTable.viewMacro(owner.plus(), sql, owner.path(null),
+              context.getObjectPath(), false);
+      final TranslatableTable queryAsView = viewTableMacro.apply(ImmutableList.of());
+      queryRowType = queryAsView.getRowType(typeFactory);
+
+      if (create.columnList != null
+          && queryRowType.getFieldCount() != create.columnList.size()) {
+        throw SqlUtil.newContextException(
+            create.columnList.getParserPosition(),
+            RESOURCE.columnCountMismatch());
+      }
+    }
+
+    // Use the declared columns when present, otherwise one identifier per query field.
+    final List<SqlNode> columnList;
+    if (create.columnList != null) {
+      columnList = create.columnList;
+    } else if (queryRowType != null) {
+      columnList = new ArrayList<>();
+      for (String fieldName : queryRowType.getFieldNames()) {
+        columnList.add(new SqlIdentifier(fieldName, SqlParserPos.ZERO));
+      }
+    } else {
+      // "CREATE TABLE t" is invalid; because there is no "AS query" we need
+      // a list of column names and types, "CREATE TABLE t (INT c)".
+      throw SqlUtil.newContextException(create.name.getParserPosition(),
+          RESOURCE.createTableRequiresColumnList());
+    }
+
+    final ImmutableList.Builder<ColumnDef> columnDefs = ImmutableList.builder();
+    // rowTypeBuilder collects every column; storedTypeBuilder skips VIRTUAL ones.
+    final RelDataTypeFactory.Builder rowTypeBuilder = typeFactory.builder();
+    final RelDataTypeFactory.Builder storedTypeBuilder = typeFactory.builder();
+    // REVIEW 2019-08-19 Danny Chan: Should we implement the
+    // #validate(SqlValidator) to get the SqlValidator instance?
+    final SqlValidator validator = validator(context, true);
+    for (Ord<SqlNode> column : Ord.zip(columnList)) {
+      final SqlNode node = column.e;
+      if (node instanceof SqlColumnDeclaration) {
+        // Fully declared column: the type comes from the declaration itself.
+        final SqlColumnDeclaration declaration = (SqlColumnDeclaration) node;
+        final RelDataType type = declaration.dataType.deriveType(validator, true);
+        rowTypeBuilder.add(declaration.name.getSimple(), type);
+        if (declaration.strategy != ColumnStrategy.VIRTUAL) {
+          storedTypeBuilder.add(declaration.name.getSimple(), type);
+        }
+        columnDefs.add(ColumnDef.of(declaration.expression, type, declaration.strategy));
+      } else if (node instanceof SqlIdentifier) {
+        // Bare column name: the type is taken from the query field at the same position.
+        final SqlIdentifier id = (SqlIdentifier) node;
+        if (queryRowType == null) {
+          throw SqlUtil.newContextException(id.getParserPosition(),
+              RESOURCE.createTableRequiresColumnTypes(id.getSimple()));
+        }
+        final RelDataTypeField field = queryRowType.getFieldList().get(column.i);
+        final ColumnStrategy strategy;
+        if (field.getType().isNullable()) {
+          strategy = ColumnStrategy.NULLABLE;
+        } else {
+          strategy = ColumnStrategy.NOT_NULLABLE;
+        }
+        columnDefs.add(ColumnDef.of(node, field.getType(), strategy));
+        rowTypeBuilder.add(id.getSimple(), field.getType());
+        storedTypeBuilder.add(id.getSimple(), field.getType());
+      } else {
+        throw new AssertionError(node.getClass());
+      }
+    }
+    final RelDataType rowType = rowTypeBuilder.build();
+    final RelDataType storedRowType = storedTypeBuilder.build();
+    final List<ColumnDef> columns = columnDefs.build();
+
+    // Initializer factory backed by the column definitions collected above.
+    final InitializerExpressionFactory ief =
+        new NullInitializerExpressionFactory() {
+          @Override
+          public ColumnStrategy generationStrategy(RelOptTable table,
+              int iColumn) {
+            return columns.get(iColumn).strategy;
+          }
+
+          @Override
+          public RexNode newColumnDefaultValue(RelOptTable table,
+              int iColumn, InitializerContext initializerContext) {
+            final ColumnDef column = columns.get(iColumn);
+            if (column.expr == null) {
+              return super.newColumnDefaultValue(table, iColumn, initializerContext);
+            }
+            // REVIEW Danny 2019-10-09: Should we support validation for DDL nodes?
+            final SqlNode validated =
+                initializerContext.validateExpression(storedRowType, column.expr);
+            // The explicit specified type should have the same nullability
+            // with the column expression inferred type,
+            // actually they should be exactly the same.
+            return initializerContext.convertExpression(validated);
+          }
+        };
+
+    if (owner.plus().getTable(tableName) != null) {
+      // Table exists.
+      if (create.ifNotExists) {
+        return;
+      }
+      if (!create.getReplace()) {
+        // They did not specify IF NOT EXISTS, so give error.
+        throw SqlUtil.newContextException(create.name.getParserPosition(),
+            RESOURCE.tableExists(tableName));
+      }
+    }
+    // Table does not exist (or is being replaced). Create it.
+    owner.add(tableName,
+        new BaremapsMutableTable(tableName,
+            RelDataTypeImpl.proto(rowType), ief, context.getTypeFactory()));
+    if (create.query != null) {
+      populate(create.name, create.query, context);
+    }
+  }
+
+  /**
+   * Executes a {@code CREATE TABLE LIKE} command.
+   *
+   * <p>The new table gets the row type of the source table. When the source table can be
+   * unwrapped to an {@link InitializerExpressionFactory}, its column initializers are wrapped
+   * in a {@code CopiedTableInitializerExpressionFactory} configured from the
+   * {@code GENERATED}, {@code DEFAULTS} and {@code ALL} like options.
+   *
+   * @param create the parsed {@code CREATE TABLE LIKE} statement
+   * @param context the prepare context used to resolve both tables
+   */
+  public void execute(SqlCreateTableLike create,
+      CalcitePrepare.Context context) {
+    final Pair<@Nullable CalciteSchema, String> resolved =
+        schema(context, true, create.name);
+    final CalciteSchema owner =
+        requireNonNull(resolved.left); // TODO: should not assume parent schema exists
+    final String newTableName = resolved.right;
+    if (owner.plus().getTable(newTableName) != null) {
+      // Table exists.
+      if (create.ifNotExists) {
+        return;
+      }
+      if (!create.getReplace()) {
+        // They did not specify IF NOT EXISTS, so give error.
+        throw SqlUtil.newContextException(create.name.getParserPosition(),
+            RESOURCE.tableExists(newTableName));
+      }
+    }
+
+    // Resolve the table whose definition is copied.
+    final Pair<@Nullable CalciteSchema, String> resolvedSource =
+        schema(context, true, create.sourceTable);
+    final CalciteSchema sourceSchema =
+        // TODO: should not assume parent schema exists
+        requireNonNull(resolvedSource.left);
+    final String sourceName = resolvedSource.right;
+    final CalciteSchema.TableEntry sourceEntry =
+        sourceSchema.getTable(sourceName, context.config().caseSensitive());
+    final Table sourceTable = requireNonNull(sourceEntry, "tableEntry").getTable();
+
+    InitializerExpressionFactory ief = NullInitializerExpressionFactory.INSTANCE;
+    if (sourceTable instanceof Wrapper) {
+      final InitializerExpressionFactory sourceIef =
+          ((Wrapper) sourceTable).unwrap(InitializerExpressionFactory.class);
+      if (sourceIef != null) {
+        final Set<SqlCreateTableLike.LikeOption> likeOptions = create.options();
+        final boolean includingGenerated =
+            likeOptions.contains(SqlCreateTableLike.LikeOption.GENERATED)
+                || likeOptions.contains(SqlCreateTableLike.LikeOption.ALL);
+        final boolean includingDefaults =
+            likeOptions.contains(SqlCreateTableLike.LikeOption.DEFAULTS)
+                || likeOptions.contains(SqlCreateTableLike.LikeOption.ALL);
+
+        // Initialize columns from the source table's factory, filtered by the like options.
+        ief =
+            new CopiedTableInitializerExpressionFactory(
+                includingGenerated, includingDefaults, sourceIef);
+      }
+    }
+
+    final JavaTypeFactory typeFactory = context.getTypeFactory();
+    final RelDataType rowType = sourceTable.getRowType(typeFactory);
+    // Table does not exist (or is being replaced). Create it.
+    owner.add(newTableName,
+        new BaremapsMutableTable(newTableName,
+            RelDataTypeImpl.proto(rowType), ief, typeFactory));
+  }
+
+  /** Executes a {@code CREATE TYPE} command.
*/
+  public void execute(SqlCreateType create,
+      CalcitePrepare.Context context) {
+    final Pair<@Nullable CalciteSchema, String> resolved =
+        schema(context, true, create.name);
+    final CalciteSchema owner =
+        requireNonNull(resolved.left); // TODO: should not assume parent schema exists
+    final SqlValidator validator = validator(context, false);
+    // The type is registered lazily: the lambda runs when a type factory asks for it.
+    owner.add(resolved.right, typeFactory -> {
+      if (create.dataType != null) {
+        return create.dataType.deriveType(validator);
+      }
+      // No explicit data type: build a structured type from the attribute definitions.
+      final RelDataTypeFactory.Builder attributes = typeFactory.builder();
+      if (create.attributeDefs != null) {
+        for (SqlNode node : create.attributeDefs) {
+          final SqlAttributeDefinition attribute = (SqlAttributeDefinition) node;
+          final SqlDataTypeSpec typeSpec = attribute.dataType;
+          final RelDataType attributeType = typeSpec.deriveType(validator);
+          attributes.add(attribute.name.getSimple(), attributeType);
+        }
+      }
+      return attributes.build();
+    });
+  }
+
+  /**
+   * Executes a {@code CREATE VIEW} command.
+   *
+   * <p>The view is registered in the schema as a {@link ViewTableMacro} under the view name. An
+   * existing zero-parameter function with that name is treated as an existing view: it is
+   * removed when {@code OR REPLACE} was specified and causes an error otherwise.
+   *
+   * @param create the parsed {@code CREATE VIEW} statement
+   * @param context the prepare context used to resolve the schema
+   */
+  public void execute(SqlCreateView create,
+      CalcitePrepare.Context context) {
+    final Pair<@Nullable CalciteSchema, String> resolved =
+        schema(context, true, create.name);
+    final CalciteSchema owner =
+        requireNonNull(resolved.left); // TODO: should not assume parent schema exists
+    final String viewName = resolved.right;
+    final SchemaPlus schemaPlus = owner.plus();
+    for (Function function : schemaPlus.getFunctions(viewName)) {
+      if (!function.getParameters().isEmpty()) {
+        // Functions that take parameters are left alone.
+        continue;
+      }
+      if (!create.getReplace()) {
+        throw SqlUtil.newContextException(create.name.getParserPosition(),
+            RESOURCE.viewExists(viewName));
+      }
+      owner.removeFunction(viewName);
+    }
+    final SqlNode query = renameColumns(create.columnList, create.query);
+    final String sql = query.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
+    final ViewTableMacro viewTableMacro =
+        ViewTable.viewMacro(schemaPlus, sql, owner.path(null),
+            context.getObjectPath(), false);
+    // Apply the macro once and discard the result before registering it.
+    final TranslatableTable expanded = viewTableMacro.apply(ImmutableList.of());
+    Util.discard(expanded);
+    schemaPlus.add(viewName, viewTableMacro);
+  }
+
+  /**
+   * Initializes columns based on
the source {@link InitializerExpressionFactory} and like options.
+   *
+   * <p>A column keeps the strategy and the initializer expression of the source table only
+   * when its category was requested: {@code STORED} and {@code VIRTUAL} columns with
+   * {@code INCLUDING GENERATED}, {@code DEFAULT} columns with {@code INCLUDING DEFAULTS}.
+   * Every other column falls back to the {@link NullInitializerExpressionFactory} behavior.
+   */
+  private static class CopiedTableInitializerExpressionFactory
+      extends NullInitializerExpressionFactory {
+
+    /** Whether {@code STORED} and {@code VIRTUAL} columns are copied from the source. */
+    private final boolean includingGenerated;
+    /** Whether {@code DEFAULT} columns are copied from the source. */
+    private final boolean includingDefaults;
+    /** Initializer factory of the table named in the {@code LIKE} clause. */
+    private final InitializerExpressionFactory sourceIef;
+
+    CopiedTableInitializerExpressionFactory(
+        boolean includingGenerated,
+        boolean includingDefaults,
+        InitializerExpressionFactory sourceIef) {
+      this.includingGenerated = includingGenerated;
+      this.includingDefaults = includingDefaults;
+      this.sourceIef = requireNonNull(sourceIef, "sourceIef");
+    }
+
+    /**
+     * Returns whether a column with the given source strategy is copied under the like
+     * options of this factory.
+     */
+    private boolean isCopied(ColumnStrategy sourceStrategy) {
+      final boolean generated = sourceStrategy == ColumnStrategy.STORED
+          || sourceStrategy == ColumnStrategy.VIRTUAL;
+      return (includingGenerated && generated)
+          || (includingDefaults && sourceStrategy == ColumnStrategy.DEFAULT);
+    }
+
+    /**
+     * Returns the source strategy for copied columns and the inherited strategy otherwise.
+     */
+    @Override
+    public ColumnStrategy generationStrategy(
+        RelOptTable table, int iColumn) {
+      final ColumnStrategy sourceStrategy = sourceIef.generationStrategy(table, iColumn);
+      if (isCopied(sourceStrategy)) {
+        return sourceStrategy;
+      }
+      return super.generationStrategy(table, iColumn);
+    }
+
+    /**
+     * Returns the source initializer expression for copied columns and the inherited default
+     * otherwise.
+     *
+     * <p>The decision is made per column so that it agrees with
+     * {@link #generationStrategy}: a {@code DEFAULT} expression is not copied when only
+     * generated columns were requested, and a generation expression is not copied when only
+     * defaults were requested.
+     */
+    @Override
+    public RexNode newColumnDefaultValue(
+        RelOptTable table, int iColumn, InitializerContext context) {
+      if (isCopied(sourceIef.generationStrategy(table, iColumn))) {
+        return sourceIef.newColumnDefaultValue(table, iColumn, context);
+      }
+      return super.newColumnDefaultValue(table, iColumn, context);
+    }
+  }
+
+  /** Column definition.
*/
+  private static class ColumnDef {
+    /** Expression attached to the column; may be null. */
+    final @Nullable SqlNode expr;
+    /** Data type of the column. */
+    final RelDataType type;
+    /** Generation strategy of the column; never null. */
+    final ColumnStrategy strategy;
+
+    private ColumnDef(@Nullable SqlNode expr, RelDataType type,
+        ColumnStrategy strategy) {
+      this.expr = expr;
+      this.type = type;
+      this.strategy = requireNonNull(strategy, "strategy");
+      // Every strategy other than NULLABLE / NOT_NULLABLE requires an expression.
+      final boolean plainColumn = strategy == ColumnStrategy.NULLABLE
+          || strategy == ColumnStrategy.NOT_NULLABLE;
+      checkArgument(plainColumn || expr != null);
+    }
+
+    /** Creates a column definition; see the constructor for the accepted combinations. */
+    static ColumnDef of(@Nullable SqlNode expr, RelDataType type,
+        ColumnStrategy strategy) {
+      return new ColumnDef(expr, type, strategy);
+    }
+  }
+}
diff --git a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsContext.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsContext.java
new file mode 100644
index 000000000..8b94e123e
--- /dev/null
+++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsContext.java
@@ -0,0 +1,73 @@
+package org.apache.baremaps.calcite;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalcitePrepare.SparkHandler;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.tools.RelRunner;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+
+/**
+ * Minimal {@link CalcitePrepare.Context} for tests.
+ *
+ * <p>Only the type factory, the root schema and the rel runner are backed by real objects.
+ * {@link #config()}, {@link #spark()}, {@link #getDataContext()} and {@link #getObjectPath()}
+ * return {@code null}, so code that dereferences them cannot be run against this context.
+ */
+public class BaremapsContext implements CalcitePrepare.Context {
+
+  private final JavaTypeFactory typeFactory;
+
+  private final CalciteSchema rootSchema;
+
+  private final RelRunner relRunner;
+
+  /**
+   * Creates a context from its three backing objects.
+   *
+   * @param typeFactory the type factory returned by {@link #getTypeFactory()}
+   * @param rootSchema the schema returned as both the root and the mutable root schema
+   * @param planner the runner returned by {@link #getRelRunner()}
+   */
+  public BaremapsContext(JavaTypeFactory typeFactory, CalciteSchema rootSchema, RelRunner planner) {
+    this.typeFactory = typeFactory;
+    this.rootSchema = rootSchema;
+    this.relRunner = planner;
+  }
+
+  @Override
+  public JavaTypeFactory getTypeFactory() {
+    return typeFactory;
+  }
+
+  @Override
+  public CalciteSchema getRootSchema() {
+    return rootSchema;
+  }
+
+  /** Returns the same instance as {@link #getRootSchema()}. */
+  @Override
+  public CalciteSchema getMutableRootSchema() {
+    return rootSchema;
+  }
+
+  /** Returns an empty path. */
+  @Override
+  public List<String> getDefaultSchemaPath() {
+    return List.of();
+  }
+
+  /** Not provided by this test context; always {@code null}. */
+  @Override
+  public CalciteConnectionConfig config() {
+    return null;
+  }
+
+  /** Not provided by this test context; always {@code null}. */
+  @Override
+  public SparkHandler spark() {
+    return null;
+  }
+
+  /** Not provided by this test context; always {@code null}. */
+  @Override
+  public DataContext getDataContext() {
+    return null;
+  }
+
+  /** Not provided by this test context; always {@code null}. */
+  @Override
+  public @Nullable List<String> getObjectPath() {
+    return null;
+  }
+
+  @Override
+  public RelRunner getRelRunner() {
+    return relRunner;
+  }
+
+}
diff --git a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteDdlTest.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteDdlTest.java
new file mode 100644
index 000000000..b4d035366
--- /dev/null
+++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteDdlTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.baremaps.calcite; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.baremaps.calcite.DataColumn.Cardinality; +import org.apache.baremaps.calcite.DataColumn.Type; +import org.apache.baremaps.calcite.baremaps.BaremapsDataTable; +import org.apache.baremaps.calcite.baremaps.ServerDdlExecutor; +import org.apache.baremaps.data.collection.AppendOnlyLog; +import org.apache.baremaps.data.collection.IndexedDataList; +import org.apache.calcite.DataContext; +import org.apache.calcite.DataContexts; +import org.apache.calcite.config.Lex; +import org.apache.calcite.interpreter.Interpreter; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.ddl.SqlCreateMaterializedView; +import org.apache.calcite.sql.ddl.SqlCreateTable; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; + +public class CalciteDdlTest { + + + public class ListTable extends AbstractTable implements ScannableTable { + private final List<Integer> data; + + public ListTable(List<Integer> 
data) { + this.data = data; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + // Define a single column named "value" of type INTEGER + return typeFactory.builder() + .add("V", SqlTypeName.INTEGER) + .build(); + } + + @Override + public Enumerable<Object[]> scan(DataContext root) { + // Convert the List<Integer> to Enumerable<Object[]> + return Linq4j.asEnumerable(data) + .select(i -> new Object[] {i}); + } + } + + public class ListSchema extends AbstractSchema { + private final List<Integer> listA; + private final List<Integer> listB; + + public ListSchema(List<Integer> listA, List<Integer> listB) { + this.listA = listA; + this.listB = listB; + } + + @Override + protected Map<String, Table> getTableMap() { + Map<String, Table> tables = new HashMap<>(); + tables.put("LIST_A", new ListTable(listA)); + tables.put("LIST_B", new ListTable(listB)); // Initially empty + return tables; + } + } + + @Test + @Disabled + void test() throws Exception { + // Initialize your Java lists + List<Integer> listA = List.of(1, 2, 3, 4, 5); + List<Integer> listB = new ArrayList<>(); + + // Set up Calcite schema + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); + rootSchema.add("MY_SCHEMA", new ListSchema(listA, listB)); + + // Create and add 'city' table + DataSchema cityRowType = new DataSchema("city", List.of( + new DataColumnFixed("id", Cardinality.OPTIONAL, Type.INTEGER), + new DataColumnFixed("name", Cardinality.OPTIONAL, Type.STRING), + new DataColumnFixed("geometry", Cardinality.OPTIONAL, Type.GEOMETRY))); + + DataTable cityDataTable = new BaremapsDataTable( + cityRowType, + new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(cityRowType)))); + + GeometryFactory geometryFactory = new GeometryFactory(); + cityDataTable.add(new DataRow(cityDataTable.schema(), + List.of(1, "Paris", geometryFactory.createPoint(new Coordinate(2.3522, 48.8566))))); + cityDataTable.add(new DataRow(cityDataTable.schema(), + List.of(2, "New York", 
geometryFactory.createPoint(new Coordinate(-74.0060, 40.7128))))); + + DataTableAdapter cityDataTableAdapter = new DataTableAdapter(cityDataTable); + rootSchema.add("CITY", cityDataTableAdapter); + + // Create and add 'population' table + DataSchema populationRowType = new DataSchema("population", List.of( + new DataColumnFixed("city_id", Cardinality.OPTIONAL, Type.INTEGER), + new DataColumnFixed("population", Cardinality.OPTIONAL, Type.INTEGER))); + + DataTable populationDataTable = new BaremapsDataTable( + populationRowType, + new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(populationRowType)))); + + populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(1, 2_161_000))); + populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(2, 8_336_000))); + + DataTableAdapter populationDataTableAdapter = new DataTableAdapter(populationDataTable); + rootSchema.add("population", populationDataTableAdapter); + + // Configure the parser + SqlParser.Config parserConfig = SqlParser.configBuilder() + .setLex(Lex.MYSQL) + .setParserFactory(ServerDdlExecutor.PARSER_FACTORY) + .build(); + + // Configure the framework + FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(rootSchema.plus()) + .parserConfig(parserConfig) + .build(); + + // Create a planner + Planner planner = Frameworks.getPlanner(config); + + // Define the SQL query to populate list_b from list_a + String sql = "CREATE MATERIALIZED VIEW city_population AS " + + "SELECT c.id, c.name, c.geometry, p.population " + + "FROM CITY c " + + "JOIN population p ON c.id = p.city_id"; + + // Parse the SQL query + SqlNode parsed = planner.parse(sql); + + // Create a context + //BaremapsContext context = new BaremapsContext(new JavaTypeFactoryImpl(), rootSchema, Frameworks.); + + // Create an executor + //ServerDdlExecutor.INSTANCE.execute((SqlCreateMaterializedView) parsed, context); + +// // Extract the select statement from the parsed SQL query +// SqlNode select; 
+// if (parsed instanceof SqlCreateMaterializedView createMaterializedView) { +// List<SqlNode> operands = createMaterializedView.getOperandList(); +// select = operands.get(operands.size() - 1); +// System.out.println(select); +// } else { +// throw new IllegalArgumentException( +// "Expected a CREATE MATERIALIZED VIEW statement, but got: " + parsed.getKind()); +// } +// +// // Validate the SQL query +// SqlNode validated = planner.validate(select); +// +// // Convert the SQL query to a relational expression +// RelNode rel = planner.rel(validated).rel; +// +// try (Interpreter interpreter = new Interpreter(DataContexts.EMPTY, rel)) { +// // Create an interpreter to execute the RelNode +// for (Object[] row : interpreter) { +// listB.add((Integer) row[0]); +// } +// } +// +// // Display the results +// System.out.println("List A: " + listA); +// System.out.println("List B (after SQL): " + listB); + } +} diff --git a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java index 7f5258507..82a02d1cc 100644 --- a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java +++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java @@ -17,228 +17,105 @@ package org.apache.baremaps.calcite; -import java.sql.*; -import java.util.*; import org.apache.baremaps.calcite.DataColumn.Cardinality; import org.apache.baremaps.calcite.DataColumn.Type; import org.apache.baremaps.calcite.baremaps.BaremapsDataTable; +import org.apache.baremaps.calcite.baremaps.ServerDdlExecutor; import org.apache.baremaps.data.collection.AppendOnlyLog; import org.apache.baremaps.data.collection.IndexedDataList; -import org.apache.calcite.DataContext; -import org.apache.calcite.DataContexts; -import org.apache.calcite.interpreter.Interpreter; import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.linq4j.Enumerable; -import 
org.apache.calcite.linq4j.Linq4j; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.*; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.GeometryFactory; -public class CalciteTest { - - @Test - @Disabled - void sql() throws SQLException { - GeometryFactory geometryFactory = new GeometryFactory(); - - // Configure Calcite connection properties - Properties info = new Properties(); - info.setProperty("lex", "MYSQL"); // Use MySQL dialect - info.setProperty("caseSensitive", "false"); // Disable case sensitivity - info.setProperty("unquotedCasing", "TO_LOWER"); // Convert unquoted identifiers to lowercase - info.setProperty("quotedCasing", "TO_LOWER"); - - try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) { - CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); - SchemaPlus rootSchema = calciteConnection.getRootSchema(); - - // Create and add 'city' table - DataSchema cityRowType = new DataSchema("city", List.of( - new DataColumnFixed("id", Cardinality.OPTIONAL, Type.INTEGER), - new DataColumnFixed("name", Cardinality.OPTIONAL, Type.STRING), - new DataColumnFixed("geometry", Cardinality.OPTIONAL, Type.GEOMETRY))); - - DataTable cityDataTable = new BaremapsDataTable( - cityRowType, - new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(cityRowType)))); - - cityDataTable.add(new DataRow(cityDataTable.schema(), - List.of(1, "Paris", 
geometryFactory.createPoint(new Coordinate(2.3522, 48.8566))))); - cityDataTable.add(new DataRow(cityDataTable.schema(), - List.of(2, "New York", geometryFactory.createPoint(new Coordinate(-74.0060, 40.7128))))); - - DataTableAdapter cityDataTableAdapter = new DataTableAdapter(cityDataTable); - rootSchema.add("city", cityDataTableAdapter); - - // Create and add 'population' table - DataSchema populationRowType = new DataSchema("population", List.of( - new DataColumnFixed("city_id", Cardinality.OPTIONAL, Type.INTEGER), - new DataColumnFixed("population", Cardinality.OPTIONAL, Type.INTEGER))); - - DataTable populationDataTable = new BaremapsDataTable( - populationRowType, - new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(populationRowType)))); - - populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(1, 2_161_000))); - populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(2, 8_336_000))); - - DataTableAdapter populationDataTableAdapter = new DataTableAdapter(populationDataTable); - rootSchema.add("population", populationDataTableAdapter); - - // Create view 'city_population' - String mvSql = "SELECT c.id, c.name, c.geometry, p.population " + - "FROM city c " + // lowercase and unquoted - "JOIN population p ON c.id = p.city_id"; - - ViewTableMacro materializedView = MaterializedViewTable.viewMacro( - rootSchema, - mvSql, - Collections.emptyList(), // Schema path - List.of("city_population"), // Name parts - false); // Not a materialized view - - - rootSchema.add("city_population", materializedView); - - // Debug: List all tables in the root schema - System.out.println("Available tables in the root schema:"); - for (String tableName : rootSchema.getTableNames()) { - System.out.println(" - " + tableName); - } - - String sql = "SELECT * FROM city"; - try (Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql)) { - while (resultSet.next()) { - 
System.out.println(resultSet.getString("id") + " " + resultSet.getString("geometry")); - } - } - - // Query the view - sql = "SELECT * FROM city_population"; - try (Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql)) { - while (resultSet.next()) { - System.out.println( - resultSet.getString("id") + " " + resultSet.getString("name")); - } - } - } - - } - - public class ListTable extends AbstractTable implements ScannableTable { - private final List<Integer> data; - - public ListTable(List<Integer> data) { - this.data = data; - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - // Define a single column named "value" of type INTEGER - return typeFactory.builder() - .add("V", SqlTypeName.INTEGER) - .build(); - } - - @Override - public Enumerable<Object[]> scan(DataContext root) { - // Convert the List<Integer> to Enumerable<Object[]> - return Linq4j.asEnumerable(data) - .select(i -> new Object[] {i}); - } - } - - public class ListSchema extends AbstractSchema { - private final List<Integer> listA; - private final List<Integer> listB; - - public ListSchema(List<Integer> listA, List<Integer> listB) { - this.listA = listA; - this.listB = listB; - } - - @Override - protected Map<String, Table> getTableMap() { - Map<String, Table> tables = new HashMap<>(); - tables.put("LIST_A", new ListTable(listA)); - tables.put("LIST_B", new ListTable(listB)); // Initially empty - return tables; - } - } - - @Test - @Disabled - void list() throws Exception { - // Initialize your Java lists - List<Integer> listA = List.of(1, 2, 3, 4, 5); - List<Integer> listB = new ArrayList<>(); - - // Set up Calcite schema - SchemaPlus rootSchema = Frameworks.createRootSchema(true); - rootSchema.add("MY_SCHEMA", new ListSchema(listA, listB)); - - // Create and add 'city' table - DataSchema cityRowType = new DataSchema("city", List.of( - new DataColumnFixed("id", Cardinality.OPTIONAL, Type.INTEGER), - new 
DataColumnFixed("name", Cardinality.OPTIONAL, Type.STRING), - new DataColumnFixed("geometry", Cardinality.OPTIONAL, Type.GEOMETRY))); - - DataTable cityDataTable = new BaremapsDataTable( - cityRowType, - new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(cityRowType)))); - - GeometryFactory geometryFactory = new GeometryFactory(); - cityDataTable.add(new DataRow(cityDataTable.schema(), - List.of(1, "Paris", geometryFactory.createPoint(new Coordinate(2.3522, 48.8566))))); - cityDataTable.add(new DataRow(cityDataTable.schema(), - List.of(2, "New York", geometryFactory.createPoint(new Coordinate(-74.0060, 40.7128))))); - - DataTableAdapter cityDataTableAdapter = new DataTableAdapter(cityDataTable); - rootSchema.add("CITY", cityDataTableAdapter); - - // Configure the framework - FrameworkConfig config = Frameworks.newConfigBuilder() - .defaultSchema(rootSchema.getSubSchema("MY_SCHEMA")) - .build(); - - // Create a planner - Planner planner = Frameworks.getPlanner(config); - - // Define the SQL query to populate list_b from list_a - String sql = "SELECT V * 2 AS V FROM LIST_A"; +import java.sql.*; +import java.util.List; +import java.util.Properties; - // Parse the SQL query - org.apache.calcite.sql.SqlNode parsed = planner.parse(sql); +public class CalciteTest { - // Validate the SQL query - org.apache.calcite.sql.SqlNode validated = planner.validate(parsed); + @Test + @Disabled + void sql() throws SQLException { + GeometryFactory geometryFactory = new GeometryFactory(); + + // Configure Calcite connection properties + Properties info = new Properties(); + info.setProperty("lex", "MYSQL"); // Use MySQL dialect + info.setProperty("caseSensitive", "false"); // Disable case sensitivity + info.setProperty("unquotedCasing", "TO_LOWER"); // Convert unquoted identifiers to lowercase + info.setProperty("quotedCasing", "TO_LOWER"); + info.setProperty("parserFactory", ServerDdlExecutor.class.getName() + "#PARSER_FACTORY"); + info.setProperty("materializationsEnabled", 
"true"); + + try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) { + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + + // Create and add 'city' table + DataSchema cityRowType = new DataSchema("city", List.of( + new DataColumnFixed("id", Cardinality.OPTIONAL, Type.INTEGER), + new DataColumnFixed("name", Cardinality.OPTIONAL, Type.STRING), + new DataColumnFixed("geometry", Cardinality.OPTIONAL, Type.GEOMETRY))); + + DataTable cityDataTable = new BaremapsDataTable( + cityRowType, + new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(cityRowType)))); + + cityDataTable.add(new DataRow(cityDataTable.schema(), + List.of(1, "Paris", geometryFactory.createPoint(new Coordinate(2.3522, 48.8566))))); + cityDataTable.add(new DataRow(cityDataTable.schema(), + List.of(2, "New York", geometryFactory.createPoint(new Coordinate(-74.0060, 40.7128))))); + + DataTableAdapter cityDataTableAdapter = new DataTableAdapter(cityDataTable); + rootSchema.add("city", cityDataTableAdapter); + + // Create and add 'population' table + DataSchema populationRowType = new DataSchema("population", List.of( + new DataColumnFixed("city_id", Cardinality.OPTIONAL, Type.INTEGER), + new DataColumnFixed("population", Cardinality.OPTIONAL, Type.INTEGER))); + + DataTable populationDataTable = new BaremapsDataTable( + populationRowType, + new IndexedDataList<>(new AppendOnlyLog<>(new DataRowType(populationRowType)))); + + populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(1, 2_161_000))); + populationDataTable.add(new DataRow(populationDataTable.schema(), List.of(2, 8_336_000))); + + DataTableAdapter populationDataTableAdapter = new DataTableAdapter(populationDataTable); + rootSchema.add("population", populationDataTableAdapter); + + String mv = "CREATE MATERIALIZED VIEW city_population AS " + + "SELECT c.id, c.name, c.geometry, p.population " + + "FROM city 
c " + + "JOIN population p ON c.id = p.city_id"; + + // Execute the SQL query + try (Statement statement = connection.createStatement()) { + statement.execute(mv); + } + + // Debug: List all tables in the root schema + System.out.println("Available tables in the root schema:"); + for (String tableName : rootSchema.getTableNames()) { + System.out.println(" - " + tableName); + } + + String sql = "SELECT * FROM city_population"; + + // Execute the SQL query + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + while (resultSet.next()) { + System.out.println(resultSet.getInt(1) + " " + resultSet.getString(2) + " " + resultSet.getString(3) + " " + resultSet.getInt(4)); + } + } - // Convert the SQL query to a relational expression - RelNode rel = planner.rel(validated).rel; - Interpreter interpreter = new Interpreter(DataContexts.EMPTY, rel); + } - // Create an interpreter to execute the RelNode - for (Object[] row : interpreter) { - listB.add((Integer) row[0]); } - // Display the results - System.out.println("List A: " + listA); - System.out.println("List B (after SQL): " + listB); - } } diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java index 5f12ad26c..7b2c16b58 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import org.apache.baremaps.data.util.FileUtils; @@ -39,6 +40,13 @@ public class MemoryMappedDirectory extends Memory<MappedByteBuffer> { */ public 
MemoryMappedDirectory(Path directory) { this(directory, 1 << 30); + if (!Files.exists(directory)) { + try { + Files.createDirectories(directory); + } catch (IOException e) { + throw new MemoryException(e); + } + } } /**
