This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch calcite-schema-ddl
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 9c958fc93657a60c5836070151c35be1f1f5deef
Author: Bertil Chapuis <[email protected]>
AuthorDate: Mon Apr 14 16:08:09 2025 +0200

    Add data schema for memory mapped structures
---
 .../apache/baremaps/calcite/data/DataSchema.java   |  91 ++++++++
 .../baremaps/calcite/data/DataSchemaTest.java      | 243 +++++++++++++++++++++
 2 files changed, 334 insertions(+)

diff --git 
a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java
 
b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java
new file mode 100644
index 000000000..35e9f4c89
--- /dev/null
+++ 
b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/data/DataSchema.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.calcite.data;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.baremaps.data.collection.AppendOnlyLog;
+import org.apache.baremaps.data.collection.DataCollection;
+import org.apache.baremaps.data.memory.Memory;
+import org.apache.baremaps.data.memory.MemoryMappedDirectory;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+/**
+ * A Calcite schema implementation for data stored in directories. This schema 
provides access to data
+ * through the Apache Calcite framework for SQL querying.
+ */
+public class DataSchema extends AbstractSchema {
+
+  private final File directory;
+  private final Map<String, Table> tableMap;
+  private final RelDataTypeFactory typeFactory;
+
+  /**
+   * Constructs a DataSchema with the specified directory.
+   *
+   * @param directory the directory containing data subdirectories
+   * @param typeFactory the type factory to use for creating tables
+   * @throws IOException if an I/O error occurs
+   */
+  public DataSchema(File directory, RelDataTypeFactory typeFactory) throws 
IOException {
+    this.directory = Objects.requireNonNull(directory, "Directory cannot be 
null");
+    this.typeFactory = Objects.requireNonNull(typeFactory, "Type factory 
cannot be null");
+    this.tableMap = new HashMap<>();
+
+    // Only process directories in the specified directory
+    File[] subdirectories = directory.listFiles(File::isDirectory);
+    if (subdirectories != null) {
+      for (File subdirectory : subdirectories) {
+        String tableName = subdirectory.getName();
+        Path schemaPath = subdirectory.toPath().resolve("schema.json");
+        
+        if (Files.exists(schemaPath)) {
+          // Read the schema from the schema.json file
+          try (FileInputStream fis = new FileInputStream(schemaPath.toFile())) 
{
+            DataTableSchema schema = DataTableSchema.read(fis, typeFactory);
+            
+            // Create the data collection
+            DataRowType dataRowType = new DataRowType(schema);
+            Memory<MappedByteBuffer> memory = new 
MemoryMappedDirectory(schemaPath.getParent());
+            DataCollection<DataRow> rows = AppendOnlyLog.<DataRow>builder()
+                .dataType(dataRowType)
+                .memory(memory)
+                .build();
+            
+            // Create the table
+            tableMap.put(tableName, new DataModifiableTable(tableName, schema, 
rows, typeFactory));
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Map<String, Table> getTableMap() {
+    return tableMap;
+  }
+} 
\ No newline at end of file
diff --git 
a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java
 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java
new file mode 100644
index 000000000..89405d6dc
--- /dev/null
+++ 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/data/DataSchemaTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.calcite.data;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.baremaps.testing.TestFiles;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class DataSchemaTest {
+
+  @TempDir
+  Path tempDir;
+  
+  private File sampleDataDir;
+  private File citiesDir;
+  private File countriesDir;
+  private RelDataTypeFactory typeFactory;
+
+  @BeforeEach
+  void setup() throws IOException, SQLException {
+    // Create the test directory structure
+    sampleDataDir = tempDir.resolve("data").toFile();
+    citiesDir = new File(sampleDataDir, "cities");
+    countriesDir = new File(sampleDataDir, "countries");
+    
+    sampleDataDir.mkdirs();
+    citiesDir.mkdirs();
+    countriesDir.mkdirs();
+    
+    // Create schema files
+    createCitiesSchema();
+    createCountriesSchema();
+    
+    // Initialize the type factory
+    Properties props = new Properties();
+    props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), 
"false");
+    CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props);
+    
+    // Create a connection to get the type factory
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", 
props)) {
+      CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
+      typeFactory = calciteConnection.getTypeFactory();
+    }
+  }
+  
+  private void createCitiesSchema() throws IOException {
+    // Create a schema for cities
+    Map<String, Object> schemaMap = new HashMap<>();
+    schemaMap.put("name", "cities");
+    
+    // Define columns
+    Map<String, Object>[] columns = new Map[3];
+    
+    // city column
+    Map<String, Object> cityColumn = new HashMap<>();
+    cityColumn.put("name", "city");
+    cityColumn.put("cardinality", "REQUIRED");
+    cityColumn.put("sqlTypeName", "VARCHAR");
+    columns[0] = cityColumn;
+    
+    // country column
+    Map<String, Object> countryColumn = new HashMap<>();
+    countryColumn.put("name", "country");
+    countryColumn.put("cardinality", "REQUIRED");
+    countryColumn.put("sqlTypeName", "VARCHAR");
+    columns[1] = countryColumn;
+    
+    // population column
+    Map<String, Object> populationColumn = new HashMap<>();
+    populationColumn.put("name", "population");
+    populationColumn.put("cardinality", "REQUIRED");
+    populationColumn.put("sqlTypeName", "INTEGER");
+    columns[2] = populationColumn;
+    
+    schemaMap.put("columns", columns);
+    
+    // Write schema to file
+    ObjectMapper mapper = new ObjectMapper();
+    try (FileOutputStream fos = new FileOutputStream(new File(citiesDir, 
"schema.json"))) {
+      mapper.writeValue(fos, schemaMap);
+    }
+  }
+  
+  private void createCountriesSchema() throws IOException {
+    // Create a schema for countries
+    Map<String, Object> schemaMap = new HashMap<>();
+    schemaMap.put("name", "countries");
+    
+    // Define columns
+    Map<String, Object>[] columns = new Map[3];
+    
+    // country column
+    Map<String, Object> countryColumn = new HashMap<>();
+    countryColumn.put("name", "country");
+    countryColumn.put("cardinality", "REQUIRED");
+    countryColumn.put("sqlTypeName", "VARCHAR");
+    columns[0] = countryColumn;
+    
+    // continent column
+    Map<String, Object> continentColumn = new HashMap<>();
+    continentColumn.put("name", "continent");
+    continentColumn.put("cardinality", "REQUIRED");
+    continentColumn.put("sqlTypeName", "VARCHAR");
+    columns[1] = continentColumn;
+    
+    // population column
+    Map<String, Object> populationColumn = new HashMap<>();
+    populationColumn.put("name", "population");
+    populationColumn.put("cardinality", "REQUIRED");
+    populationColumn.put("sqlTypeName", "INTEGER");
+    columns[2] = populationColumn;
+    
+    schemaMap.put("columns", columns);
+    
+    // Write schema to file
+    ObjectMapper mapper = new ObjectMapper();
+    try (FileOutputStream fos = new FileOutputStream(new File(countriesDir, 
"schema.json"))) {
+      mapper.writeValue(fos, schemaMap);
+    }
+  }
+
+  @Test
+  void testSchemaCreation() throws IOException {
+    // Create a DataSchema instance
+    DataSchema schema = new DataSchema(sampleDataDir, typeFactory);
+
+    // Get the table map
+    Map<String, Table> tableMap = schema.getTableMap();
+
+    // Verify that the schema has tables
+    assertNotNull(tableMap);
+    assertFalse(tableMap.isEmpty(), "Schema should have at least one table");
+
+    // Verify that both test tables exist
+    assertTrue(tableMap.containsKey("cities"), "Schema should contain the 
'cities' table");
+    assertTrue(tableMap.containsKey("countries"), "Schema should contain the 
'countries' table");
+    assertNotNull(tableMap.get("cities"), "Cities table should not be null");
+    assertNotNull(tableMap.get("countries"), "Countries table should not be 
null");
+  }
+
+  @Test
+  void testSqlQueryWithSchema() throws Exception {
+    // Create a DataSchema instance
+    DataSchema schema = new DataSchema(sampleDataDir, typeFactory);
+
+    // Configure Calcite connection properties
+    Properties info = new Properties();
+    info.setProperty("lex", "MYSQL");
+
+    // Set up a connection and register our schema
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", 
info)) {
+      CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+
+      // Register the schema
+      rootSchema.add("data", schema);
+
+      // Execute a simple query
+      try (Statement statement = connection.createStatement();
+          ResultSet resultSet = statement.executeQuery(
+              "SELECT * FROM data.cities WHERE country = 'France'")) {
+
+        // Since we don't have actual data in the tables, we just verify the 
query executes
+        // In a real test, we would add data to the tables and verify the 
results
+        assertNotNull(resultSet, "ResultSet should not be null");
+      }
+    }
+  }
+
+  @Test
+  void testJoinQuery() throws Exception {
+    // Create a DataSchema instance
+    DataSchema schema = new DataSchema(sampleDataDir, typeFactory);
+
+    // Configure Calcite connection properties
+    Properties info = new Properties();
+    info.setProperty("lex", "MYSQL");
+
+    // Set up a connection and register our schema
+    try (Connection connection = DriverManager.getConnection("jdbc:calcite:", 
info)) {
+      CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+
+      // Register the schema
+      rootSchema.add("data", schema);
+
+      // Execute a join query
+      try (Statement statement = connection.createStatement();
+          ResultSet resultSet = statement.executeQuery(
+              "SELECT c.city, c.country, co.continent "
+                  + "FROM data.cities c "
+                  + "JOIN data.countries co ON c.country = co.country "
+                  + "WHERE co.continent = 'Europe'")) {
+
+        // Since we don't have actual data in the tables, we just verify the 
query executes
+        // In a real test, we would add data to the tables and verify the 
results
+        assertNotNull(resultSet, "ResultSet should not be null");
+      }
+    }
+  }
+} 
\ No newline at end of file

Reply via email to