This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch calcite-schema-ddl in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 9c958fc93657a60c5836070151c35be1f1f5deef Author: Bertil Chapuis <[email protected]> AuthorDate: Mon Apr 14 16:08:09 2025 +0200 Add data schema for memory mapped structures --- .../apache/baremaps/calcite/data/DataSchema.java | 91 ++++++++ .../baremaps/calcite/data/DataSchemaTest.java | 243 +++++++++++++++++++++ 2 files changed, 334 insertions(+) diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java new file mode 100644 index 000000000..35e9f4c89 --- /dev/null +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.baremaps.calcite.data;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.baremaps.data.collection.AppendOnlyLog;
+import org.apache.baremaps.data.collection.DataCollection;
+import org.apache.baremaps.data.memory.Memory;
+import org.apache.baremaps.data.memory.MemoryMappedDirectory;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+/**
+ * Calcite schema whose tables are discovered from the subdirectories of a root directory.
+ *
+ * <p>Every immediate subdirectory that contains a {@code schema.json} file is registered as a
+ * table named after that subdirectory. The rows of each table live in an {@link AppendOnlyLog}
+ * backed by a {@link MemoryMappedDirectory} opened on the same subdirectory.
+ */
+public class DataSchema extends AbstractSchema {
+
+  private final File directory;
+  private final Map<String, Table> tableMap;
+  private final RelDataTypeFactory typeFactory;
+
+  /**
+   * Scans {@code directory} and registers one table per subdirectory holding a
+   * {@code schema.json} file.
+   *
+   * @param directory the root directory whose subdirectories describe the tables
+   * @param typeFactory the type factory handed to the schema reader and to the created tables
+   * @throws NullPointerException if {@code directory} or {@code typeFactory} is {@code null}
+   * @throws IOException if a schema file cannot be read or a table cannot be opened
+   */
+  public DataSchema(File directory, RelDataTypeFactory typeFactory) throws IOException {
+    this.directory = Objects.requireNonNull(directory, "Directory cannot be null");
+    this.typeFactory = Objects.requireNonNull(typeFactory, "Type factory cannot be null");
+    this.tableMap = new HashMap<>();
+
+    // Plain files are ignored; only directories can describe a table.
+    File[] candidates = directory.listFiles(File::isDirectory);
+    if (candidates == null) {
+      // listFiles returns null when the path cannot be listed; the schema then stays empty.
+      return;
+    }
+
+    for (File candidate : candidates) {
+      String name = candidate.getName();
+      Path descriptor = candidate.toPath().resolve("schema.json");
+      if (!Files.exists(descriptor)) {
+        continue;
+      }
+      registerTable(name, descriptor);
+    }
+  }
+
+  /**
+   * Reads the table definition stored in {@code descriptor} and adds the resulting table to the
+   * table map under {@code name}.
+   *
+   * @param name the name under which the table is registered
+   * @param descriptor the path of the {@code schema.json} file describing the table
+   * @throws IOException if the descriptor cannot be read
+   */
+  private void registerTable(String name, Path descriptor) throws IOException {
+    try (FileInputStream input = new FileInputStream(descriptor.toFile())) {
+      DataTableSchema tableSchema = DataTableSchema.read(input, typeFactory);
+
+      // The row storage is mapped onto the directory that contains the descriptor.
+      DataRowType rowType = new DataRowType(tableSchema);
+      Memory<MappedByteBuffer> storage = new MemoryMappedDirectory(descriptor.getParent());
+      DataCollection<DataRow> log = AppendOnlyLog.<DataRow>builder()
+          .dataType(rowType)
+          .memory(storage)
+          .build();
+
+      tableMap.put(name, new DataModifiableTable(name, tableSchema, log, typeFactory));
+    }
+  }
+
+  /**
+   * Returns the tables discovered at construction time, keyed by table name.
+   *
+   * @return the map of table names to tables
+   */
+  @Override
+  protected Map<String, Table> getTableMap() {
+    return tableMap;
+  }
+}
\ No newline at end of file
diff --git a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java
new file mode 100644
index 000000000..89405d6dc
--- /dev/null
+++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * 
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.calcite.data;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.baremaps.testing.TestFiles;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests for {@link DataSchema}: table discovery from {@code schema.json} files and execution of
+ * SQL queries against the discovered (empty) tables.
+ */
+class DataSchemaTest {
+
+  /** JSON writer used to produce the {@code schema.json} fixtures. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @TempDir
+  Path tempDir;
+
+  private File sampleDataDir;
+  private File citiesDir;
+  private File countriesDir;
+  private RelDataTypeFactory typeFactory;
+
+  /**
+   * Creates the {@code data/cities} and {@code data/countries} fixture directories with their
+   * schema files, and obtains a type factory from a Calcite connection.
+   */
+  @BeforeEach
+  void setup() throws IOException, SQLException {
+    // Create the test directory structure
+    sampleDataDir = tempDir.resolve("data").toFile();
+    citiesDir = new File(sampleDataDir, "cities");
+    countriesDir = new File(sampleDataDir, "countries");
+
+    // createDirectories also creates the parent "data" directory and, unlike File.mkdirs(),
+    // throws if a directory cannot be created instead of returning false.
+    Files.createDirectories(citiesDir.toPath());
+    Files.createDirectories(countriesDir.toPath());
+
+    // Create schema files
+    createCitiesSchema();
+    createCountriesSchema();
+
+    // Create a connection to get the type factory
+    Properties props = new Properties();
+    props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false");
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", props)) {
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      typeFactory = calciteConnection.getTypeFactory();
+    }
+  }
+
+  /** Writes the {@code schema.json} fixture of the {@code cities} table. */
+  private void createCitiesSchema() throws IOException {
+    writeSchema(citiesDir, "cities", Arrays.asList(
+        column("city", "VARCHAR"),
+        column("country", "VARCHAR"),
+        column("population", "INTEGER")));
+  }
+
+  /** Writes the {@code schema.json} fixture of the {@code countries} table. */
+  private void createCountriesSchema() throws IOException {
+    writeSchema(countriesDir, "countries", Arrays.asList(
+        column("country", "VARCHAR"),
+        column("continent", "VARCHAR"),
+        column("population", "INTEGER")));
+  }
+
+  /**
+   * Builds the JSON description of a {@code REQUIRED} column.
+   *
+   * @param name the column name
+   * @param sqlTypeName the SQL type name of the column, e.g. {@code VARCHAR}
+   * @return the column description as a map ready for JSON serialization
+   */
+  private static Map<String, Object> column(String name, String sqlTypeName) {
+    Map<String, Object> column = new HashMap<>();
+    column.put("name", name);
+    column.put("cardinality", "REQUIRED");
+    column.put("sqlTypeName", sqlTypeName);
+    return column;
+  }
+
+  /**
+   * Serializes a table description to {@code schema.json} inside {@code tableDir}.
+   *
+   * @param tableDir the directory of the table
+   * @param tableName the value of the {@code name} property
+   * @param columns the column descriptions, written as the {@code columns} JSON array
+   * @throws IOException if the file cannot be written
+   */
+  private static void writeSchema(File tableDir, String tableName,
+      List<Map<String, Object>> columns) throws IOException {
+    Map<String, Object> schemaMap = new HashMap<>();
+    schemaMap.put("name", tableName);
+    schemaMap.put("columns", columns);
+
+    try (FileOutputStream fos = new FileOutputStream(new File(tableDir, "schema.json"))) {
+      MAPPER.writeValue(fos, schemaMap);
+    }
+  }
+
+  @Test
+  void testSchemaCreation() throws IOException {
+    // Create a DataSchema instance
+    DataSchema schema = new DataSchema(sampleDataDir, typeFactory);
+
+    // Get the table map
+    Map<String, Table> tableMap = schema.getTableMap();
+
+    // Verify that the schema has tables
+    assertNotNull(tableMap);
+    assertFalse(tableMap.isEmpty(), "Schema should have at least one table");
+
+    // Verify that both test tables exist
+    assertTrue(tableMap.containsKey("cities"), "Schema should contain the 'cities' table");
+    assertTrue(tableMap.containsKey("countries"), "Schema should contain the 'countries' table");
+    assertNotNull(tableMap.get("cities"), "Cities table should not be null");
+    assertNotNull(tableMap.get("countries"), "Countries table should not be null");
+  }
+
+  @Test
+  void testSqlQueryWithSchema() throws Exception {
+    assertQueryExecutes("SELECT * FROM data.cities WHERE country = 'France'");
+  }
+
+  @Test
+  void testJoinQuery() throws Exception {
+    assertQueryExecutes(
+        "SELECT c.city, c.country, co.continent "
+            + "FROM data.cities c "
+            + "JOIN data.countries co ON c.country = co.country "
+            + "WHERE co.continent = 'Europe'");
+  }
+
+  /**
+   * Registers a fresh {@link DataSchema} under the name {@code data} on a new Calcite connection
+   * and checks that {@code sql} executes.
+   *
+   * <p>The fixture tables hold no rows, so only successful execution is verified, not the
+   * content of the result.
+   *
+   * @param sql the query to execute
+   */
+  private void assertQueryExecutes(String sql) throws IOException, SQLException {
+    // Create a DataSchema instance
+    DataSchema schema = new DataSchema(sampleDataDir, typeFactory);
+
+    // Configure Calcite connection properties
+    Properties info = new Properties();
+    info.setProperty("lex", "MYSQL");
+
+    // Set up a connection and register our schema
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
+      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+      rootSchema.add("data", schema);
+
+      try (Statement statement = connection.createStatement();
+          ResultSet resultSet = statement.executeQuery(sql)) {
+        assertNotNull(resultSet, "ResultSet should not be null");
+      }
+    }
+  }
+}
\ No newline at end of file
