DaZuiZui commented on issue #843:
URL: https://github.com/apache/tsfile/issues/843#issuecomment-4714317540
## 1. Goal
Add a Spark connector for the TsFile table model. The proposed format name
is `tsfile`, so users can read and write table-model TsFile data through Spark
DataFrame and Spark SQL APIs.
For the first implementation, the connector will explicitly target:
- **Spark 3.x DataSource V2 only**.
- Spark V2 APIs such as `TableProvider`, `ScanBuilder`, and `BatchWrite`.
- No Spark 2.4 DataSource V1 compatibility.
- The old `iotdb-extras spark-tsfile` module may be used as a reference for
user experience and tests, but its code should not be copied directly.
The connector should reuse existing TsFile Java APIs as much as possible:
- Read path: `DeviceTableModelReader`,
`TsFileSequenceReader#getTableSchemaMap`, `TableQueryExecutor`, etc.
- Write path: `TsFileWriter#registerTableSchema`, `TsFileWriter#writeTable`.
- The Spark connector should not reimplement TsFile binary parsing, metadata
parsing, page/chunk decoding, or writing logic.
## 2. Feature Definition
One Spark row maps to one TsFile table-model row:
```text
row := (time, tag_1, tag_2, ..., tag_n, field_1, ..., field_m)
```
| Component | Meaning |
| ------------------ | ------------------------------------------------------------ |
| `table` | Logical table name in the TsFile table model. It corresponds to `TableSchema#getTableName()` and is specified through an option. By default, it is not exposed as a DataFrame column. |
| `time` | Time column. It corresponds to the TsFile timestamp and is not a normal `TableSchema` measurement column. The default DataFrame column name is `time`. |
| `TAG` column | Device identity column. It corresponds to `ColumnCategory.TAG`. Multiple TAG columns form the device identity in order. The first implementation only supports non-null `StringType` TAG columns. |
| `FIELD` column | Measurement/value column. It corresponds to `ColumnCategory.FIELD`. Sparse values and nulls are allowed. |
| `ATTRIBUTE` column | `ColumnCategory.ATTRIBUTE`. Not supported in the first implementation; the connector should fail with a clear error if encountered. |
| `TIME` category | `ColumnCategory.TIME`. Not used as the DataFrame time column in the first implementation; the connector should fail with a clear error if encountered. |
In the current TsFile table model, table and column names are normalized to
lower case. Therefore, schema lookup and compatibility checks inside the
connector should use lower-case names.
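A minimal sketch of the name handling described above, assuming that lower-casing with `Locale.ROOT` matches TsFile's normalization (that equivalence is an assumption). `ColumnNameNormalizer` is a hypothetical helper, not an existing TsFile or Spark class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration only.
final class ColumnNameNormalizer {
  private ColumnNameNormalizer() {}

  // Lower-case with a fixed locale so the result does not depend on the JVM default locale.
  static String normalize(String name) {
    return name.toLowerCase(Locale.ROOT);
  }

  // Normalizes every name and fails fast if two names collide after normalization.
  static List<String> normalizeAll(List<String> names) {
    Set<String> seen = new HashSet<>();
    List<String> result = new ArrayList<>();
    for (String name : names) {
      String lower = normalize(name);
      if (!seen.add(lower)) {
        throw new IllegalArgumentException(
            "Duplicate column name after lower-case normalization: " + lower);
      }
      result.add(lower);
    }
    return result;
  }

  public static void main(String[] args) {
    // prints [time, city, temperature]
    System.out.println(normalizeAll(Arrays.asList("Time", "City", "Temperature")));
  }
}
```

Failing on collisions such as `City` and `CITY` keeps the DataFrame schema and the TsFile schema in one-to-one correspondence.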
## 3. Initial Scope
The first implementation supports:
- Reading a single TsFile file.
- Reading a TsFile directory or glob path.
- Reading multiple TsFiles with compatible schemas.
- Inferring Spark schema from TsFile metadata.
- Selecting a specific table.
- Column pruning.
- Predicate pushdown:
  - `time` range filters.
  - `time = c`.
  - TAG equality filters, for example `city = 'beijing'`.
  - `AND` combinations of the supported predicates above.
- Returning null values correctly for sparse FIELD columns.
- Writing table-model TsFile data from a Spark DataFrame.
- Configuring table name, time column, TAG columns, FIELD columns, encoding,
and compression.
- Providing Scala DataFrame and Spark SQL examples.
The first implementation does not support:
- Tree-model Spark connector behavior.
- Spark 2.4 DataSource V1.
- Spark structured streaming.
- Update/delete/merge-on-write.
- Reading multiple tables into one DataFrame.
- FIELD value predicate pushdown.
- Complex `OR` predicate pushdown.
- Non-string TAG values or null TAG values.
- `ATTRIBUTE` or `TIME` column categories, or `VECTOR`, `UNKNOWN`, or arbitrary `OBJECT` values.
## 4. User Syntax
### DataFrame Read
```scala
// Load one table from a TsFile directory.
val df = spark.read
  .format("tsfile")
  .option("model", "table")
  .option("table", "weather")
  .load("/data/weather-tsfile")

// The time and TAG predicates below are candidates for pushdown.
df.select("time", "city", "device", "temperature")
  .where("time >= 1704067200000 and city = 'beijing'")
  .show()
```
### Spark SQL Read
This is Spark SQL over the Spark connector. TsFile does not need to
implement its own SQL parser or SQL engine.
```sql
CREATE TEMPORARY VIEW weather
USING tsfile
OPTIONS (
  path '/data/weather-tsfile',
  model 'table',
  table 'weather'
);

SELECT time, city, device, temperature
FROM weather
WHERE city = 'beijing';
```
### DataFrame Write
```scala
df.write
  .format("tsfile")
  .option("model", "table")
  .option("table", "weather")
  .option("timeColumn", "time")
  .option("tagColumns", "city,device")
  .option("fieldColumns", "temperature,humidity,status")
  .option("compression", "LZ4")
  .mode("append")
  .save("/data/weather-tsfile")
```
## 5. Main Options
| Option | Meaning |
| -------------------- | ------------------------------------------------------------ |
| `model` | Fixed to `table`. |
| `table` | Table name. Required for writing; required when reading files containing multiple tables. |
| `timeColumn` | DataFrame column used as the timestamp column. Default is `time`. |
| `tagColumns` | Required for writing. Comma-separated list. The order defines the device identity. |
| `fieldColumns` | Optional for writing. By default, all non-time and non-TAG columns are FIELD columns. |
| `timestampAs` | Read the time column as `long` or `timestamp`. Default is `long`. |
| `timestampPrecision` | `ms`, `us`, or `ns`. Used when converting between Spark `TimestampType` and raw TsFile timestamps. |
| `mergeSchema` | Whether to merge FIELD schemas when reading multiple files. Default is false. |
| `pushdown` | Whether to enable predicate pushdown. Default is true. |
| `compression` | Compression type for writing, for example `LZ4`, `SNAPPY`, `GZIP`, or `ZSTD`. |
| `encoding` | Encoding type for writing. It must be validated against TsFile encoding/type compatibility rules. |
| `nullTagPolicy` | The first implementation only supports `error`, meaning null TAG values fail immediately. |
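As an illustration of how the write-side options might be validated, here is a sketch under stated assumptions. `WriteOptionsSketch` is a hypothetical class, the `ms` default for `timestampPrecision` is assumed (the proposal does not state a default), and treating `model` as optional is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of write-option validation; not an existing TsFile or Spark class.
final class WriteOptionsSketch {
  final String table;
  final String timeColumn;
  final List<String> tagColumns;
  final String timestampPrecision;

  private WriteOptionsSketch(
      String table, String timeColumn, List<String> tagColumns, String timestampPrecision) {
    this.table = table;
    this.timeColumn = timeColumn;
    this.tagColumns = tagColumns;
    this.timestampPrecision = timestampPrecision;
  }

  static WriteOptionsSketch parse(Map<String, String> raw) {
    // "model" is fixed to "table"; defaulting it when absent is an assumption.
    String model = raw.getOrDefault("model", "table");
    if (!"table".equals(model)) {
      throw new IllegalArgumentException("Unsupported model: " + model);
    }
    String table = required(raw, "table").toLowerCase(Locale.ROOT);
    String timeColumn = raw.getOrDefault("timeColumn", "time").toLowerCase(Locale.ROOT);

    // TAG order is significant, so keep the configured order and reject duplicates.
    List<String> tagColumns = new ArrayList<>();
    for (String part : required(raw, "tagColumns").split(",")) {
      String name = part.trim().toLowerCase(Locale.ROOT);
      if (name.isEmpty() || tagColumns.contains(name) || name.equals(timeColumn)) {
        throw new IllegalArgumentException("Invalid tagColumns entry: '" + part + "'");
      }
      tagColumns.add(name);
    }

    // Assumed default: the proposal does not say which precision applies when unset.
    String precision = raw.getOrDefault("timestampPrecision", "ms");
    if (!Arrays.asList("ms", "us", "ns").contains(precision)) {
      throw new IllegalArgumentException("Invalid timestampPrecision: " + precision);
    }
    if (!"error".equals(raw.getOrDefault("nullTagPolicy", "error"))) {
      throw new IllegalArgumentException("Only nullTagPolicy=error is supported");
    }
    return new WriteOptionsSketch(table, timeColumn, tagColumns, precision);
  }

  private static String required(Map<String, String> raw, String key) {
    String value = raw.get(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing required option: " + key);
    }
    return value;
  }
}
```

Validating everything in one place means a misconfigured job fails on the driver before any task starts writing files.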
## 6. Data Type Support
| TsFile type | Spark type | Support |
| ----------- | ----------------------------- | ------------------------------------------- |
| `BOOLEAN` | `BooleanType` | FIELD supported |
| `INT32` | `IntegerType` | FIELD supported |
| `INT64` | `LongType` | FIELD supported |
| `FLOAT` | `FloatType` | FIELD supported |
| `DOUBLE` | `DoubleType` | FIELD supported |
| `TEXT` | `StringType` | FIELD supported |
| `STRING` | `StringType` | FIELD/TAG supported; TAG must use this type |
| `DATE` | `DateType` or `IntegerType` | FIELD supported |
| `TIMESTAMP` | `TimestampType` or `LongType` | FIELD supported |
| `BLOB` | `BinaryType` | FIELD supported |
| `VECTOR` | none | Not supported |
| `UNKNOWN` | none | Not supported |
| `OBJECT` | TBD | Not supported in the first implementation |
TAG columns are strictly limited to non-null `StringType` in the first
implementation. The reason is that the current table model builds device
identity from the table name and TAG values. `Tablet#getDeviceID` generates
device identifiers from TAG values, and the TAG filter API also uses
string-based semantics. To avoid unstable identity and inconsistent filter
semantics, the first implementation rejects non-string TAG values and null TAG
values.
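The mapping above could be captured in a small lookup like the following sketch. Spark types are written as plain names so the example needs no Spark dependency. `TypeMappingSketch` and the `temporalAsRaw` switch are hypothetical; the proposal does not say which option selects between the two Spark types listed for `DATE` and `TIMESTAMP`.

```java
import java.util.Locale;

// Hypothetical sketch; Spark types are represented by their names only.
final class TypeMappingSketch {
  private TypeMappingSketch() {}

  // temporalAsRaw (hypothetical) picks IntegerType/LongType over DateType/TimestampType.
  static String sparkTypeFor(String tsFileType, boolean temporalAsRaw) {
    switch (tsFileType.toUpperCase(Locale.ROOT)) {
      case "BOOLEAN": return "BooleanType";
      case "INT32": return "IntegerType";
      case "INT64": return "LongType";
      case "FLOAT": return "FloatType";
      case "DOUBLE": return "DoubleType";
      case "TEXT":
      case "STRING": return "StringType";
      case "DATE": return temporalAsRaw ? "IntegerType" : "DateType";
      case "TIMESTAMP": return temporalAsRaw ? "LongType" : "TimestampType";
      case "BLOB": return "BinaryType";
      default:
        // VECTOR, UNKNOWN, and OBJECT end up here and fail with a clear message.
        throw new UnsupportedOperationException("Unsupported TsFile type: " + tsFileType);
    }
  }

  // TAG columns must be STRING in the first implementation.
  static void checkTagType(String column, String tsFileType) {
    if (!"STRING".equalsIgnoreCase(tsFileType)) {
      throw new IllegalArgumentException(
          "TAG column '" + column + "' must be STRING but is " + tsFileType);
    }
  }

  public static void main(String[] args) {
    System.out.println(sparkTypeFor("INT64", false)); // prints LongType
  }
}
```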
## 7. Read Semantics
When reading, the connector should:
- Support a single file, a directory, or a glob path.
- Use `TsFileSequenceReader#getTableSchemaMap` to read table metadata.
- If a file contains only one table, infer the table automatically.
- If a file contains multiple tables, require the user to specify
`option("table", "...")`.
- By default, require strict schema compatibility across multiple files:
table name, column name, `ColumnCategory`, and `TSDataType` must match.
- When `mergeSchema=true`, only allow FIELD column union. TAG columns must
still have the same order and type.
- Fail clearly when encountering `ATTRIBUTE`, `TIME`, non-string TAG
columns, or unsupported types.
- Return Spark SQL null for missing FIELD values.
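The multi-file compatibility rules above can be sketched as a pairwise merge. This is an illustration only: `SchemaMergeSketch` and `Col` are hypothetical, schemas are modeled as insertion-ordered maps from lower-case column name to category and type, and the table-name check is assumed to happen before this step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; callers must pass insertion-ordered maps (e.g. LinkedHashMap).
final class SchemaMergeSketch {
  static final class Col {
    final String category; // "TAG" or "FIELD"
    final String type;     // TsFile type name, e.g. "DOUBLE"

    Col(String category, String type) {
      this.category = category;
      this.type = type;
    }

    boolean sameAs(Col other) {
      return category.equals(other.category) && type.equals(other.type);
    }
  }

  static Map<String, Col> merge(Map<String, Col> base, Map<String, Col> next, boolean mergeSchema) {
    // TAG columns must agree in name, order, and type even when mergeSchema=true.
    if (!tagSignature(base).equals(tagSignature(next))) {
      throw new IllegalArgumentException("TAG columns differ in name, order, or type");
    }
    Map<String, Col> merged = new LinkedHashMap<>(base);
    for (Map.Entry<String, Col> e : next.entrySet()) {
      Col existing = merged.get(e.getKey());
      if (existing == null) {
        if (!mergeSchema) {
          throw new IllegalArgumentException("Unexpected column: " + e.getKey());
        }
        merged.put(e.getKey(), e.getValue()); // FIELD union
      } else if (!existing.sameAs(e.getValue())) {
        // No numeric widening: same-name columns with different types fail fast.
        throw new IllegalArgumentException("Incompatible column: " + e.getKey());
      }
    }
    // In strict mode the second schema must not be missing any column either.
    if (!mergeSchema && merged.size() != next.size()) {
      throw new IllegalArgumentException("Schemas do not contain the same columns");
    }
    return merged;
  }

  private static List<String> tagSignature(Map<String, Col> schema) {
    List<String> signature = new ArrayList<>();
    for (Map.Entry<String, Col> e : schema.entrySet()) {
      if ("TAG".equals(e.getValue().category)) {
        signature.add(e.getKey() + ":" + e.getValue().type);
      }
    }
    return signature;
  }
}
```

Folding `merge` over the schemas of all input files yields either one table schema for the scan or a clear error that names the offending column.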
### Column Pruning Risk
The current table reader may depend on at least one FIELD/measurement column
when generating rows. Therefore, a query such as:
```scala
df.select("time", "city")
```
may require special handling:
- Either improve the reader to support time/TAG-only projection.
- Or let the connector internally read one minimal FIELD column and drop it
before returning Spark rows.
This should be covered by an integration test.
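The second option (read one extra FIELD column and drop it) might look like the sketch below. `ProjectionSketch` is hypothetical, rows are modeled as plain `Object[]`, and the helper column is assumed to be appended after the requested columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "read one helper FIELD and drop it" workaround.
final class ProjectionSketch {
  private ProjectionSketch() {}

  // Columns to request from the reader: the requested ones, plus one FIELD if none was requested.
  static List<String> readerColumns(List<String> requested, List<String> fieldColumns) {
    List<String> columns = new ArrayList<>(requested);
    for (String name : requested) {
      if (fieldColumns.contains(name)) {
        return columns; // already reads at least one FIELD column
      }
    }
    if (fieldColumns.isEmpty()) {
      throw new IllegalStateException("Table has no FIELD column to read");
    }
    columns.add(fieldColumns.get(0)); // helper column, appended last
    return columns;
  }

  // Keeps only the leading requested values, dropping any helper column at the end.
  static Object[] dropHelperColumns(Object[] readerRow, int requestedCount) {
    return Arrays.copyOf(readerRow, requestedCount);
  }

  public static void main(String[] args) {
    List<String> cols =
        readerColumns(Arrays.asList("time", "city"), Arrays.asList("temperature", "humidity"));
    System.out.println(cols); // prints [time, city, temperature]
  }
}
```

Whether reading a single helper FIELD returns the same set of rows as reading all FIELD columns depends on how the reader treats rows where that FIELD is null, which is one more thing the integration test should verify.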
## 8. Predicate Pushdown
The first implementation guarantees the following pushdown support:
| Spark predicate | TsFile pushdown |
| ------------------------------------------ | ------------------- |
| `time = c` | time filter |
| `time > c`, `time >= c` | time filter |
| `time < c`, `time <= c` | time filter |
| `time BETWEEN a AND b` | time filter |
| `tag = 'x'` | TAG equality filter |
| `AND` combinations of supported predicates | combined pushdown |
The following predicates remain Spark residual filters:
- `OR`.
- Complex functions.
- FIELD value predicates.
- TAG `!=`, `IN`, range comparison, and null check.
- Non-string TAG comparisons.
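The split between pushed and residual predicates can be sketched as follows. To stay self-contained, the example uses a hypothetical `Pred` class instead of Spark's filter classes. In a real connector the input would be the array of top-level `AND`-ed filters that Spark passes to `SupportsPushDownFilters#pushFilters`, where `BETWEEN` typically arrives as separate `>=` and `<=` conjuncts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; Pred stands in for Spark's filter classes.
final class FilterSplitSketch {
  static final class Pred {
    final String column;
    final String op;    // "=", "!=", ">", ">=", "<", "<="
    final Object value;

    Pred(String column, String op, Object value) {
      this.column = column;
      this.op = op;
      this.value = value;
    }
  }

  private static final List<String> TIME_OPS = Arrays.asList("=", ">", ">=", "<", "<=");

  final List<Pred> pushed = new ArrayList<>();
  final List<Pred> residual = new ArrayList<>();

  // Each conjunct is handled independently; anything not recognized stays with Spark.
  static FilterSplitSketch split(List<Pred> conjuncts, String timeColumn, Set<String> tagColumns) {
    FilterSplitSketch result = new FilterSplitSketch();
    for (Pred p : conjuncts) {
      if (isPushable(p, timeColumn, tagColumns)) {
        result.pushed.add(p);
      } else {
        result.residual.add(p);
      }
    }
    return result;
  }

  static boolean isPushable(Pred p, String timeColumn, Set<String> tagColumns) {
    if (p.column.equals(timeColumn)) {
      return p.value instanceof Long && TIME_OPS.contains(p.op);
    }
    if (tagColumns.contains(p.column)) {
      return "=".equals(p.op) && p.value instanceof String; // TAG equality only
    }
    return false; // FIELD predicates and everything else remain residual
  }

  public static void main(String[] args) {
    FilterSplitSketch r = split(
        Arrays.asList(
            new Pred("time", ">=", 1704067200000L),
            new Pred("city", "=", "beijing"),
            new Pred("temperature", ">", 20.0)),
        "time",
        new HashSet<>(Arrays.asList("city", "device")));
    System.out.println(r.pushed.size() + " pushed, " + r.residual.size() + " residual");
  }
}
```

Returning every unrecognized predicate as residual keeps results correct even when pushdown is only partial, since Spark re-evaluates those filters after the scan.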
## 9. Write Semantics
Spark writes must follow these rules:
- Each Spark task/partition writes an independent `part-*.tsfile` file.
- Output files are committed through Spark's commit protocol.
- Multiple tasks must not append concurrently to the same physical TsFile.
- `append` means appending new TsFile files to the target directory, not
appending data into existing TsFile files.
Write flow:
1. Call `TsFileWriter#registerTableSchema` before writing any rows.
2. Read `timeColumn`. If it is Spark `TimestampType`, convert it to a raw TsFile long using `timestampPrecision`. If it is `LongType`, use it directly.
3. Read TAG columns in the configured order, rejecting non-string or null TAG values immediately.
4. Build a `Tablet` containing TAG and FIELD columns.
5. Write null FIELD values as null/sparse values.
6. Call `TsFileWriter#writeTable` when `maxRowsPerTablet` is reached or when the task finishes.
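The timestamp conversion in the write flow can be sketched as below. Spark represents `TimestampType` values internally as microseconds since the epoch, so the conversion is a scale by the configured precision. `TimestampConversionSketch` is a hypothetical helper, and using floor division for pre-epoch values is a design assumption rather than something the proposal specifies.

```java
// Hypothetical helper; not an existing TsFile or Spark class.
final class TimestampConversionSketch {
  private TimestampConversionSketch() {}

  // Converts Spark's internal microseconds to a raw TsFile timestamp.
  static long fromSparkMicros(long micros, String precision) {
    switch (precision) {
      case "ms": return Math.floorDiv(micros, 1000L);      // floor keeps pre-epoch values ordered
      case "us": return micros;
      case "ns": return Math.multiplyExact(micros, 1000L); // throws ArithmeticException on overflow
      default: throw new IllegalArgumentException("Invalid timestampPrecision: " + precision);
    }
  }

  // Converts a raw TsFile timestamp back to Spark's internal microseconds (read path).
  static long toSparkMicros(long raw, String precision) {
    switch (precision) {
      case "ms": return Math.multiplyExact(raw, 1000L);
      case "us": return raw;
      case "ns": return Math.floorDiv(raw, 1000L);
      default: throw new IllegalArgumentException("Invalid timestampPrecision: " + precision);
    }
  }

  public static void main(String[] args) {
    System.out.println(fromSparkMicros(1704067200000000L, "ms")); // prints 1704067200000
  }
}
```

Writing with `ms` or reading with `ns` discards sub-unit digits, so a round trip is exact only when the data has no finer resolution than the configured precision.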
## 10. Module and Components
A new module is proposed:
```text
java/spark-tsfile
```
It should be registered under the Java aggregator in `java/pom.xml`.
Dependency principles:
- Spark/Hadoop dependencies should only live in the `java/spark-tsfile`
submodule.
- Mark Spark/Hadoop dependencies as `provided` where possible.
- Avoid polluting the core TsFile library.
- Avoid dependency convergence or duplicate-class issues in the root POM.
Main components:
| Component | Responsibility |
| ----------------------------- | ------------------------------------------------------------ |
| `TsFileTableProvider` | Spark DataSource V2 entry point. Implements `TableProvider` and registers the short name `tsfile` (through Spark's `DataSourceRegister`). |
| `TsFileTableOptions` | Parses and validates connector options. |
| `TsFileTableSchemaInferer` | Infers Spark `StructType` from TsFile metadata. |
| `TsFileTableTypeConverter` | Converts between TsFile `TSDataType` and Spark SQL types. |
| `TsFileTableScanBuilder` | Spark V2 `ScanBuilder`; handles column pruning and pushdown. |
| `TsFileTablePartitionReader` | Reads TsFile data and converts it to Spark rows. |
| `TsFileTableFilterTranslator` | Converts Spark filters to TsFile time/TAG filters and records residual filters. |
| `TsFileTableWriteBuilder` | Spark V2 write builder; validates write schema. |
| `TsFileTableBatchWrite` | Spark V2 `BatchWrite`; coordinates task writers and commit. |
| `TsFileTablePartitionWriter` | Converts Spark rows to `Tablet` and writes independent `part-*.tsfile` files. |
## 11. Implementation Phases
1. **Read path**
   - Add the `java/spark-tsfile` module.
   - Add Spark 3.x V2 dependencies. Keep Spark/Hadoop dependencies `provided` where possible.
   - Implement `TableProvider`.
   - Implement option parsing.
   - Implement single-file, single-table schema inference.
   - Implement basic DataFrame read.
   - Reject unsupported schemas, such as non-string TAG columns, `ATTRIBUTE`, and `TIME`.
2. **Directory and multi-file read**
   - Support directories and glob paths.
   - Validate schema compatibility across multiple files.
   - Support strict `mergeSchema=false` mode.
   - Add tests for compatible and incompatible schemas.
3. **Projection and pushdown**
   - Implement column pruning.
   - Handle the time/TAG-only projection risk.
   - Implement time range, `time = c`, and TAG equality pushdown.
   - Implement `AND` combinations of supported predicates.
   - Keep unsupported filters as residual filters.
4. **Write path**
   - Implement DataFrame write.
   - Build and register `TableSchema`.
   - Implement Spark V2 `BatchWrite`.
   - Make each task write independent `part-*.tsfile` files.
   - Support FIELD null values.
   - Reject non-string TAG values and null TAG values.
   - Add Spark DataFrame -> TsFile -> Spark DataFrame round-trip tests.
5. **Examples and documentation**
   - Add Scala DataFrame read/write examples.
   - Add Spark SQL read/CTAS examples.
   - Document options and type mapping.
   - Add a small table-model TsFile test fixture.
## 12. Test Plan
The tests should cover:
- Option parsing:
  - Missing `table` fails on write, and on read when a file contains multiple tables.
  - Missing `tagColumns` fails on write.
  - Duplicate column names after lower-case normalization fail.
  - Invalid `timestampPrecision` fails.
  - Non-string TAG columns fail.
- Type mapping:
  - Supported types map correctly.
  - Unsupported types fail with clear errors.
  - `ATTRIBUTE` / `TIME` categories fail with clear errors.
- Filter translation:
  - Time comparisons.
  - TAG equality.
  - `AND` combinations of supported predicates.
  - Unsupported predicates remain residual filters.
- Integration tests:
  - Read a single file with a single table.
  - Read a multi-table file with a specified table.
  - Read multiple files from a directory.
  - Sparse FIELD values return null.
  - Selected columns only.
  - Time/TAG-only projection.
  - Pushdown correctness.
  - Each Spark task writes an independent `part-*.tsfile`.
  - Round-trip write/read correctness.
## 13. Initial Implementation Decisions
The first implementation uses conservative defaults:
- Timestamp values are exposed as raw `LongType` by default. Spark
`TimestampType` is only used when users explicitly set `timestampAs=timestamp`.
- `mergeSchema=true` only unions compatible FIELD columns. It does not
perform numeric widening; same-name columns with different TsFile types fail
fast.
- The first implementation only supports global `encoding` and `compression`
options. Per-column encoding/compression can be added later if needed.
Feedback on Spark version compatibility, connector API semantics, and
implementation scope is welcome.