This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch calcite-framework
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 251ea904ca7f4a272d05938d7db49f54da2edd65
Author: Bertil Chapuis <[email protected]>
AuthorDate: Sun Jan 26 11:43:54 2025 +0100

    Add header to memory
---
 .../baremaps/calcite/BaremapsTableFactory.java     |  91 ++++--
 .../baremaps/calcite/BaremapsTableFactoryTest.java | 118 +++++++
 .../apache/baremaps/calcite/CalciteTest.java       |  51 +--
 .../java/org/apache/baremaps/csv/CsvDataStore.java |   6 +-
 .../java/org/apache/baremaps/csv/CsvDataTable.java | 343 ++++++++++-----------
 .../baremaps/csv/CsvDataTableGeonamesTest.java     |   2 +-
 .../org/apache/baremaps/csv/CsvDataTableTest.java  |  15 +-
 .../baremaps/data/collection/AppendOnlyLog.java    |   7 +-
 .../org/apache/baremaps/data/memory/Memory.java    |  57 +++-
 .../data/memory/MemoryMappedDirectory.java         |  21 +-
 .../baremaps/data/memory/MemoryMappedFile.java     |  21 +-
 .../apache/baremaps/data/memory/OffHeapMemory.java |  12 +-
 .../apache/baremaps/data/memory/OnHeapMemory.java  |  12 +-
 .../java/org/apache/baremaps/store/DataSchema.java |  19 +-
 14 files changed, 476 insertions(+), 299 deletions(-)

diff --git 
a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java
 
b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java
index bd0c99b31..e53de427b 100644
--- 
a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java
+++ 
b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java
@@ -17,37 +17,82 @@
 
 package org.apache.baremaps.calcite;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
 import org.apache.baremaps.csv.CsvDataTable;
+import org.apache.baremaps.data.collection.AppendOnlyLog;
+import org.apache.baremaps.data.collection.DataCollection;
+import org.apache.baremaps.data.collection.IndexedDataList;
+import org.apache.baremaps.data.memory.Memory;
+import org.apache.baremaps.data.memory.MemoryMappedDirectory;
+import org.apache.baremaps.data.store.DataTableImpl;
+import org.apache.baremaps.data.type.DataType;
+import org.apache.baremaps.data.type.DataTypeImpl;
+import org.apache.baremaps.store.DataRow;
+import org.apache.baremaps.store.DataSchema;
+import org.apache.baremaps.store.DataTable;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TableFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+
 public class BaremapsTableFactory implements TableFactory<BaremapsTable> {
 
-  public BaremapsTableFactory() {
-
-  }
-
-  @Override
-  public BaremapsTable create(SchemaPlus schema, String name,
-      Map<String, Object> operand, RelDataType rowType) {
-    final RelProtoDataType protoRowType =
-        rowType != null ? RelDataTypeImpl.proto(rowType) : null;
-    String file = (String) operand.get("file");
-    if (file != null && file.endsWith(".csv")) {
-      try {
-        return new BaremapsTable(
-            new CsvDataTable(new File(file), true),
-            protoRowType);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    public BaremapsTableFactory() {
+
     }
-    return new BaremapsTable(null, protoRowType);
-  }
+
+    @Override
+    public BaremapsTable create(
+            SchemaPlus schema,
+            String name,
+            Map<String, Object> operand,
+            RelDataType rowType) {
+        final RelProtoDataType protoRowType =
+                rowType != null ? RelDataTypeImpl.proto(rowType) : null;
+        String format = (String) operand.get("format");
+        DataTable dataTable = switch (format) {
+            case "mmap" -> createMMapTable(schema, name, operand, rowType);
+            case "csv" -> createCsvTable(schema, name, operand, rowType);
+            default -> throw new RuntimeException("Unsupported format");
+        };
+        return new BaremapsTable(dataTable, protoRowType);
+    }
+
+    private DataTable createMMapTable(SchemaPlus schema, String name, 
Map<String, Object> operand, RelDataType rowType) {
+        String directory = (String) operand.get("directory");
+        if (directory == null) {
+            throw new RuntimeException("A directory should be specified");
+        }
+        try {
+            Path directoryPath = Paths.get(directory);
+            Path schemaFile = directoryPath.resolve("schema.json");
+            DataSchema dataSchema = DataSchema.read(schemaFile);
+            DataType<DataRow> dataType = new DataTypeImpl(dataSchema);
+            Memory<MappedByteBuffer> memory = new 
MemoryMappedDirectory(directoryPath);
+            DataCollection<DataRow> dataCollection = new 
AppendOnlyLog<>(dataType, memory);
+            return new DataTableImpl(dataSchema, dataCollection);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private DataTable createCsvTable(SchemaPlus schema, String name, 
Map<String, Object> operand, RelDataType rowType) {
+        String file = (String) operand.get("file");
+        if (file == null) {
+            throw new RuntimeException("A file should be specified");
+        }
+        try {
+            return new CsvDataTable(name, new File(file), true, ',');
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git 
a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java
 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java
new file mode 100644
index 000000000..1598163e7
--- /dev/null
+++ 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java
@@ -0,0 +1,118 @@
+package org.apache.baremaps.calcite;
+
+import org.apache.baremaps.data.collection.AppendOnlyLog;
+import org.apache.baremaps.data.memory.MemoryMappedDirectory;
+import org.apache.baremaps.data.store.DataTableImpl;
+import org.apache.baremaps.data.type.DataTypeImpl;
+import org.apache.baremaps.data.util.FileUtils;
+import org.apache.baremaps.store.*;
+import org.apache.baremaps.store.DataColumn.Cardinality;
+import org.apache.baremaps.store.DataColumn.ColumnType;
+import org.junit.jupiter.api.Test;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.GeometryFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.List;
+
+class BaremapsTableFactoryTest {
+
+
+    @Test
+    public void createCsvTable() throws Exception {
+        File file = File.createTempFile("test", ".csv");
+        String csv = """
+        ID,NAME,GEOM
+        1,Paris,POINT(2.3522 48.8566)
+        2,New York,POINT(-74.0060 40.7128)
+        """;
+        try (PrintWriter writer = new PrintWriter(file)) {
+            writer.write(csv);
+        }
+        String model = """
+        {
+            version: '1.0',
+            defaultSchema: 'TEST',
+            schemas: [
+                {
+                name: 'TEST',
+                tables: [
+                    {
+                      name: 'TEST',
+                      factory: 
'org.apache.baremaps.calcite.BaremapsTableFactory',
+                      operand: {
+                          format: 'csv',
+                          file: '%s'
+                      }
+                    }
+                ]
+              }
+            ]
+        }
+        """.formatted(file.getAbsolutePath());
+        try (Connection connection =
+                     DriverManager.getConnection("jdbc:calcite:model=inline:" 
+ model)) {
+
+            ResultSet resultSet = 
connection.createStatement().executeQuery("SELECT * FROM TEST.TEST");
+            while (resultSet.next()) {
+                System.out.println(resultSet.getString("ID") + " " + 
resultSet.getString("GEOM"));
+            }
+        } finally {
+            file.delete();
+        }
+    }
+
+    @Test
+    public void createMMapTable() throws Exception {
+        Path path = Files.createTempDirectory("temp");
+
+        DataSchema dataSchema = new DataSchemaImpl("test", List.of(
+                new DataColumnFixed("id", Cardinality.REQUIRED, 
ColumnType.INTEGER),
+                new DataColumnFixed("name", Cardinality.REQUIRED, 
ColumnType.STRING),
+                new DataColumnFixed("geom", Cardinality.REQUIRED, 
ColumnType.GEOMETRY)
+        ));
+
+        DataTable dataTable = new DataTableImpl(dataSchema, new 
AppendOnlyLog<>(new DataTypeImpl(dataSchema), new MemoryMappedDirectory(path)));
+        dataTable.add(new DataRowImpl(dataSchema, List.of(1, "a", new 
GeometryFactory().createPoint(new Coordinate(1, 1)))));
+        dataTable.add(new DataRowImpl(dataSchema, List.of(2, "b", new 
GeometryFactory().createPoint(new Coordinate(2, 2)))));
+        dataTable.close();
+
+        String model = """
+        {
+            version: '1.0',
+            defaultSchema: 'TEST',
+            schemas: [
+                {
+                name: 'TEST',
+                tables: [
+                    {
+                      name: 'TEST',
+                      factory: 
'org.apache.baremaps.calcite.BaremapsTableFactory',
+                      operand: {
+                          format: 'mmap',
+                          directory: '%s'
+                      }
+                    }
+                ]
+              }
+            ]
+        }
+        """.formatted(path.toAbsolutePath());
+        try (Connection connection =
+                     DriverManager.getConnection("jdbc:calcite:model=inline:" 
+ model)) {
+
+            ResultSet resultSet = 
connection.createStatement().executeQuery("SELECT * FROM TEST.TEST");
+            while (resultSet.next()) {
+                System.out.println(resultSet.getString("ID") + " " + 
resultSet.getString("GEOM"));
+            }
+        } finally {
+            FileUtils.deleteRecursively(path);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java
similarity index 86%
rename from 
baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java
rename to 
baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java
index 870263c8f..b68d2261c 100644
--- 
a/baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java
+++ 
b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package com.apache.baremaps.calcite;
+package org.apache.baremaps.calcite;
 
-import java.io.File;
-import java.io.PrintWriter;
 import java.sql.*;
 import java.util.*;
-import org.apache.baremaps.calcite.BaremapsTable;
+
 import org.apache.baremaps.data.collection.AppendOnlyLog;
 import org.apache.baremaps.data.collection.IndexedDataList;
 import org.apache.baremaps.data.store.DataTableImpl;
@@ -247,50 +245,5 @@ public class CalciteTest {
     System.out.println("List B (after SQL): " + listB);
   }
 
-  @Test
-  public void testCsvStream() throws Exception {
-    File file = File.createTempFile("test", ".csv");
-    String csv = """
-        ID,NAME,GEOM
-        1,Paris,POINT(2.3522 48.8566)
-        2,New York,POINT(-74.0060 40.7128)
-        """;
-    try (PrintWriter writer = new PrintWriter(file)) {
-      writer.write(csv);
-    }
-
-    String model = """
-        {
-            version: '1.0',
-            defaultSchema: 'TEST',
-            schemas: [
-                {
-                name: 'TEST',
-                tables: [
-                    {
-                      name: 'TEST',
-                      type: 'custom',
-                      factory: 
'org.apache.baremaps.calcite.BaremapsTableFactory',
-                      operand: {
-                          file: '%s'
-                      }
-                    }
-                ]
-              }
-            ]
-        }
-        """.formatted(file.getAbsolutePath());
-
-    try (Connection connection =
-        DriverManager.getConnection("jdbc:calcite:model=inline:" + model)) {
-
-      ResultSet resultSet = connection.createStatement().executeQuery("SELECT 
* FROM TEST.TEST");
-      while (resultSet.next()) {
-        System.out.println(resultSet.getString("ID") + " " + 
resultSet.getString("GEOM"));
-      }
-    } finally {
-      file.delete();
-    }
-  }
 
 }
diff --git 
a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java 
b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java
index 74abb32e3..9bfc41aa2 100644
--- a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java
+++ b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java
@@ -41,15 +41,15 @@ public class CsvDataStore implements DataStore {
    * @param tableName the name of the table
    * @param schema the data schema defining the structure
    * @param csvFile the CSV file to read data from
-   * @param hasHeader whether the CSV file includes a header row
+   * @param header whether the CSV file includes a header row
    * @param separator the character used to separate columns in the CSV file
    * @throws IOException if an I/O error occurs
    */
-  public CsvDataStore(String tableName, DataSchema schema, File csvFile, 
boolean hasHeader,
+  public CsvDataStore(String tableName, DataSchema schema, File csvFile, 
boolean header,
       char separator) throws IOException {
     this.tableName = tableName;
     this.schema = schema;
-    this.dataTable = new CsvDataTable(csvFile, hasHeader);
+    this.dataTable = new CsvDataTable(tableName, csvFile, header, separator);
   }
 
   /**
diff --git 
a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java 
b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java
index 360debfae..03283ec8a 100644
--- a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java
+++ b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java
@@ -17,208 +17,195 @@
 
 package org.apache.baremaps.csv;
 
+import com.fasterxml.jackson.core.FormatSchema;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.baremaps.store.*;
+import org.apache.baremaps.store.DataColumn.Cardinality;
+import org.apache.baremaps.store.DataColumn.ColumnType;
+import org.locationtech.jts.io.WKTReader;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import org.apache.baremaps.store.*;
-import org.apache.baremaps.store.DataColumn.Cardinality;
-import org.apache.baremaps.store.DataColumn.ColumnType;
-import org.locationtech.jts.io.WKTReader;
 
 /**
  * A DataTable implementation that reads data from a CSV file using Jackson.
  */
 public class CsvDataTable implements DataTable {
 
-  private final File csvFile;
-  private final CsvSchema csvSchema;
-  private final DataSchema dataSchema;
-
-  private final long size;
-  private JsonParser parser;
-
-  /**
-   * Constructs a CsvDataTable with the specified schema, CSV file, header 
presence, and separator.
-   *
-   * @param csvFile the CSV file to read data from
-   * @param hasHeader whether the CSV file includes a header row
-   * @throws IOException if an I/O error occurs
-   */
-  public CsvDataTable(File csvFile, boolean hasHeader) throws IOException {
-    this.csvFile = csvFile;
-    this.csvSchema = inferCsvSchema(csvFile);
-    this.dataSchema = createDataSchema(csvFile.getName(), csvSchema);
-    this.size = calculateSize();
-  }
-
-
-  private CsvSchema inferCsvSchema(File csvFile) {
-    CsvSchema csvSchema = 
CsvSchema.emptySchema().withUseHeader(true).withColumnSeparator(',');
-    try (var ignored = new CsvMapper().readerFor(Map.class)
-        .with(csvSchema)
-        .createParser(csvFile)) {
-      var test = ignored.readValueAsTree();
-      return csvSchema;
-    } catch (IOException e) {
-      throw new DataStoreException("Error reading CSV file", e);
+    private final File file;
+    private final CsvSchema csvSchema;
+    private final DataSchema dataSchema;
+
+    private final long size;
+    private MappingIterator<Map<String, String>> csvIterator;
+
+    /**
+     * Constructs a CsvDataTable with the specified schema, CSV file, header 
presence, and separator.
+     *
+     * @param name      the name of the table
+     * @param csvFile   the CSV file to read data from
+     * @param header    whether the CSV file includes a header row
+     * @param separator the separator used in the CSV file
+     * @throws IOException if an I/O error occurs
+     */
+    public CsvDataTable(String name, File csvFile, boolean header, char 
separator) throws IOException {
+        this.file = csvFile;
+
+        // Iterate over all records to infer the csv schema and calculate the 
size of the table
+        CsvSchema csvSchema = CsvSchema.emptySchema()
+                .withUseHeader(header)
+                .withColumnSeparator(separator);
+
+        try (MappingIterator<Object> iterator = new CsvMapper()
+                .readerFor(Object.class)
+                .with(csvSchema)
+                .readValues(csvFile)) {
+
+            int count = 0;
+            while (iterator.hasNext()) {
+                iterator.next();
+                count++;
+            }
+
+            this.csvSchema = (CsvSchema) iterator.getParserSchema();
+            this.size = count;
+        } catch (IOException e) {
+            throw new DataStoreException("Error reading CSV file", e);
+        }
+
+        // Map the csv schema to a data schema
+        List<DataColumn> columns = new ArrayList<>();
+        for (String columnName : this.csvSchema.getColumnNames()) {
+            switch (this.csvSchema.column(columnName).getType()) {
+                case STRING -> columns
+                        .add(new DataColumnFixed(columnName, 
Cardinality.REQUIRED, ColumnType.STRING));
+                case NUMBER -> columns
+                        .add(new DataColumnFixed(columnName, 
Cardinality.REQUIRED, ColumnType.DOUBLE));
+                case BOOLEAN -> columns
+                        .add(new DataColumnFixed(columnName, 
Cardinality.REQUIRED, ColumnType.BOOLEAN));
+                case ARRAY -> columns
+                        .add(new DataColumnFixed(columnName, 
Cardinality.REPEATED, ColumnType.STRING));
+                default -> throw new IllegalArgumentException(
+                        "Unsupported column type: " + 
csvSchema.column(columnName).getType());
+            }
+        }
+        this.dataSchema = new DataSchemaImpl(name, columns);
     }
-  }
-
-  private DataSchema createDataSchema(String name, CsvSchema csvSchema) {
-    List<DataColumn> columns = new ArrayList<>();
-    for (String columnName : csvSchema.getColumnNames()) {
-      switch (csvSchema.column(columnName).getType()) {
-        case STRING -> columns
-            .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, 
ColumnType.STRING));
-        case NUMBER -> columns
-            .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, 
ColumnType.DOUBLE));
-        case BOOLEAN -> columns
-            .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, 
ColumnType.BOOLEAN));
-        case ARRAY -> columns
-            .add(new DataColumnFixed(columnName, Cardinality.REPEATED, 
ColumnType.STRING));
-        default -> throw new IllegalArgumentException(
-            "Unsupported column type: " + 
csvSchema.column(columnName).getType());
-      }
+
+    @Override
+    public DataSchema schema() {
+        return dataSchema;
     }
-    return new DataSchemaImpl(name, columns);
-  }
-
-  /**
-   * Calculates the number of rows in the CSV file.
-   *
-   * @return the number of rows
-   * @throws IOException if an I/O error occurs
-   */
-  private long calculateSize() throws IOException {
-    try (var parser = new CsvMapper().readerFor(Map.class)
-        .with(csvSchema)
-        .createParser(csvFile)) {
-      long rowCount = 0;
-      while (parser.nextToken() != null) {
-        if (parser.currentToken() == JsonToken.START_OBJECT) {
-          rowCount++;
-        }
-      }
-      return rowCount;
+
+    @Override
+    public boolean add(DataRow row) {
+        throw new UnsupportedOperationException("Adding rows is not 
supported.");
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException("Clearing rows is not 
supported.");
+    }
+
+    @Override
+    public long size() {
+        return size;
     }
-  }
-
-  @Override
-  public DataSchema schema() {
-    return dataSchema;
-  }
-
-  @Override
-  public boolean add(DataRow row) {
-    throw new UnsupportedOperationException("Adding rows is not supported.");
-  }
-
-  @Override
-  public void clear() {
-    throw new UnsupportedOperationException("Clearing rows is not supported.");
-  }
-
-  @Override
-  public long size() {
-    return size;
-  }
-
-  @Override
-  public Iterator<DataRow> iterator() {
-    try {
-      CsvMapper csvMapper = new CsvMapper();
-      parser = csvMapper.readerFor(Map.class)
-          .with(csvSchema)
-          .createParser(csvFile);
-
-      Iterator<Map<String, String>> csvIterator = 
csvMapper.readerFor(Map.class)
-          .with(csvSchema)
-          .readValues(parser);
-
-      return new Iterator<>() {
-        @Override
-        public boolean hasNext() {
-          return csvIterator.hasNext();
+
+    @Override
+    public Iterator<DataRow> iterator() {
+        try {
+
+            csvIterator =  new CsvMapper()
+                    .readerFor(Map.class)
+                    .with(csvSchema)
+                    .readValues(file);
+
+            return new Iterator<>() {
+                @Override
+                public boolean hasNext() {
+                    return csvIterator.hasNext();
+                }
+
+                @Override
+                public DataRow next() {
+                    Map<String, String> csvRow = csvIterator.next();
+                    DataRow dataRow = dataSchema.createRow();
+
+                    for (int i = 0; i < dataSchema.columns().size(); i++) {
+                        DataColumn column = dataSchema.columns().get(i);
+                        String columnName = column.name();
+                        String value = csvRow.get(columnName);
+
+                        if (value != null) {
+                            Object parsedValue = parseValue(column, value);
+                            dataRow.set(i, parsedValue);
+                        } else {
+                            dataRow.set(i, null);
+                        }
+                    }
+                    return dataRow;
+                }
+            };
+
+        } catch (IOException e) {
+            throw new DataStoreException("Error reading CSV file", e);
         }
+    }
 
-        @Override
-        public DataRow next() {
-          Map<String, String> csvRow = csvIterator.next();
-          DataRow dataRow = dataSchema.createRow();
-
-          for (int i = 0; i < dataSchema.columns().size(); i++) {
-            DataColumn column = dataSchema.columns().get(i);
-            String columnName = column.name();
-            String value = csvRow.get(columnName);
-
-            if (value != null) {
-              Object parsedValue = parseValue(column, value);
-              dataRow.set(i, parsedValue);
-            } else {
-              dataRow.set(i, null);
+    /**
+     * Parses the string value from the CSV according to the column type.
+     *
+     * @param column the data column
+     * @param value  the string value from the CSV
+     * @return the parsed value
+     */
+    private Object parseValue(DataColumn column, String value) {
+        ColumnType type = column.type();
+        try {
+            if (value == null || value.isEmpty()) {
+                return null;
             }
-          }
-          return dataRow;
+            return switch (type) {
+                case STRING -> value;
+                case INTEGER -> Integer.parseInt(value);
+                case LONG -> Long.parseLong(value);
+                case FLOAT -> Float.parseFloat(value);
+                case DOUBLE -> Double.parseDouble(value);
+                case BOOLEAN -> Boolean.parseBoolean(value);
+                case GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, 
MULTILINESTRING, MULTIPOLYGON,
+                     GEOMETRYCOLLECTION -> {
+                    WKTReader reader = new WKTReader();
+                    yield reader.read(value);
+                }
+                default -> throw new IllegalArgumentException("Unsupported 
column type: " + type);
+            };
+        } catch (Exception e) {
+            throw new DataStoreException("Error parsing value for column " + 
column.name(), e);
         }
-      };
+    }
 
-    } catch (IOException e) {
-      throw new DataStoreException("Error reading CSV file", e);
+    @Override
+    public Spliterator<DataRow> spliterator() {
+        return Spliterators.spliteratorUnknownSize(iterator(), 
Spliterator.ORDERED);
     }
-  }
-
-  /**
-   * Parses the string value from the CSV according to the column type.
-   *
-   * @param column the data column
-   * @param value the string value from the CSV
-   * @return the parsed value
-   */
-  private Object parseValue(DataColumn column, String value) {
-    ColumnType type = column.type();
-    try {
-      if (value == null || value.isEmpty()) {
-        return null;
-      }
-        return switch (type) {
-            case STRING -> value;
-            case INTEGER -> Integer.parseInt(value);
-            case LONG -> Long.parseLong(value);
-            case FLOAT -> Float.parseFloat(value);
-            case DOUBLE -> Double.parseDouble(value);
-            case BOOLEAN -> Boolean.parseBoolean(value);
-            case GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, 
MULTILINESTRING, MULTIPOLYGON,
-                 GEOMETRYCOLLECTION -> {
-                WKTReader reader = new WKTReader();
-                yield reader.read(value);
-            }
-            default -> throw new IllegalArgumentException("Unsupported column 
type: " + type);
-        };
-    } catch (Exception e) {
-      throw new DataStoreException("Error parsing value for column " + 
column.name(), e);
+
+    @Override
+    public Stream<DataRow> stream() {
+        return StreamSupport.stream(spliterator(), false);
     }
-  }
-
-  @Override
-  public Spliterator<DataRow> spliterator() {
-    return Spliterators.spliteratorUnknownSize(iterator(), 
Spliterator.ORDERED);
-  }
-
-  @Override
-  public Stream<DataRow> stream() {
-    return StreamSupport.stream(spliterator(), false);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (parser != null) {
-      parser.close();
+
+    @Override
+    public void close() throws IOException {
+        if (csvIterator != null) {
+            csvIterator.close();
+        }
     }
-  }
 }
diff --git 
a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java
 
b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java
index 489634505..71776b724 100644
--- 
a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java
+++ 
b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java
@@ -29,7 +29,7 @@ class CsvDataTableGeonamesTest {
 
   @Test
   void testGeonamesCsvDataTable() throws IOException {
-    DataTable dataTable = new CsvDataTable(GEONAMES_CSV.toFile(), false);
+    DataTable dataTable = new CsvDataTable("sample", GEONAMES_CSV.toFile(), 
false, '\t');
 
     assertEquals(5, dataTable.size(), "DataTable should have 5 rows.");
 
diff --git 
a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java 
b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java
index 444b37996..bb67934df 100644
--- a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java
+++ b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java
@@ -58,7 +58,7 @@ class CsvDataTableTest {
         2,PointB,"POINT(2 2)"
         """;
     Files.writeString(tempCsvFile.toPath(), csvContent);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, true);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, 
',');
     assertEquals(2, dataTable.size());
     int rowCount = 0;
     for (DataRow row : dataTable) {
@@ -86,8 +86,7 @@ class CsvDataTableTest {
         new DataColumnFixed("column1", Cardinality.REQUIRED, 
ColumnType.INTEGER),
         new DataColumnFixed("column2", Cardinality.OPTIONAL, 
ColumnType.STRING),
         new DataColumnFixed("column3", Cardinality.OPTIONAL, 
ColumnType.GEOMETRY));
-    DataSchema schema = new DataSchemaImpl("test_table", columns);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, false);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, false, 
';');
     assertEquals(2, dataTable.size());
     int rowCount = 0;
     for (DataRow row : dataTable) {
@@ -120,8 +119,7 @@ class CsvDataTableTest {
         new DataColumnFixed("double_col", Cardinality.REQUIRED, 
ColumnType.DOUBLE),
         new DataColumnFixed("bool_col", Cardinality.REQUIRED, 
ColumnType.BOOLEAN),
         new DataColumnFixed("string_col", Cardinality.REQUIRED, 
ColumnType.STRING));
-    DataSchema schema = new DataSchemaImpl("test_table", columns);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, true);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, 
',');
     assertEquals(2, dataTable.size());
     int rowCount = 0;
     for (DataRow row : dataTable) {
@@ -150,8 +148,7 @@ class CsvDataTableTest {
     List<DataColumn> columns = List.of(
         new DataColumnFixed("id", Cardinality.REQUIRED, ColumnType.INTEGER),
         new DataColumnFixed("name", Cardinality.OPTIONAL, ColumnType.STRING));
-    DataSchema schema = new DataSchemaImpl("test_table", columns);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, true);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, 
',');
     assertThrows(RuntimeException.class, () -> {
       for (DataRow row : dataTable) {
         // This line should throw an exception because abc is not a valid 
integer
@@ -164,7 +161,7 @@ class CsvDataTableTest {
   void testAddAndClearUnsupportedOperations() throws IOException {
     String csvContent = "";
     Files.writeString(tempCsvFile.toPath(), csvContent);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, true);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, 
',');
     assertThrows(UnsupportedOperationException.class, () -> 
dataTable.add(null));
     assertThrows(UnsupportedOperationException.class, dataTable::clear);
   }
@@ -178,7 +175,7 @@ class CsvDataTableTest {
         3,Name3
         """;
     Files.writeString(tempCsvFile.toPath(), csvContent);
-    DataTable dataTable = new CsvDataTable(tempCsvFile, true);
+    DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, 
',');
     assertEquals(3, dataTable.size());
   }
 }
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java
 
b/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java
index 1a9830b1b..59a13f6e4 100644
--- 
a/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java
+++ 
b/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java
@@ -65,8 +65,8 @@ public class AppendOnlyLog<E> implements DataCollection<E> {
     this.dataType = dataType;
     this.memory = memory;
     this.segmentSize = memory.segmentSize();
-    this.offset = Long.BYTES;
-    this.size = memory.segment(0).getLong(0);
+    this.size = memory.header().getLong(0);
+    this.offset = 0;
   }
 
   /**
@@ -148,6 +148,7 @@ public class AppendOnlyLog<E> implements DataCollection<E> {
 
   @Override
   public void close() throws IOException {
+    memory.header().putLong(0, size);
     memory.close();
   }
 
@@ -166,7 +167,7 @@ public class AppendOnlyLog<E> implements DataCollection<E> {
     private AppendOnlyLogIterator(long size) {
       this.size = size;
       index = 0;
-      position = Long.BYTES;
+      position = 0;
     }
 
     @Override
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java
index d0a2580bf..a8f159cc6 100644
--- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java
+++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java
@@ -28,28 +28,42 @@ import java.util.List;
 /** A base class to manage segments of on-heap, off-heap, or on-disk memory. */
 public abstract class Memory<T extends ByteBuffer> implements Closeable {
 
+  private final int headerSize;
+
   private final int segmentSize;
 
   private final long segmentShift;
 
   private final long segmentMask;
 
-  protected final List<T> segments = new ArrayList<>();
+  protected T header;
+
+  protected List<T> segments = new ArrayList<>();
 
   /**
    * Constructs a memory with a given segment size.
    *
    * @param segmentSize the size of the segments
    */
-  protected Memory(int segmentSize) {
+  protected Memory(int headerSize, int segmentSize) {
     if ((segmentSize & -segmentSize) != segmentSize) {
       throw new IllegalArgumentException("The segment size must be a power of 
2");
     }
+    this.headerSize = headerSize;
     this.segmentSize = segmentSize;
     this.segmentShift = (long) (Math.log(this.segmentSize) / Math.log(2));
     this.segmentMask = this.segmentSize - 1l;
   }
 
+  /**
+   * Returns the size of the header.
+   *
+   * @return the size of the header
+   */
+  public int headerSize() {
+    return headerSize;
+  }
+
   /**
    * Returns the size of the segments.
    *
@@ -77,6 +91,22 @@ public abstract class Memory<T extends ByteBuffer> 
implements Closeable {
     return segmentMask;
   }
 
+  public synchronized ByteBuffer header() {
+    if (header == null) {
+      header = allocateHeader();
+    }
+    return header;
+  }
+
+  /**
+   * Allocates a header.
+   *
+   * @return the header
+   */
+  protected abstract T allocateHeader();
+
   /**
    * Returns a segment of the memory.
    *
@@ -85,47 +115,44 @@ public abstract class Memory<T extends ByteBuffer> 
implements Closeable {
    */
   public ByteBuffer segment(int index) {
     if (segments.size() <= index) {
-      return allocate(index);
+      return allocateSegmentInternal(index);
     }
     ByteBuffer segment = segments.get(index);
     if (segment == null) {
-      return allocate(index);
+      return allocateSegmentInternal(index);
     }
     return segment;
   }
 
+  /** Returns the size of the allocated memory. */
+  public long size() {
+    return (long) segments.size() * (long) segmentSize;
+  }
+
   /** The allocation of segments is synchronized to enable access by multiple 
threads. */
-  private synchronized ByteBuffer allocate(int index) {
+  private synchronized ByteBuffer allocateSegmentInternal(int index) {
     while (segments.size() <= index) {
       segments.add(null);
     }
     T segment = segments.get(index);
     if (segment == null) {
-      segment = allocate(index, segmentSize);
+      segment = allocateSegment(index);
       segments.set(index, segment);
     }
     return segment;
   }
 
-  /** Returns the size of the allocated memory. */
-  public long size() {
-    return (long) segments.size() * (long) segmentSize;
-  }
-
   /**
    * Allocates a segment for a given index and size.
    *
    * @param index the index of the segment
-   * @param size the size of the segment
    * @return the segment
    */
-  protected abstract T allocate(int index, int size);
+  protected abstract T allocateSegment(int index);
 
   /**
    * Clears the memory and the underlying resources.
    */
   public abstract void clear() throws IOException;
 
-
-
 }
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java
 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java
index 82d2e0af7..1b9a18dfa 100644
--- 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java
+++ 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java
@@ -53,18 +53,31 @@ public class MemoryMappedDirectory extends 
Memory<MappedByteBuffer> {
    * @param segmentBytes the size of the segments in bytes
    */
   public MemoryMappedDirectory(Path directory, int segmentBytes) {
-    super(segmentBytes);
+    super(1024, segmentBytes);
     this.directory = directory;
   }
 
+  @Override
+  protected MappedByteBuffer allocateHeader() {
+    try {
+      Path file = directory.resolve("header");
+      try (FileChannel channel = FileChannel.open(file, 
StandardOpenOption.CREATE,
+              StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+        return channel.map(MapMode.READ_WRITE, 0, headerSize());
+      }
+    } catch (IOException e) {
+      throw new MemoryException(e);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
-  protected MappedByteBuffer allocate(int index, int size) {
+  protected MappedByteBuffer allocateSegment(int index) {
     try {
       Path file = directory.resolve(String.format("%s.part", index));
       try (FileChannel channel = FileChannel.open(file, 
StandardOpenOption.CREATE,
           StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-        return channel.map(MapMode.READ_WRITE, 0, size);
+        return channel.map(MapMode.READ_WRITE, 0, segmentSize());
       }
     } catch (IOException e) {
       throw new MemoryException(e);
@@ -74,6 +87,7 @@ public class MemoryMappedDirectory extends 
Memory<MappedByteBuffer> {
   /** {@inheritDoc} */
   @Override
   public void close() throws IOException {
+    MappedByteBufferUtils.unmap(header);
     for (MappedByteBuffer buffer : segments) {
       MappedByteBufferUtils.unmap(buffer);
     }
@@ -83,6 +97,7 @@ public class MemoryMappedDirectory extends 
Memory<MappedByteBuffer> {
   @Override
   public void clear() throws IOException {
     close();
+    header = null;
     segments.clear();
     FileUtils.deleteRecursively(directory);
   }
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java
 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java
index 5db613d34..7c2de6b8b 100644
--- 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java
+++ 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java
@@ -49,17 +49,32 @@ public class MemoryMappedFile extends 
Memory<MappedByteBuffer> {
    * @param segmentBytes the size of the segments in bytes
    */
   public MemoryMappedFile(Path file, int segmentBytes) {
-    super(segmentBytes);
+    super(1024, segmentBytes);
     this.file = file;
   }
 
+  @Override
+  protected MappedByteBuffer allocateHeader() {
+    try {
+      try (FileChannel channel = FileChannel.open(file, 
StandardOpenOption.CREATE,
+              StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+        return channel.map(MapMode.READ_WRITE, 0, headerSize());
+      }
+    } catch (IOException e) {
+      throw new MemoryException(e);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
-  protected MappedByteBuffer allocate(int index, int size) {
+  protected MappedByteBuffer allocateSegment(int index) {
     try {
       try (FileChannel channel = FileChannel.open(file, 
StandardOpenOption.CREATE,
           StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-        return channel.map(MapMode.READ_WRITE, (long) index * size, size);
+        long segmentPosition = headerSize() + (long) index * segmentSize();
+        return channel.map(MapMode.READ_WRITE, segmentPosition, segmentSize());
       }
     } catch (IOException e) {
       throw new MemoryException(e);
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java
 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java
index f95f03b51..8294b68be 100644
--- 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java
+++ 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java
@@ -36,13 +36,19 @@ public class OffHeapMemory extends Memory<ByteBuffer> {
    * @param segmentSize the size of the segments in bytes
    */
   public OffHeapMemory(int segmentSize) {
-    super(segmentSize);
+    super(1024, segmentSize);
   }
 
   /** {@inheritDoc} */
   @Override
-  protected ByteBuffer allocate(int index, int size) {
-    return ByteBuffer.allocateDirect(size);
+  protected ByteBuffer allocateHeader() {
+    return ByteBuffer.allocateDirect(headerSize());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected ByteBuffer allocateSegment(int index) {
+    return ByteBuffer.allocateDirect(segmentSize());
   }
 
   /** {@inheritDoc} */
diff --git 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java
index 4b0fff757..5d18bbe40 100644
--- 
a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java
+++ 
b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java
@@ -36,13 +36,19 @@ public class OnHeapMemory extends Memory<ByteBuffer> {
    * @param segmentSize the size of the segments in bytes
    */
   public OnHeapMemory(int segmentSize) {
-    super(segmentSize);
+    super(1024, segmentSize);
   }
 
   /** {@inheritDoc} */
   @Override
-  protected ByteBuffer allocate(int index, int size) {
-    return ByteBuffer.allocate(size);
+  protected ByteBuffer allocateHeader() {
+    return ByteBuffer.allocate(headerSize());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected ByteBuffer allocateSegment(int index) {
+    return ByteBuffer.allocate(segmentSize());
   }
 
   /** {@inheritDoc} */
diff --git 
a/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java 
b/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java
index aef70f1e7..a2dcfecd8 100644
--- a/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java
+++ b/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java
@@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,7 +38,7 @@ import org.apache.baremaps.store.DataColumn.ColumnType;
 /**
  * A {@link DataSchema} is a description of the structure of a row in a {@link 
DataTable}.
  */
-public interface DataSchema extends Serializable {
+public interface DataSchema {
 
   /**
    * Returns the name of the schema.
@@ -100,14 +102,19 @@ public interface DataSchema extends Serializable {
     return mapper;
   }
 
-  static DataSchema read(Path path) throws IOException {
+  static DataSchema read(ByteBuffer buffer) throws IOException {
     var mapper = configureObjectMapper();
-    return mapper.readValue(path.toFile(), DataSchema.class);
+    int length = buffer.getInt();
+    byte[] bytes = new byte[length];
+    buffer.get(bytes);
+    return mapper.readValue(bytes, DataSchema.class);
   }
 
-  static void write(Path path, DataSchema schema) throws IOException {
+  static void write(ByteBuffer buffer, DataSchema schema) throws IOException {
     var mapper = configureObjectMapper();
-    mapper.writerWithDefaultPrettyPrinter().writeValue(path.toFile(), schema);
+    var bytes = 
mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(schema);
+    buffer.putInt(bytes.length);
+    buffer.put(bytes);
   }
-  
+
 }


Reply via email to