This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch calcite-framework in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 251ea904ca7f4a272d05938d7db49f54da2edd65 Author: Bertil Chapuis <[email protected]> AuthorDate: Sun Jan 26 11:43:54 2025 +0100 Add header to memory --- .../baremaps/calcite/BaremapsTableFactory.java | 91 ++++-- .../baremaps/calcite/BaremapsTableFactoryTest.java | 118 +++++++ .../apache/baremaps/calcite/CalciteTest.java | 51 +-- .../java/org/apache/baremaps/csv/CsvDataStore.java | 6 +- .../java/org/apache/baremaps/csv/CsvDataTable.java | 343 ++++++++++----------- .../baremaps/csv/CsvDataTableGeonamesTest.java | 2 +- .../org/apache/baremaps/csv/CsvDataTableTest.java | 15 +- .../baremaps/data/collection/AppendOnlyLog.java | 7 +- .../org/apache/baremaps/data/memory/Memory.java | 57 +++- .../data/memory/MemoryMappedDirectory.java | 21 +- .../baremaps/data/memory/MemoryMappedFile.java | 21 +- .../apache/baremaps/data/memory/OffHeapMemory.java | 12 +- .../apache/baremaps/data/memory/OnHeapMemory.java | 12 +- .../java/org/apache/baremaps/store/DataSchema.java | 19 +- 14 files changed, 476 insertions(+), 299 deletions(-) diff --git a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java index bd0c99b31..e53de427b 100644 --- a/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java +++ b/baremaps-calcite/src/main/java/org/apache/baremaps/calcite/BaremapsTableFactory.java @@ -17,37 +17,82 @@ package org.apache.baremaps.calcite; -import java.io.File; -import java.io.IOException; -import java.util.Map; import org.apache.baremaps.csv.CsvDataTable; +import org.apache.baremaps.data.collection.AppendOnlyLog; +import org.apache.baremaps.data.collection.DataCollection; +import org.apache.baremaps.data.collection.IndexedDataList; +import org.apache.baremaps.data.memory.Memory; +import org.apache.baremaps.data.memory.MemoryMappedDirectory; +import org.apache.baremaps.data.store.DataTableImpl; +import org.apache.baremaps.data.type.DataType; +import org.apache.baremaps.data.type.DataTypeImpl; +import org.apache.baremaps.store.DataRow; +import org.apache.baremaps.store.DataSchema; +import org.apache.baremaps.store.DataTable; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.TableFactory; +import java.io.File; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; + public class BaremapsTableFactory implements TableFactory<BaremapsTable> { - public BaremapsTableFactory() { - - } - - @Override - public BaremapsTable create(SchemaPlus schema, String name, - Map<String, Object> operand, RelDataType rowType) { - final RelProtoDataType protoRowType = - rowType != null ? RelDataTypeImpl.proto(rowType) : null; - String file = (String) operand.get("file"); - if (file != null && file.endsWith(".csv")) { - try { - return new BaremapsTable( - new CsvDataTable(new File(file), true), - protoRowType); - } catch (IOException e) { - throw new RuntimeException(e); - } + public BaremapsTableFactory() { + } - return new BaremapsTable(null, protoRowType); - } + + @Override + public BaremapsTable create( + SchemaPlus schema, + String name, + Map<String, Object> operand, + RelDataType rowType) { + final RelProtoDataType protoRowType = + rowType != null ? 
RelDataTypeImpl.proto(rowType) : null; + String format = (String) operand.get("format"); + DataTable dataTable = switch (format) { + case "mmap" -> createMMapTable(schema, name, operand, rowType); + case "csv" -> createCsvTable(schema, name, operand, rowType); + default -> throw new RuntimeException("Unsupported format"); + }; + return new BaremapsTable(dataTable, protoRowType); + } + + private DataTable createMMapTable(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { + String directory = (String) operand.get("directory"); + if (directory == null) { + throw new RuntimeException("A directory should be specified"); + } + try { + Path directoryPath = Paths.get(directory); + Path schemaFile = directoryPath.resolve("schema.json"); + DataSchema dataSchema = DataSchema.read(schemaFile); + DataType<DataRow> dataType = new DataTypeImpl(dataSchema); + Memory<MappedByteBuffer> memory = new MemoryMappedDirectory(directoryPath); + DataCollection<DataRow> dataCollection = new AppendOnlyLog<>(dataType, memory); + return new DataTableImpl(dataSchema, dataCollection); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private DataTable createCsvTable(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { + String file = (String) operand.get("file"); + if (file == null) { + throw new RuntimeException("A file should be specified"); + } + try { + return new CsvDataTable(name, new File(file), true, ','); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java new file mode 100644 index 000000000..1598163e7 --- /dev/null +++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/BaremapsTableFactoryTest.java @@ -0,0 +1,118 @@ +package org.apache.baremaps.calcite; + +import org.apache.baremaps.data.collection.AppendOnlyLog; +import org.apache.baremaps.data.memory.MemoryMappedDirectory; +import org.apache.baremaps.data.store.DataTableImpl; +import org.apache.baremaps.data.type.DataTypeImpl; +import org.apache.baremaps.data.util.FileUtils; +import org.apache.baremaps.store.*; +import org.apache.baremaps.store.DataColumn.Cardinality; +import org.apache.baremaps.store.DataColumn.ColumnType; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.List; + +class BaremapsTableFactoryTest { + + + @Test + public void createCsvTable() throws Exception { + File file = File.createTempFile("test", ".csv"); + String csv = """ + ID,NAME,GEOM + 1,Paris,POINT(2.3522 48.8566) + 2,New York,POINT(-74.0060 40.7128) + """; + try (PrintWriter writer = new PrintWriter(file)) { + writer.write(csv); + } + String model = """ + { + version: '1.0', + defaultSchema: 'TEST', + schemas: [ + { + name: 'TEST', + tables: [ + { + name: 'TEST', + factory: 'org.apache.baremaps.calcite.BaremapsTableFactory', + operand: { + format: 'csv', + file: '%s' + } + } + ] + } + ] + } + """.formatted(file.getAbsolutePath()); + try (Connection connection = + DriverManager.getConnection("jdbc:calcite:model=inline:" + model)) { + + ResultSet resultSet = 
connection.createStatement().executeQuery("SELECT * FROM TEST.TEST"); + while (resultSet.next()) { + System.out.println(resultSet.getString("ID") + " " + resultSet.getString("GEOM")); + } + } finally { + file.delete(); + } + } + + @Test + public void createMMapTable() throws Exception { + Path path = Files.createTempDirectory("temp"); + + DataSchema dataSchema = new DataSchemaImpl("test", List.of( + new DataColumnFixed("id", Cardinality.REQUIRED, ColumnType.INTEGER), + new DataColumnFixed("name", Cardinality.REQUIRED, ColumnType.STRING), + new DataColumnFixed("geom", Cardinality.REQUIRED, ColumnType.GEOMETRY) + )); + + DataTable dataTable = new DataTableImpl(dataSchema, new AppendOnlyLog<>(new DataTypeImpl(dataSchema), new MemoryMappedDirectory(path))); + dataTable.add(new DataRowImpl(dataSchema, List.of(1, "a", new GeometryFactory().createPoint(new Coordinate(1, 1))))); + dataTable.add(new DataRowImpl(dataSchema, List.of(2, "b", new GeometryFactory().createPoint(new Coordinate(2, 2))))); + dataTable.close(); + + String model = """ + { + version: '1.0', + defaultSchema: 'TEST', + schemas: [ + { + name: 'TEST', + tables: [ + { + name: 'TEST', + factory: 'org.apache.baremaps.calcite.BaremapsTableFactory', + operand: { + format: 'mmap', + directory: '%s' + } + } + ] + } + ] + } + """.formatted(path.toAbsolutePath()); + try (Connection connection = + DriverManager.getConnection("jdbc:calcite:model=inline:" + model)) { + + ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM TEST.TEST"); + while (resultSet.next()) { + System.out.println(resultSet.getString("ID") + " " + resultSet.getString("GEOM")); + } + } finally { + FileUtils.deleteRecursively(path); + } + } +} \ No newline at end of file diff --git a/baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java similarity index 86% rename from baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java rename to baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java index 870263c8f..b68d2261c 100644 --- a/baremaps-calcite/src/test/java/com/apache/baremaps/calcite/CalciteTest.java +++ b/baremaps-calcite/src/test/java/org/apache/baremaps/calcite/CalciteTest.java @@ -15,13 +15,11 @@ * limitations under the License. 
*/ -package com.apache.baremaps.calcite; +package org.apache.baremaps.calcite; -import java.io.File; -import java.io.PrintWriter; import java.sql.*; import java.util.*; -import org.apache.baremaps.calcite.BaremapsTable; + import org.apache.baremaps.data.collection.AppendOnlyLog; import org.apache.baremaps.data.collection.IndexedDataList; import org.apache.baremaps.data.store.DataTableImpl; @@ -247,50 +245,5 @@ public class CalciteTest { System.out.println("List B (after SQL): " + listB); } - @Test - public void testCsvStream() throws Exception { - File file = File.createTempFile("test", ".csv"); - String csv = """ - ID,NAME,GEOM - 1,Paris,POINT(2.3522 48.8566) - 2,New York,POINT(-74.0060 40.7128) - """; - try (PrintWriter writer = new PrintWriter(file)) { - writer.write(csv); - } - - String model = """ - { - version: '1.0', - defaultSchema: 'TEST', - schemas: [ - { - name: 'TEST', - tables: [ - { - name: 'TEST', - type: 'custom', - factory: 'org.apache.baremaps.calcite.BaremapsTableFactory', - operand: { - file: '%s' - } - } - ] - } - ] - } - """.formatted(file.getAbsolutePath()); - - try (Connection connection = - DriverManager.getConnection("jdbc:calcite:model=inline:" + model)) { - - ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM TEST.TEST"); - while (resultSet.next()) { - System.out.println(resultSet.getString("ID") + " " + resultSet.getString("GEOM")); - } - } finally { - file.delete(); - } - } } diff --git a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java index 74abb32e3..9bfc41aa2 100644 --- a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java +++ b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataStore.java @@ -41,15 +41,15 @@ public class CsvDataStore implements DataStore { * @param tableName the name of the table * @param schema the data schema defining the structure * @param csvFile the CSV file to read data from - * @param hasHeader whether the CSV file includes a header row + * @param header whether the CSV file includes a header row * @param separator the character used to separate columns in the CSV file * @throws IOException if an I/O error occurs */ - public CsvDataStore(String tableName, DataSchema schema, File csvFile, boolean hasHeader, + public CsvDataStore(String tableName, DataSchema schema, File csvFile, boolean header, char separator) throws IOException { this.tableName = tableName; this.schema = schema; - this.dataTable = new CsvDataTable(csvFile, hasHeader); + this.dataTable = new CsvDataTable(tableName, csvFile, header, separator); } /** diff --git a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java index 360debfae..03283ec8a 100644 --- a/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java +++ b/baremaps-csv/src/main/java/org/apache/baremaps/csv/CsvDataTable.java @@ -17,208 +17,195 @@ package org.apache.baremaps.csv; +import com.fasterxml.jackson.core.FormatSchema; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.baremaps.store.*; +import org.apache.baremaps.store.DataColumn.Cardinality; +import org.apache.baremaps.store.DataColumn.ColumnType; +import org.locationtech.jts.io.WKTReader; 
+ import java.io.File; import java.io.IOException; import java.util.*; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.baremaps.store.*; -import org.apache.baremaps.store.DataColumn.Cardinality; -import org.apache.baremaps.store.DataColumn.ColumnType; -import org.locationtech.jts.io.WKTReader; /** * A DataTable implementation that reads data from a CSV file using Jackson. */ public class CsvDataTable implements DataTable { - private final File csvFile; - private final CsvSchema csvSchema; - private final DataSchema dataSchema; - - private final long size; - private JsonParser parser; - - /** - * Constructs a CsvDataTable with the specified schema, CSV file, header presence, and separator. - * - * @param csvFile the CSV file to read data from - * @param hasHeader whether the CSV file includes a header row - * @throws IOException if an I/O error occurs - */ - public CsvDataTable(File csvFile, boolean hasHeader) throws IOException { - this.csvFile = csvFile; - this.csvSchema = inferCsvSchema(csvFile); - this.dataSchema = createDataSchema(csvFile.getName(), csvSchema); - this.size = calculateSize(); - } - - - private CsvSchema inferCsvSchema(File csvFile) { - CsvSchema csvSchema = CsvSchema.emptySchema().withUseHeader(true).withColumnSeparator(','); - try (var ignored = new CsvMapper().readerFor(Map.class) - .with(csvSchema) - .createParser(csvFile)) { - var test = ignored.readValueAsTree(); - return csvSchema; - } catch (IOException e) { - throw new DataStoreException("Error reading CSV file", e); + private final File file; + private final CsvSchema csvSchema; + private final DataSchema dataSchema; + + private final long size; + private MappingIterator<Map<String, String>> csvIterator; + + /** + * Constructs a CsvDataTable with the specified name, CSV file, header presence, and separator.
+ * + * @param name the name of the table + * @param csvFile the CSV file to read data from + * @param header whether the CSV file includes a header row + * @param separator the separator used in the CSV file + * @throws IOException if an I/O error occurs + */ + public CsvDataTable(String name, File csvFile, boolean header, char separator) throws IOException { + this.file = csvFile; + + // Iterate over all records to infer the csv schema and calculate the size of the table + CsvSchema csvSchema = CsvSchema.emptySchema() + .withUseHeader(header) + .withColumnSeparator(separator); + + try (MappingIterator<Object> iterator = new CsvMapper() + .readerFor(Object.class) + .with(csvSchema) + .readValues(csvFile)) { + + int count = 0; + while (iterator.hasNext()) { + iterator.next(); + count++; + } + + this.csvSchema = (CsvSchema) iterator.getParserSchema(); + this.size = count; + } catch (IOException e) { + throw new DataStoreException("Error reading CSV file", e); + } + + // Map the csv schema to a data schema + List<DataColumn> columns = new ArrayList<>(); + for (String columnName : this.csvSchema.getColumnNames()) { + switch (this.csvSchema.column(columnName).getType()) { + case STRING -> columns + .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.STRING)); + case NUMBER -> columns + .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.DOUBLE)); + case BOOLEAN -> columns + .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.BOOLEAN)); + case ARRAY -> columns + .add(new DataColumnFixed(columnName, Cardinality.REPEATED, ColumnType.STRING)); + default -> throw new IllegalArgumentException( + "Unsupported column type: " + csvSchema.column(columnName).getType()); + } + } + this.dataSchema = new DataSchemaImpl(name, columns); } - } - - private DataSchema createDataSchema(String name, CsvSchema csvSchema) { - List<DataColumn> columns = new ArrayList<>(); - for (String columnName : csvSchema.getColumnNames()) { - switch (csvSchema.column(columnName).getType()) { - case STRING -> columns - .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.STRING)); - case NUMBER -> columns - .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.DOUBLE)); - case BOOLEAN -> columns - .add(new DataColumnFixed(columnName, Cardinality.REQUIRED, ColumnType.BOOLEAN)); - case ARRAY -> columns - .add(new DataColumnFixed(columnName, Cardinality.REPEATED, ColumnType.STRING)); - default -> throw new IllegalArgumentException( - "Unsupported column type: " + csvSchema.column(columnName).getType()); - } + + @Override + public DataSchema schema() { + return dataSchema; } - return new DataSchemaImpl(name, columns); - } - - /** - * Calculates the number of rows in the CSV file. 
- * - * @return the number of rows - * @throws IOException if an I/O error occurs - */ - private long calculateSize() throws IOException { - try (var parser = new CsvMapper().readerFor(Map.class) - .with(csvSchema) - .createParser(csvFile)) { - long rowCount = 0; - while (parser.nextToken() != null) { - if (parser.currentToken() == JsonToken.START_OBJECT) { - rowCount++; - } - } - return rowCount; + + @Override + public boolean add(DataRow row) { + throw new UnsupportedOperationException("Adding rows is not supported."); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("Clearing rows is not supported."); + } + + @Override + public long size() { + return size; } - } - - @Override - public DataSchema schema() { - return dataSchema; - } - - @Override - public boolean add(DataRow row) { - throw new UnsupportedOperationException("Adding rows is not supported."); - } - - @Override - public void clear() { - throw new UnsupportedOperationException("Clearing rows is not supported."); - } - - @Override - public long size() { - return size; - } - - @Override - public Iterator<DataRow> iterator() { - try { - CsvMapper csvMapper = new CsvMapper(); - parser = csvMapper.readerFor(Map.class) - .with(csvSchema) - .createParser(csvFile); - - Iterator<Map<String, String>> csvIterator = csvMapper.readerFor(Map.class) - .with(csvSchema) - .readValues(parser); - - return new Iterator<>() { - @Override - public boolean hasNext() { - return csvIterator.hasNext(); + + @Override + public Iterator<DataRow> iterator() { + try { + + csvIterator = new CsvMapper() + .readerFor(Map.class) + .with(csvSchema) + .readValues(file); + + return new Iterator<>() { + @Override + public boolean hasNext() { + return csvIterator.hasNext(); + } + + @Override + public DataRow next() { + Map<String, String> csvRow = csvIterator.next(); + DataRow dataRow = dataSchema.createRow(); + + for (int i = 0; i < dataSchema.columns().size(); i++) { + DataColumn column = dataSchema.columns().get(i); + String columnName = column.name(); + String value = csvRow.get(columnName); + + if (value != null) { + Object parsedValue = parseValue(column, value); + dataRow.set(i, parsedValue); + } else { + dataRow.set(i, null); + } + } + return dataRow; + } + }; + + } catch (IOException e) { + throw new DataStoreException("Error reading CSV file", e); } + } - @Override - public DataRow next() { - Map<String, String> csvRow = csvIterator.next(); - DataRow dataRow = dataSchema.createRow(); - - for (int i = 0; i < dataSchema.columns().size(); i++) { - DataColumn column = dataSchema.columns().get(i); - String columnName = column.name(); - String value = csvRow.get(columnName); - - if (value != null) { - Object parsedValue = parseValue(column, value); - dataRow.set(i, parsedValue); - } else { - dataRow.set(i, null); + /** + * Parses the string value from the CSV according to the column type. 
+ * + * @param column the data column + * @param value the string value from the CSV + * @return the parsed value + */ + private Object parseValue(DataColumn column, String value) { + ColumnType type = column.type(); + try { + if (value == null || value.isEmpty()) { + return null; } - } - return dataRow; + return switch (type) { + case STRING -> value; + case INTEGER -> Integer.parseInt(value); + case LONG -> Long.parseLong(value); + case FLOAT -> Float.parseFloat(value); + case DOUBLE -> Double.parseDouble(value); + case BOOLEAN -> Boolean.parseBoolean(value); + case GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, + GEOMETRYCOLLECTION -> { + WKTReader reader = new WKTReader(); + yield reader.read(value); + } + default -> throw new IllegalArgumentException("Unsupported column type: " + type); + }; + } catch (Exception e) { + throw new DataStoreException("Error parsing value for column " + column.name(), e); } - }; + } - } catch (IOException e) { - throw new DataStoreException("Error reading CSV file", e); + @Override + public Spliterator<DataRow> spliterator() { + return Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED); } - } - - /** - * Parses the string value from the CSV according to the column type. - * - * @param column the data column - * @param value the string value from the CSV - * @return the parsed value - */ - private Object parseValue(DataColumn column, String value) { - ColumnType type = column.type(); - try { - if (value == null || value.isEmpty()) { - return null; - } - return switch (type) { - case STRING -> value; - case INTEGER -> Integer.parseInt(value); - case LONG -> Long.parseLong(value); - case FLOAT -> Float.parseFloat(value); - case DOUBLE -> Double.parseDouble(value); - case BOOLEAN -> Boolean.parseBoolean(value); - case GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, - GEOMETRYCOLLECTION -> { - WKTReader reader = new WKTReader(); - yield reader.read(value); - } - default -> throw new IllegalArgumentException("Unsupported column type: " + type); - }; - } catch (Exception e) { - throw new DataStoreException("Error parsing value for column " + column.name(), e); + + @Override + public Stream<DataRow> stream() { + return StreamSupport.stream(spliterator(), false); } - } - - @Override - public Spliterator<DataRow> spliterator() { - return Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED); - } - - @Override - public Stream<DataRow> stream() { - return StreamSupport.stream(spliterator(), false); - } - - @Override - public void close() throws IOException { - if (parser != null) { - parser.close(); + + @Override + public void close() throws IOException { + if (csvIterator != null) { + csvIterator.close(); + } } - } } diff --git a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java index 489634505..71776b724 100644 --- a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java +++ b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableGeonamesTest.java @@ -29,7 +29,7 @@ class CsvDataTableGeonamesTest { @Test void testGeonamesCsvDataTable() throws IOException { - DataTable dataTable = new CsvDataTable(GEONAMES_CSV.toFile(), false); + DataTable dataTable = new CsvDataTable("sample", GEONAMES_CSV.toFile(), false, '\t'); assertEquals(5, dataTable.size(), "DataTable should have 5 rows."); diff --git 
a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java index 444b37996..bb67934df 100644 --- a/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java +++ b/baremaps-csv/src/test/java/org/apache/baremaps/csv/CsvDataTableTest.java @@ -58,7 +58,7 @@ class CsvDataTableTest { 2,PointB,"POINT(2 2)" """; Files.writeString(tempCsvFile.toPath(), csvContent); - DataTable dataTable = new CsvDataTable(tempCsvFile, true); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, ','); assertEquals(2, dataTable.size()); int rowCount = 0; for (DataRow row : dataTable) { @@ -86,8 +86,7 @@ class CsvDataTableTest { new DataColumnFixed("column1", Cardinality.REQUIRED, ColumnType.INTEGER), new DataColumnFixed("column2", Cardinality.OPTIONAL, ColumnType.STRING), new DataColumnFixed("column3", Cardinality.OPTIONAL, ColumnType.GEOMETRY)); - DataSchema schema = new DataSchemaImpl("test_table", columns); - DataTable dataTable = new CsvDataTable(tempCsvFile, false); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, false, ';'); assertEquals(2, dataTable.size()); int rowCount = 0; for (DataRow row : dataTable) { @@ -120,8 +119,7 @@ class CsvDataTableTest { new DataColumnFixed("double_col", Cardinality.REQUIRED, ColumnType.DOUBLE), new DataColumnFixed("bool_col", Cardinality.REQUIRED, ColumnType.BOOLEAN), new DataColumnFixed("string_col", Cardinality.REQUIRED, ColumnType.STRING)); - DataSchema schema = new DataSchemaImpl("test_table", columns); - DataTable dataTable = new CsvDataTable(tempCsvFile, true); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, ','); assertEquals(2, dataTable.size()); int rowCount = 0; for (DataRow row : dataTable) { @@ -150,8 +148,7 @@ class CsvDataTableTest { List<DataColumn> columns = List.of( new DataColumnFixed("id", Cardinality.REQUIRED, ColumnType.INTEGER), new DataColumnFixed("name", Cardinality.OPTIONAL, ColumnType.STRING)); - DataSchema schema = new DataSchemaImpl("test_table", columns); - DataTable dataTable = new CsvDataTable(tempCsvFile, true); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, ','); assertThrows(RuntimeException.class, () -> { for (DataRow row : dataTable) { // This line should throw an exception because abc is not a valid integer @@ -164,7 +161,7 @@ class CsvDataTableTest { void testAddAndClearUnsupportedOperations() throws IOException { String csvContent = ""; Files.writeString(tempCsvFile.toPath(), csvContent); - DataTable dataTable = new CsvDataTable(tempCsvFile, true); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, ','); assertThrows(UnsupportedOperationException.class, () -> dataTable.add(null)); assertThrows(UnsupportedOperationException.class, dataTable::clear); } @@ -178,7 +175,7 @@ class CsvDataTableTest { 3,Name3 """; Files.writeString(tempCsvFile.toPath(), csvContent); - DataTable dataTable = new CsvDataTable(tempCsvFile, true); + DataTable dataTable = new CsvDataTable("test_table", tempCsvFile, true, ','); assertEquals(3, dataTable.size()); } } diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java b/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java index 1a9830b1b..59a13f6e4 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/collection/AppendOnlyLog.java 
@@ -65,8 +65,8 @@ public class AppendOnlyLog<E> implements DataCollection<E> { this.dataType = dataType; this.memory = memory; this.segmentSize = memory.segmentSize(); - this.offset = Long.BYTES; - this.size = memory.segment(0).getLong(0); + this.size = memory.header().getLong(0); + this.offset = 0; } /** @@ -148,6 +148,7 @@ public class AppendOnlyLog<E> implements DataCollection<E> { @Override public void close() throws IOException { + memory.header().putLong(0, size); memory.close(); } @@ -166,7 +167,7 @@ public class AppendOnlyLog<E> implements DataCollection<E> { private AppendOnlyLogIterator(long size) { this.size = size; index = 0; - position = Long.BYTES; + position = 0; } @Override diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java index d0a2580bf..a8f159cc6 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/Memory.java @@ -28,28 +28,42 @@ import java.util.List; /** A base class to manage segments of on-heap, off-heap, or on-disk memory. */ public abstract class Memory<T extends ByteBuffer> implements Closeable { + private final int headerSize; + private final int segmentSize; private final long segmentShift; private final long segmentMask; - protected final List<T> segments = new ArrayList<>(); + protected volatile T header; + + protected List<T> segments = new ArrayList<>(); /** - * Constructs a memory with a given segment size. + * Constructs a memory with a given header size and segment size. * + * @param headerSize the size of the header * @param segmentSize the size of the segments */ - protected Memory(int segmentSize) { + protected Memory(int headerSize, int segmentSize) { if ((segmentSize & -segmentSize) != segmentSize) { throw new IllegalArgumentException("The segment size must be a power of 2"); } + this.headerSize = headerSize; this.segmentSize = segmentSize; this.segmentShift = (long) (Math.log(this.segmentSize) / Math.log(2)); this.segmentMask = this.segmentSize - 1l; } + /** + * Returns the size of the header. + * + * @return the size of the header + */ + public int headerSize() { + return headerSize; + } + /** * Returns the size of the segments. * @@ -77,6 +91,22 @@ public abstract class Memory<T extends ByteBuffer> implements Closeable { return segmentMask; } + /** + * Returns the header of the memory, allocating it lazily on first access. The double-checked + * locking relies on the header field being volatile. + * + * @return the header + */ + public ByteBuffer header() { + if (header == null) { + synchronized (this) { + if (header == null) { + header = allocateHeader(); + } + } + } + return header; + } + + /** + * Allocates a header. + * + * @return the header + */ + protected abstract T allocateHeader(); + /** * Returns a segment of the memory. * @@ -85,47 +115,44 @@ public abstract class Memory<T extends ByteBuffer> implements Closeable { public ByteBuffer segment(int index) { if (segments.size() <= index) { - return allocate(index); + return allocateSegmentInternal(index); } ByteBuffer segment = segments.get(index); if (segment == null) { - return allocate(index); + return allocateSegmentInternal(index); } return segment; } + /** Returns the size of the allocated memory. */ + public long size() { + return (long) segments.size() * (long) segmentSize; + } + /** The allocation of segments is synchronized to enable access by multiple threads.
*/ - private synchronized ByteBuffer allocate(int index) { + private synchronized ByteBuffer allocateSegmentInternal(int index) { while (segments.size() <= index) { segments.add(null); } T segment = segments.get(index); if (segment == null) { - segment = allocate(index, segmentSize); + segment = allocateSegment(index); segments.set(index, segment); } return segment; } - /** Returns the size of the allocated memory. */ - public long size() { - return (long) segments.size() * (long) segmentSize; - } - /** * Allocates a segment for a given index and size. * * @param index the index of the segment - * @param size the size of the segment * @return the segment */ - protected abstract T allocate(int index, int size); + protected abstract T allocateSegment(int index); /** * Clears the memory and the underlying resources. */ public abstract void clear() throws IOException; - - } diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java index 82d2e0af7..1b9a18dfa 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedDirectory.java @@ -53,18 +53,31 @@ public class MemoryMappedDirectory extends Memory<MappedByteBuffer> { * @param segmentBytes the size of the segments in bytes */ public MemoryMappedDirectory(Path directory, int segmentBytes) { - super(segmentBytes); + super(1024, segmentBytes); this.directory = directory; } + @Override + protected MappedByteBuffer allocateHeader() { + try { + Path file = directory.resolve("header"); + try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, + StandardOpenOption.READ, StandardOpenOption.WRITE)) { + return channel.map(MapMode.READ_WRITE, 0, headerSize()); + } + } catch (IOException e) { + throw new MemoryException(e); + } + } + /** {@inheritDoc} */ @Override - protected MappedByteBuffer allocate(int index, int size) { + protected MappedByteBuffer allocateSegment(int index) { try { Path file = directory.resolve(String.format("%s.part", index)); try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - return channel.map(MapMode.READ_WRITE, 0, size); + return channel.map(MapMode.READ_WRITE, 0, segmentSize()); } } catch (IOException e) { throw new MemoryException(e); @@ -74,6 +87,7 @@ public class MemoryMappedDirectory extends Memory<MappedByteBuffer> { /** {@inheritDoc} */ @Override public void close() throws IOException { + if (header != null) { + MappedByteBufferUtils.unmap(header); + } for (MappedByteBuffer buffer : segments) { MappedByteBufferUtils.unmap(buffer); } @@ -83,6 +97,7 @@ public class MemoryMappedDirectory extends Memory<MappedByteBuffer> { @Override public void clear() throws IOException { close(); + header = null; segments.clear(); FileUtils.deleteRecursively(directory); } diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java index 5db613d34..7c2de6b8b 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/MemoryMappedFile.java @@ -49,17 +49,32 @@ public class MemoryMappedFile extends Memory<MappedByteBuffer> { * @param segmentBytes the size of the segments in bytes */ public MemoryMappedFile(Path file,
int segmentBytes) { - super(segmentBytes); + super(1024, segmentBytes); this.file = file; } + @Override + protected MappedByteBuffer allocateHeader() { + try { + try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, + StandardOpenOption.READ, StandardOpenOption.WRITE)) { + return channel.map(MapMode.READ_WRITE, 0, headerSize()); + } + } catch (IOException e) { + throw new MemoryException(e); + } + } + /** {@inheritDoc} */ @Override - protected MappedByteBuffer allocate(int index, int size) { + protected MappedByteBuffer allocateSegment(int index) { try { try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - return channel.map(MapMode.READ_WRITE, (long) index * size, size); + long segmentPosition = headerSize() + (long) index * segmentSize(); + return channel.map(MapMode.READ_WRITE, segmentPosition, segmentSize()); } } catch (IOException e) { throw new MemoryException(e); diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java index f95f03b51..8294b68be 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OffHeapMemory.java @@ -36,13 +36,19 @@ public class OffHeapMemory extends Memory<ByteBuffer> { * @param segmentSize the size of the segments in bytes */ public OffHeapMemory(int segmentSize) { - super(segmentSize); + super(1024, segmentSize); } /** {@inheritDoc} */ @Override - protected ByteBuffer allocate(int index, int size) { - return ByteBuffer.allocateDirect(size); + protected ByteBuffer allocateHeader() { + return ByteBuffer.allocateDirect(headerSize()); + } + + /** {@inheritDoc} */ + @Override + protected ByteBuffer allocateSegment(int index) { + return ByteBuffer.allocateDirect(segmentSize()); } /** {@inheritDoc} */ diff --git a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java index 4b0fff757..5d18bbe40 100644 --- a/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java +++ b/baremaps-data/src/main/java/org/apache/baremaps/data/memory/OnHeapMemory.java @@ -36,13 +36,19 @@ public class OnHeapMemory extends Memory<ByteBuffer> { * @param segmentSize the size of the segments in bytes */ public OnHeapMemory(int segmentSize) { - super(segmentSize); + super(1024, segmentSize); } /** {@inheritDoc} */ @Override - protected ByteBuffer allocate(int index, int size) { - return ByteBuffer.allocate(size); + protected ByteBuffer allocateHeader() { + return ByteBuffer.allocate(headerSize()); + } + + /** {@inheritDoc} */ + @Override + protected ByteBuffer allocateSegment(int index) { + return ByteBuffer.allocate(segmentSize()); } /** {@inheritDoc} */ diff --git a/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java b/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java index aef70f1e7..a2dcfecd8 100644 --- a/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java +++ b/baremaps-store/src/main/java/org/apache/baremaps/store/DataSchema.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.io.Serializable; +import
java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -36,7 +38,7 @@ import org.apache.baremaps.store.DataColumn.ColumnType; /** * A {@link DataSchema} is a description of the structure of a row in a {@link DataTable}. */ -public interface DataSchema extends Serializable { +public interface DataSchema { /** * Returns the name of the schema. @@ -100,14 +102,19 @@ public interface DataSchema extends Serializable { return mapper; } static DataSchema read(Path path) throws IOException { var mapper = configureObjectMapper(); return mapper.readValue(path.toFile(), DataSchema.class); } + static DataSchema read(ByteBuffer buffer) throws IOException { + var mapper = configureObjectMapper(); + int length = buffer.getInt(); + byte[] bytes = new byte[length]; + buffer.get(bytes); + return mapper.readValue(bytes, DataSchema.class); + } static void write(Path path, DataSchema schema) throws IOException { var mapper = configureObjectMapper(); mapper.writerWithDefaultPrettyPrinter().writeValue(path.toFile(), schema); } + static void write(ByteBuffer buffer, DataSchema schema) throws IOException { + var mapper = configureObjectMapper(); + var bytes = mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(schema); + buffer.putInt(bytes.length); + buffer.put(bytes); + } }
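With the Path-based overloads now joined by ByteBuffer-based ones, DataSchema.write emits an int length prefix followed by the JSON bytes, and DataSchema.read consumes the same layout, which lines up with the new memory header as a place to stash a schema. A minimal round-trip sketch, assuming a throwaway 1024-byte buffer and a single illustrative column:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.baremaps.store.*;
    import org.apache.baremaps.store.DataColumn.Cardinality;
    import org.apache.baremaps.store.DataColumn.ColumnType;

    class SchemaRoundTrip {
      static DataSchema roundTrip() throws IOException {
        DataSchema schema = new DataSchemaImpl("demo", List.of(
            new DataColumnFixed("id", Cardinality.REQUIRED, ColumnType.INTEGER)));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        DataSchema.write(buffer, schema); // int length prefix + JSON bytes
        buffer.flip();                    // rewind before reading back
        return DataSchema.read(buffer);   // reads the prefix, then deserializes
      }
    }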
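The practical effect on AppendOnlyLog is that a log becomes reopenable: the constructor restores the element count from memory.header().getLong(0), and close() writes it back, instead of reserving the first Long.BYTES of segment zero. A sketch of that lifecycle, reusing only types that appear in this commit and assuming the add and size methods of DataCollection, which the tests above exercise through DataTableImpl (directory and row values are illustrative):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import org.apache.baremaps.data.collection.AppendOnlyLog;
    import org.apache.baremaps.data.memory.MemoryMappedDirectory;
    import org.apache.baremaps.data.type.DataTypeImpl;
    import org.apache.baremaps.store.*;
    import org.apache.baremaps.store.DataColumn.Cardinality;
    import org.apache.baremaps.store.DataColumn.ColumnType;

    class ReopenableLog {
      public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("log");
        DataSchema schema = new DataSchemaImpl("demo", List.of(
            new DataColumnFixed("id", Cardinality.REQUIRED, ColumnType.INTEGER)));

        // Write side: close() persists the size into the header file.
        AppendOnlyLog<DataRow> log =
            new AppendOnlyLog<>(new DataTypeImpl(schema), new MemoryMappedDirectory(dir));
        log.add(new DataRowImpl(schema, List.of(1)));
        log.close();

        // Read side: the constructor reads the size back from the header.
        AppendOnlyLog<DataRow> reopened =
            new AppendOnlyLog<>(new DataTypeImpl(schema), new MemoryMappedDirectory(dir));
        System.out.println(reopened.size()); // expected: 1
      }
    }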

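One layout detail is easy to miss: the two memory-mapped implementations place the header differently. MemoryMappedDirectory maps it from a dedicated "header" file alongside the "%s.part" segment files, so each segment still starts at offset 0 of its own file, whereas MemoryMappedFile keeps everything in one file and shifts every segment by headerSize(). A small sketch of that offset arithmetic, with an illustrative segment size (the long cast matters for large indices):

    class SegmentOffsets {
      // Mirrors MemoryMappedFile.allocateSegment: header first, segments after.
      static long segmentPosition(int index, int headerSize, int segmentSize) {
        return headerSize + (long) index * segmentSize;
      }

      public static void main(String[] args) {
        int headerSize = 1024;     // the default used throughout this commit
        int segmentSize = 1 << 20; // illustrative; must be a power of 2
        System.out.println(segmentPosition(0, headerSize, segmentSize)); // 1024
        System.out.println(segmentPosition(1, headerSize, segmentSize)); // 1049600
      }
    }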