DaZuiZui commented on issue #843:
URL: https://github.com/apache/tsfile/issues/843#issuecomment-4714317540

   
   ## 1. Goal
   
   Add a Spark connector for the TsFile table model. The proposed format name 
is `tsfile`, so users can read and write table-model TsFile data through Spark 
DataFrame and Spark SQL APIs.
   
   For the first implementation, the connector will explicitly target:
   
   - **Spark 3.x DataSource V2 only**.
   - Spark V2 APIs such as `TableProvider`, `ScanBuilder`, and `BatchWrite`.
   - No Spark 2.4 DataSource V1 compatibility.
   - The old `iotdb-extras spark-tsfile` module may be used as a reference for 
user experience and tests, but its code should not be copied directly.
   
   The connector should reuse existing TsFile Java APIs as much as possible:
   
   - Read path: `DeviceTableModelReader`, 
`TsFileSequenceReader#getTableSchemaMap`, `TableQueryExecutor`, etc.
   - Write path: `TsFileWriter#registerTableSchema`, `TsFileWriter#writeTable`.
   - The Spark connector should not reimplement TsFile binary parsing, metadata 
parsing, page/chunk decoding, or writing logic.
   
   ## 2. Feature Definition
   
   One Spark row maps to one TsFile table-model row:
   
   ```text
   row := (time, tag_1, tag_2, ..., tag_n, field_1, ..., field_m)
   ```
   
   | Component          | Meaning |
   | ------------------ | ------- |
   | `table`            | Logical table name in the TsFile table model. It corresponds to `TableSchema#getTableName()` and is specified through an option. By default, it is not exposed as a DataFrame column. |
   | `time`             | Time column. It corresponds to the TsFile timestamp and is not a normal `TableSchema` measurement column. The default DataFrame column name is `time`. |
   | `TAG` column       | Device identity column. It corresponds to `ColumnCategory.TAG`. Multiple TAG columns form the device identity in order. The first implementation only supports non-null `StringType` TAG columns. |
   | `FIELD` column     | Measurement/value column. It corresponds to `ColumnCategory.FIELD`. Sparse values and nulls are allowed. |
   | `ATTRIBUTE` column | `ColumnCategory.ATTRIBUTE`. Not supported in the first implementation; the connector should fail with a clear error if encountered. |
   | `TIME` category    | `ColumnCategory.TIME`. Not used as the DataFrame time column in the first implementation; the connector should fail with a clear error if encountered. |
   
   In the current TsFile table model, table and column names are normalized to 
lower case. Therefore, schema lookup and compatibility checks inside the 
connector should use lower-case names.
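
As a rough illustration of the lower-case rule, the sketch below normalizes names and reports duplicates that only show up after normalization. `NameNormalizer` and its methods are hypothetical names chosen for this example; they are not part of TsFile or Spark.

```scala
import java.util.Locale

// Hypothetical helper; not part of TsFile or Spark.
object NameNormalizer {
  // Locale.ROOT keeps the result independent of the JVM default locale.
  def normalize(name: String): String = name.toLowerCase(Locale.ROOT)

  // Returns the normalized names that occur more than once, sorted.
  def duplicatesAfterNormalization(names: Seq[String]): Seq[String] =
    names.map(normalize)
      .groupBy(identity)
      .collect { case (n, occurrences) if occurrences.size > 1 => n }
      .toSeq
      .sorted
}
```

A DataFrame with both `City` and `city` columns would be flagged by `duplicatesAfterNormalization`, which matches the duplicate-name case listed in the test plan.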
   
   ## 3. Initial Scope
   
   The first implementation supports:
   
   - Reading a single TsFile file.
   - Reading a TsFile directory or glob path.
   - Reading multiple TsFiles with compatible schemas.
   - Inferring Spark schema from TsFile metadata.
   - Selecting a specific table.
   - Column pruning.
   - Predicate pushdown:
     - `time` range filters.
     - `time = c`.
     - TAG equality filters, for example `city = 'beijing'`.
     - `AND` combinations of the supported predicates above.
   - Returning null values correctly for sparse FIELD columns.
   - Writing table-model TsFile data from a Spark DataFrame.
   - Configuring table name, time column, TAG columns, FIELD columns, encoding, 
and compression.
   - Providing Scala DataFrame and Spark SQL examples.
   
   The first implementation does not support:
   
   - Tree-model Spark connector behavior.
   - Spark 2.4 DataSource V1.
   - Spark structured streaming.
   - Update/delete/merge-on-write.
   - Reading multiple tables into one DataFrame.
   - FIELD value predicate pushdown.
   - `OR` predicate pushdown.
   - Non-string TAG values or null TAG values.
   - `ATTRIBUTE`, `TIME`, `VECTOR`, `UNKNOWN`, or arbitrary `OBJECT` values.
   
   ## 4. User Syntax
   
   ### DataFrame Read
   
   ```scala
   val df = spark.read
     .format("tsfile")
     .option("model", "table")
     .option("table", "weather")
     .load("/data/weather-tsfile")
   
   df.select("time", "city", "device", "temperature")
     .where("time >= 1704067200000 and city = 'beijing'")
     .show()
   ```
   
   ### Spark SQL Read
   
   This is Spark SQL over the Spark connector. TsFile does not need to 
implement its own SQL parser or SQL engine.
   
   ```sql
   CREATE TEMPORARY VIEW weather
   USING tsfile
   OPTIONS (
     path '/data/weather-tsfile',
     model 'table',
     table 'weather'
   );
   
   SELECT time, city, device, temperature
   FROM weather
   WHERE city = 'beijing';
   ```
   
   ### DataFrame Write
   
   ```scala
   df.write
     .format("tsfile")
     .option("model", "table")
     .option("table", "weather")
     .option("timeColumn", "time")
     .option("tagColumns", "city,device")
     .option("fieldColumns", "temperature,humidity,status")
     .option("compression", "LZ4")
     .mode("append")
     .save("/data/weather-tsfile")
   ```
   
   ## 5. Main Options
   
   | Option               | Meaning |
   | -------------------- | ------- |
   | `model`              | Fixed to `table`. |
   | `table`              | Table name. Required for writing; required when reading files containing multiple tables. |
   | `timeColumn`         | DataFrame column used as the timestamp column. Default is `time`. |
   | `tagColumns`         | Required for writing. Comma-separated list. The order defines the device identity. |
   | `fieldColumns`       | Optional for writing. By default, all non-time and non-TAG columns are FIELD columns. |
   | `timestampAs`        | Read the time column as `long` or `timestamp`. Default is `long`. |
   | `timestampPrecision` | `ms`, `us`, or `ns`. Used when converting between Spark `TimestampType` and raw TsFile timestamps. |
   | `mergeSchema`        | Whether to merge FIELD schemas when reading multiple files. Default is false. |
   | `pushdown`           | Whether to enable predicate pushdown. Default is true. |
   | `compression`        | Compression type for writing, for example `LZ4`, `SNAPPY`, `GZIP`, or `ZSTD`. |
   | `encoding`           | Encoding type for writing. It must be validated against TsFile encoding/type compatibility rules. |
   | `nullTagPolicy`      | The first implementation only supports `error`, meaning null TAG values fail immediately. |
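
A minimal sketch of how write-side option validation could look, using the option names from the table above. The `WriteOptions` class is hypothetical, and two details are assumptions rather than part of the proposal: an omitted `model` is treated as `table`, and `timestampPrecision` defaults to `ms`.

```scala
// Hypothetical sketch of write-option validation; not an existing API.
final case class WriteOptions(
    table: String,
    timeColumn: String,
    tagColumns: Seq[String],
    timestampPrecision: String)

object WriteOptions {
  private val Precisions = Set("ms", "us", "ns")

  def parse(raw: Map[String, String]): Either[String, WriteOptions] = {
    // Keys are compared lower-cased, since Spark passes options case-insensitively.
    val opts = raw.map { case (k, v) => k.toLowerCase -> v.trim }
    val model = opts.getOrElse("model", "table") // assumption: default when omitted
    val precision = opts.getOrElse("timestampprecision", "ms") // assumption: default
    val tags = opts.get("tagcolumns").toSeq
      .flatMap(_.split(",")).map(_.trim.toLowerCase).filter(_.nonEmpty)

    if (model != "table") Left(s"unsupported model: $model")
    else if (!opts.contains("table")) Left("option 'table' is required for writing")
    else if (tags.isEmpty) Left("option 'tagColumns' is required for writing")
    else if (tags.distinct.size != tags.size) Left("duplicate TAG column after normalization")
    else if (!Precisions(precision)) Left(s"invalid timestampPrecision: $precision")
    else Right(WriteOptions(
      opts("table").toLowerCase,
      opts.getOrElse("timecolumn", "time").toLowerCase,
      tags,
      precision))
  }
}
```

Returning `Either` keeps every failure as a plain message, so a real implementation could wrap it in whatever exception type the connector settles on.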
   
   ## 6. Data Type Support
   
   | TsFile type | Spark type                    | Support |
   | ----------- | ----------------------------- | ------- |
   | `BOOLEAN`   | `BooleanType`                 | FIELD supported |
   | `INT32`     | `IntegerType`                 | FIELD supported |
   | `INT64`     | `LongType`                    | FIELD supported |
   | `FLOAT`     | `FloatType`                   | FIELD supported |
   | `DOUBLE`    | `DoubleType`                  | FIELD supported |
   | `TEXT`      | `StringType`                  | FIELD supported |
   | `STRING`    | `StringType`                  | FIELD/TAG supported; TAG must use this type |
   | `DATE`      | `DateType` or `IntegerType`   | FIELD supported |
   | `TIMESTAMP` | `TimestampType` or `LongType` | FIELD supported |
   | `BLOB`      | `BinaryType`                  | FIELD supported |
   | `VECTOR`    | none                          | Not supported |
   | `UNKNOWN`   | none                          | Not supported |
   | `OBJECT`    | TBD                           | Not supported in the first implementation |
   
   TAG columns are strictly limited to non-null `StringType` in the first 
implementation. The reason is that the current table model builds device 
identity from the table name and TAG values. `Tablet#getDeviceID` generates 
device identifiers from TAG values, and the TAG filter API also uses 
string-based semantics. To avoid unstable identity and inconsistent filter 
semantics, the first implementation rejects non-string TAG values and null TAG 
values.
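
The mapping and the TAG restriction can be sketched as a lookup that fails with a message for anything unsupported. To stay runnable without Spark or TsFile on the classpath, this sketch uses plain strings for type and category names. `TypeMapping` is a hypothetical name, and mapping `DATE` to `DateType` and letting a flag choose the `TIMESTAMP` mapping are assumptions, since the proposal leaves both open.

```scala
// Hypothetical mapping helper. Type names are plain strings so the sketch
// runs without Spark or TsFile on the classpath.
object TypeMapping {
  private val fieldTypes = Map(
    "BOOLEAN" -> "BooleanType",
    "INT32" -> "IntegerType",
    "INT64" -> "LongType",
    "FLOAT" -> "FloatType",
    "DOUBLE" -> "DoubleType",
    "TEXT" -> "StringType",
    "STRING" -> "StringType",
    "DATE" -> "DateType", // assumption: DateType rather than IntegerType
    "BLOB" -> "BinaryType")

  def sparkType(tsType: String, category: String,
                timestampAsLong: Boolean = true): Either[String, String] =
    category match {
      case "TAG" if tsType == "STRING" => Right("StringType")
      case "TAG" => Left(s"TAG column must be STRING, found $tsType")
      case "FIELD" if tsType == "TIMESTAMP" =>
        Right(if (timestampAsLong) "LongType" else "TimestampType")
      case "FIELD" =>
        fieldTypes.get(tsType).toRight(s"unsupported TsFile type: $tsType")
      case other => Left(s"unsupported column category: $other")
    }
}
```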
   
   ## 7. Read Semantics
   
   When reading, the connector should:
   
   - Support a single file, a directory, or a glob path.
   - Use `TsFileSequenceReader#getTableSchemaMap` to read table metadata.
   - If a file contains only one table, infer the table automatically.
   - If a file contains multiple tables, require the user to specify 
`option("table", "...")`.
   - By default, require strict schema compatibility across multiple files: 
table name, column name, `ColumnCategory`, and `TSDataType` must match.
   - When `mergeSchema=true`, only allow FIELD column union. TAG columns must 
still have the same order and type.
   - Fail clearly when encountering `ATTRIBUTE`, `TIME`, non-string TAG 
columns, or unsupported types.
   - Return Spark SQL null for missing FIELD values.
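
The multi-file compatibility rules above could be checked pairwise, roughly as follows. `Col` and `SchemaMerge` are hypothetical stand-ins for the real TsFile schema classes, and treating column order as irrelevant for FIELD columns in strict mode is an assumption.

```scala
// Hypothetical stand-in for one column of a TsFile table schema.
final case class Col(name: String, category: String, dataType: String)

object SchemaMerge {
  // Returns the combined column list, or an error message on incompatibility.
  def merge(a: Seq[Col], b: Seq[Col], mergeSchema: Boolean): Either[String, Seq[Col]] = {
    val tagsA = a.filter(_.category == "TAG")
    val tagsB = b.filter(_.category == "TAG")
    if (tagsA != tagsB) {
      // TAG columns must match in name, order, and type in both modes.
      Left("TAG columns differ in name, order, or type")
    } else if (!mergeSchema) {
      if (a.toSet == b.toSet) Right(a)
      else Left("schemas differ and mergeSchema=false")
    } else {
      val known = a.map(c => c.name -> c).toMap
      // No widening: a same-name column with a different definition fails fast.
      b.find(c => known.get(c.name).exists(_ != c)) match {
        case Some(c) => Left(s"column '${c.name}' has conflicting definitions")
        case None => Right(a ++ b.filterNot(c => known.contains(c.name)))
      }
    }
  }
}
```

Folding this function over the schemas of all input files would yield either one merged schema or the first incompatibility.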
   
   ### Column Pruning Risk
   
   The current table reader may depend on at least one FIELD/measurement column 
when generating rows. Therefore, a query such as:
   
   ```scala
   df.select("time", "city")
   ```
   
   may require special handling:
   
   - Either improve the reader to support time/TAG-only projection.
   - Or let the connector internally read one minimal FIELD column and drop it 
before returning Spark rows.
   
   This should be covered by an integration test.
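
The second option (read a helper FIELD column and drop it) might be planned like this. `ProjectionPlan` is a hypothetical name, and picking the first available FIELD column as the "minimal" one is an assumption; a real implementation might prefer the cheapest type to decode.

```scala
// Hypothetical projection planner for the time/TAG-only case.
object ProjectionPlan {
  // requested: columns Spark asked for; fields: FIELD columns of the table.
  // Returns (columns to read from TsFile, helper columns to drop afterwards).
  def plan(requested: Seq[String], fields: Seq[String]): (Seq[String], Seq[String]) =
    if (fields.isEmpty || requested.exists(fields.contains)) {
      // Nothing to add: a FIELD column is already requested, or none exists.
      (requested, Seq.empty)
    } else {
      // Assumption: the first FIELD column serves as the helper column.
      (requested :+ fields.head, Seq(fields.head))
    }
}
```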
   
   ## 8. Predicate Pushdown
   
   The first implementation guarantees the following pushdown support:
   
   | Spark predicate                            | TsFile pushdown     |
   | ------------------------------------------ | ------------------- |
   | `time = c`                                 | time filter         |
   | `time > c`, `time >= c`                    | time filter         |
   | `time < c`, `time <= c`                    | time filter         |
   | `time BETWEEN a AND b`                     | time filter         |
   | `tag = 'x'`                                | TAG equality filter |
   | `AND` combinations of supported predicates | combined pushdown   |
   
   The following predicates remain Spark residual filters:
   
   - `OR`.
   - Complex functions.
   - FIELD value predicates.
   - TAG `!=`, `IN`, range comparison, and null check.
   - Non-string TAG comparisons.
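
The split between pushed and residual predicates can be sketched with a small filter ADT. The case classes below are local stand-ins that mirror the shape of Spark's `org.apache.spark.sql.sources` filters (`EqualTo`, `GreaterThan`, `And`, and so on), so the example runs without Spark; `Pushdown` is a hypothetical name. `BETWEEN` has no case of its own because Spark usually hands it to a source as a `>=` and `<=` pair.

```scala
// Local stand-ins mirroring the shape of Spark's source filters.
sealed trait Pred
final case class EqualTo(col: String, value: Any) extends Pred
final case class GreaterThan(col: String, value: Any) extends Pred
final case class GreaterThanOrEqual(col: String, value: Any) extends Pred
final case class LessThan(col: String, value: Any) extends Pred
final case class LessThanOrEqual(col: String, value: Any) extends Pred
final case class And(left: Pred, right: Pred) extends Pred
final case class Or(left: Pred, right: Pred) extends Pred

object Pushdown {
  // Flattens nested ANDs into a flat list of conjuncts.
  def conjuncts(p: Pred): Seq[Pred] = p match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other => Seq(other)
  }

  // Only time comparisons on Long values and TAG equality on strings qualify.
  def supported(p: Pred, timeCol: String, tags: Set[String]): Boolean = p match {
    case EqualTo(c, _: Long) if c == timeCol => true
    case GreaterThan(c, _: Long) if c == timeCol => true
    case GreaterThanOrEqual(c, _: Long) if c == timeCol => true
    case LessThan(c, _: Long) if c == timeCol => true
    case LessThanOrEqual(c, _: Long) if c == timeCol => true
    case EqualTo(c, _: String) => tags(c)
    case _ => false
  }

  // Returns (predicates to push into TsFile, predicates left for Spark).
  def split(p: Pred, timeCol: String, tags: Set[String]): (Seq[Pred], Seq[Pred]) =
    conjuncts(p).partition(supported(_, timeCol, tags))
}
```

An `OR` at the top level ends up entirely in the residual list, which is the conservative behavior described above.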
   
   ## 9. Write Semantics
   
   Spark writes must follow these rules:
   
   - Each Spark task/partition writes an independent `part-*.tsfile` file.
   - Output files are committed through Spark's commit protocol.
   - Multiple tasks must not append concurrently to the same physical TsFile.
   - `append` means appending new TsFile files to the target directory, not 
appending data into existing TsFile files.
   
   Write flow:
   
   1. Call `TsFileWriter#registerTableSchema` before any rows are written.
   2. Read `timeColumn`.
   3. If it is Spark `TimestampType`, convert it to a raw TsFile long using 
`timestampPrecision`. If it is `LongType`, use it directly.
   4. Read TAG columns in the configured order, rejecting non-string or null 
TAG values immediately.
   5. Build a `Tablet` containing TAG and FIELD columns.
   6. Write null FIELD values as null/sparse values.
   7. Call `TsFileWriter#writeTable` when `maxRowsPerTablet` is reached or when 
the task finishes.
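
The timestamp conversion in the write flow above can be sketched as follows. Spark represents `TimestampType` values internally as microseconds since the Unix epoch, so the conversion is a scale change. `TimestampConversion` is a hypothetical name, and using floor division when precision is lost is an assumption about the desired rounding.

```scala
// Hypothetical conversion helper between Spark microseconds and raw TsFile longs.
object TimestampConversion {
  // Write direction: Spark internal microseconds -> raw TsFile timestamp.
  def microsToRaw(micros: Long, precision: String): Long = precision match {
    // floorDiv rounds toward negative infinity, so pre-1970 values stay ordered.
    case "ms" => java.lang.Math.floorDiv(micros, 1000L)
    case "us" => micros
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    case "ns" => java.lang.Math.multiplyExact(micros, 1000L)
    case other =>
      throw new IllegalArgumentException(s"invalid timestampPrecision: $other")
  }

  // Read direction: raw TsFile timestamp -> Spark internal microseconds.
  def rawToMicros(raw: Long, precision: String): Long = precision match {
    case "ms" => java.lang.Math.multiplyExact(raw, 1000L)
    case "us" => raw
    case "ns" => java.lang.Math.floorDiv(raw, 1000L)
    case other =>
      throw new IllegalArgumentException(s"invalid timestampPrecision: $other")
  }
}
```

A round trip is lossless only when the TsFile precision is at least as fine as microseconds, or when the original value had no sub-precision digits. Round-trip tests should account for that.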
   
   ## 10. Module and Components
   
   A new module is proposed:
   
   ```text
   java/spark-tsfile
   ```
   
   It should be registered under the Java aggregator in `java/pom.xml`.
   
   Dependency principles:
   
   - Spark/Hadoop dependencies should only live in the `java/spark-tsfile` 
submodule.
   - Mark Spark/Hadoop dependencies as `provided` where possible.
   - Avoid polluting the core TsFile library.
   - Avoid dependency convergence or duplicate-class issues in the root POM.
   
   Main components:
   
   | Component                     | Responsibility |
   | ----------------------------- | -------------- |
   | `TsFileTableProvider`         | Spark DataSource V2 entry point. Implements `TableProvider` and registers the short name `tsfile`. |
   | `TsFileTableOptions`          | Parses and validates connector options. |
   | `TsFileTableSchemaInferer`    | Infers Spark `StructType` from TsFile metadata. |
   | `TsFileTableTypeConverter`    | Converts between TsFile `TSDataType` and Spark SQL types. |
   | `TsFileTableScanBuilder`      | Spark V2 `ScanBuilder`; handles column pruning and pushdown. |
   | `TsFileTablePartitionReader`  | Reads TsFile data and converts it to Spark rows. |
   | `TsFileTableFilterTranslator` | Converts Spark filters to TsFile time/TAG filters and records residual filters. |
   | `TsFileTableWriteBuilder`     | Spark V2 write builder; validates write schema. |
   | `TsFileTableBatchWrite`       | Spark V2 `BatchWrite`; coordinates task writers and commit. |
   | `TsFileTablePartitionWriter`  | Converts Spark rows to `Tablet` and writes independent `part-*.tsfile` files. |
   
   ## 11. Implementation Phases
   
   1. **Read path**
      - Add the `java/spark-tsfile` module.
      - Add Spark 3.x V2 dependencies. Keep Spark/Hadoop dependencies 
`provided` where possible.
      - Implement `TableProvider`.
      - Implement option parsing.
      - Implement single-file, single-table schema inference.
      - Implement basic DataFrame read.
      - Reject unsupported schemas, such as non-string TAG columns, 
`ATTRIBUTE`, and `TIME`.
   
   2. **Directory and multi-file read**
      - Support directories and glob paths.
      - Validate schema compatibility across multiple files.
      - Support strict `mergeSchema=false` mode.
      - Add tests for compatible and incompatible schemas.
   
   3. **Projection and pushdown**
      - Implement column pruning.
      - Handle the time/TAG-only projection risk.
      - Implement time range, `time = c`, and TAG equality pushdown.
      - Implement `AND` combinations of supported predicates.
      - Keep unsupported filters as residual filters.
   
   4. **Write path**
      - Implement DataFrame write.
      - Build and register `TableSchema`.
      - Implement Spark V2 `BatchWrite`.
      - Make each task write independent `part-*.tsfile` files.
      - Support FIELD null values.
      - Reject non-string TAG values and null TAG values.
      - Add Spark DataFrame -> TsFile -> Spark DataFrame round-trip tests.
   
   5. **Examples and documentation**
      - Add Scala DataFrame read/write examples.
      - Add Spark SQL read/CTAS examples.
      - Document options and type mapping.
      - Add a small table-model TsFile test fixture.
   
   ## 12. Test Plan
   
   The tests should cover:
   
   - Option parsing:
     - Missing `table` fails on write and when reading a multi-table file.
     - Missing `tagColumns` fails on write.
     - Duplicate column names after lower-case normalization fail.
     - Invalid `timestampPrecision` fails.
     - Non-string TAG columns fail.
   - Type mapping:
     - Supported types map correctly.
     - Unsupported types fail with clear errors.
     - `ATTRIBUTE` / `TIME` categories fail with clear errors.
   - Filter translation:
     - Time comparisons.
     - TAG equality.
     - `AND` combinations of supported predicates.
     - Unsupported predicates remain residual filters.
   - Integration tests:
     - Read a single file with a single table.
     - Read a multi-table file with a specified table.
     - Read multiple files from a directory.
     - Sparse FIELD values return null.
     - Selected columns only.
     - Time/TAG-only projection.
     - Pushdown correctness.
     - Each Spark task writes an independent `part-*.tsfile`.
     - Round-trip write/read correctness.
   
   ## 13. Initial Implementation Decisions
   
   The first implementation uses conservative defaults:
   
   - Timestamp values are exposed as raw `LongType` by default. Spark 
`TimestampType` is only used when users explicitly set `timestampAs=timestamp`.
   - `mergeSchema=true` only unions compatible FIELD columns. It does not 
perform numeric widening; same-name columns with different TsFile types fail 
fast.
   - The first implementation only supports global `encoding` and `compression` 
options. Per-column encoding/compression can be added later if needed.
   
   Feedback on Spark version compatibility, connector API semantics, and 
implementation scope is welcome.
   

