Hi Apache TsFile,
Issue: https://github.com/apache/tsfile/issues/843
Based on issue #843 and the follow-up discussion, I drafted an
implementation plan for the Spark TsFile table model connector.
1. Goal
Add a Spark connector for the TsFile table model. The proposed format name
is tsfile, so users can read and write table-model TsFile data through
Spark DataFrame and Spark SQL APIs.
For the first implementation, the connector will explicitly target:
- *Spark 3.x DataSource V2 only*.
- Spark V2 APIs such as TableProvider, ScanBuilder, and BatchWrite.
- No Spark 2.4 DataSource V1 compatibility.
- The old iotdb-extras spark-tsfile module may be used as a reference for
  user experience and tests, but its code should not be copied directly.
The connector should reuse existing TsFile Java APIs as much as possible:
- Read path: DeviceTableModelReader, TsFileSequenceReader#getTableSchemaMap,
  TableQueryExecutor, etc.
- Write path: TsFileWriter#registerTableSchema, TsFileWriter#writeTable.
- The Spark connector should not reimplement TsFile binary parsing,
  metadata parsing, page/chunk decoding, or writing logic.
2. Feature Definition
One Spark row maps to one TsFile table-model row:
row := (time, tag_1, tag_2, ..., tag_n, field_1, ..., field_m)
Component         Meaning
----------------  ------------------------------------------------------------
table             Logical table name in the TsFile table model. It corresponds
                  to TableSchema#getTableName() and is specified through an
                  option. By default, it is not exposed as a DataFrame column.
time              Time column. It corresponds to the TsFile timestamp and is
                  not a normal TableSchema measurement column. The default
                  DataFrame column name is time.
TAG column        Device identity column. It corresponds to
                  ColumnCategory.TAG. Multiple TAG columns form the device
                  identity in order. The first implementation only supports
                  non-null StringType TAG columns.
FIELD column      Measurement/value column. It corresponds to
                  ColumnCategory.FIELD. Sparse values and nulls are allowed.
ATTRIBUTE column  Corresponds to ColumnCategory.ATTRIBUTE. Not supported in
                  the first implementation; the connector should fail with a
                  clear error if encountered.
TIME category     Corresponds to ColumnCategory.TIME. Not used as the
                  DataFrame time column in the first implementation; the
                  connector should fail with a clear error if encountered.
In the current TsFile table model, table and column names are normalized to
lower case. Therefore, schema lookup and compatibility checks inside the
connector should use lower-case names.
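As a rough illustration of the lower-case rule, the sketch below normalizes column names and rejects names that collide after normalization. ColumnNames and its methods are hypothetical names, and using Locale.ROOT is an assumption made here so that the result does not depend on the JVM default locale; the exact normalization used inside TsFile should be checked against the TsFile code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper; class and method names are illustrative only.
final class ColumnNames {
    private ColumnNames() {}

    // Assumption: Locale.ROOT keeps lower-casing independent of the JVM locale.
    static String normalize(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    // Normalizes every name and fails if two names collide after normalization.
    static List<String> normalizeAll(List<String> names) {
        Set<String> seen = new HashSet<>();
        List<String> normalized = new ArrayList<>();
        for (String name : names) {
            String lower = normalize(name);
            if (!seen.add(lower)) {
                throw new IllegalArgumentException(
                    "Duplicate column name after lower-case normalization: " + lower);
            }
            normalized.add(lower);
        }
        return normalized;
    }
}
```

With this sketch, a DataFrame that has both City and city columns would be rejected before any schema lookup happens.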
3. Initial Scope
The first implementation supports:
- Reading a single TsFile file.
- Reading a TsFile directory or glob path.
- Reading multiple TsFiles with compatible schemas.
- Inferring Spark schema from TsFile metadata.
- Selecting a specific table.
- Column pruning.
- Predicate pushdown:
  - time range filters.
  - time = c.
  - TAG equality filters, for example city = 'beijing'.
  - AND combinations of the supported predicates above.
- Returning null values correctly for sparse FIELD columns.
- Writing table-model TsFile data from a Spark DataFrame.
- Configuring table name, time column, TAG columns, FIELD columns,
  encoding, and compression.
- Providing Scala DataFrame and Spark SQL examples.
The first implementation does not support:
- Tree-model Spark connector behavior.
- Spark 2.4 DataSource V1.
- Spark structured streaming.
- Update/delete/merge-on-write.
- Reading multiple tables into one DataFrame.
- FIELD value predicate pushdown.
- Complex OR predicate pushdown.
- Non-string TAG values or null TAG values.
- ATTRIBUTE or TIME column categories, and VECTOR, UNKNOWN, or arbitrary
  OBJECT values.
4. User Syntax
DataFrame Read
    val df = spark.read
      .format("tsfile")
      .option("model", "table")
      .option("table", "weather")
      .load("/data/weather-tsfile")

    df.select("time", "city", "device", "temperature")
      .where("time >= 1704067200000 and city = 'beijing'")
      .show()
Spark SQL Read
This is Spark SQL over the Spark connector. TsFile does not need to
implement its own SQL parser or SQL engine.
    CREATE TEMPORARY VIEW weather
    USING tsfile
    OPTIONS (
      path '/data/weather-tsfile',
      model 'table',
      table 'weather'
    );

    SELECT time, city, device, temperature
    FROM weather
    WHERE city = 'beijing';
DataFrame Write
    df.write
      .format("tsfile")
      .option("model", "table")
      .option("table", "weather")
      .option("timeColumn", "time")
      .option("tagColumns", "city,device")
      .option("fieldColumns", "temperature,humidity,status")
      .option("compression", "LZ4")
      .mode("append")
      .save("/data/weather-tsfile")
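To make the write options concrete, here is a minimal sketch of how the connector might resolve timeColumn, tagColumns, and fieldColumns against the DataFrame columns, including the default that every non-time, non-TAG column becomes a FIELD column. WriteColumns is a hypothetical class, and a plain Map stands in for the options map that Spark passes to the connector; only the option names come from this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical holder for the resolved write columns; not an existing API.
final class WriteColumns {
    final String timeColumn;
    final List<String> tagColumns;
    final List<String> fieldColumns;

    private WriteColumns(String timeColumn, List<String> tagColumns, List<String> fieldColumns) {
        this.timeColumn = timeColumn;
        this.tagColumns = tagColumns;
        this.fieldColumns = fieldColumns;
    }

    static WriteColumns resolve(Map<String, String> options, List<String> dataFrameColumns) {
        String table = options.get("table");
        if (table == null || table.trim().isEmpty()) {
            throw new IllegalArgumentException("Option 'table' is required for writing");
        }
        String time = options.getOrDefault("timeColumn", "time").toLowerCase(Locale.ROOT);
        List<String> tags = splitCsv(options.get("tagColumns"));
        if (tags.isEmpty()) {
            throw new IllegalArgumentException("Option 'tagColumns' is required for writing");
        }
        List<String> columns = new ArrayList<>();
        for (String column : dataFrameColumns) {
            columns.add(column.toLowerCase(Locale.ROOT));
        }
        // Default: every column that is neither the time column nor a TAG is a FIELD.
        List<String> fields = splitCsv(options.get("fieldColumns"));
        if (fields.isEmpty()) {
            for (String column : columns) {
                if (!column.equals(time) && !tags.contains(column)) {
                    fields.add(column);
                }
            }
        }
        List<String> required = new ArrayList<>(tags);
        required.add(time);
        required.addAll(fields);
        for (String column : required) {
            if (!columns.contains(column)) {
                throw new IllegalArgumentException("Column not found in DataFrame: " + column);
            }
        }
        return new WriteColumns(time, tags, fields);
    }

    // Splits a comma-separated option value, trimming and lower-casing each entry.
    private static List<String> splitCsv(String value) {
        List<String> parts = new ArrayList<>();
        if (value == null) {
            return parts;
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim().toLowerCase(Locale.ROOT);
            if (!trimmed.isEmpty()) {
                parts.add(trimmed);
            }
        }
        return parts;
    }
}
```

Failing at option-resolution time keeps configuration errors on the driver instead of surfacing them later inside individual write tasks.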
5. Main Options
Option              Meaning
------------------  ----------------------------------------------------------
model               Fixed to table.
table               Table name. Required for writing; required when reading
                    files containing multiple tables.
timeColumn          DataFrame column used as the timestamp column. Default is
                    time.
tagColumns          Required for writing. Comma-separated list. The order
                    defines the device identity.
fieldColumns        Optional for writing. By default, all non-time and
                    non-TAG columns are FIELD columns.
timestampAs         Read the time column as long or timestamp. Default is
                    long.
timestampPrecision  ms, us, or ns. Used when converting between Spark
                    TimestampType and raw TsFile timestamps.
mergeSchema         Whether to merge FIELD schemas when reading multiple
                    files. Default is false.
pushdown            Whether to enable predicate pushdown. Default is true.
compression         Compression type for writing, for example LZ4, SNAPPY,
                    GZIP, or ZSTD.
encoding            Encoding type for writing. It must be validated against
                    TsFile encoding/type compatibility rules.
nullTagPolicy       The first implementation only supports error, meaning
                    null TAG values fail immediately.
6. Data Type Support
TsFile type  Spark type                 Support
-----------  -------------------------  -------------------------------------------
BOOLEAN      BooleanType                FIELD supported
INT32        IntegerType                FIELD supported
INT64        LongType                   FIELD supported
FLOAT        FloatType                  FIELD supported
DOUBLE       DoubleType                 FIELD supported
TEXT         StringType                 FIELD supported
STRING       StringType                 FIELD/TAG supported; TAG must use this type
DATE         DateType or IntegerType    FIELD supported
TIMESTAMP    TimestampType or LongType  FIELD supported
BLOB         BinaryType                 FIELD supported
VECTOR       none                       Not supported
UNKNOWN      none                       Not supported
OBJECT       TBD                        Not supported in the first implementation
TAG columns are strictly limited to non-null StringType in the first
implementation. The reason is that the current table model builds device
identity from the table name and TAG values. Tablet#getDeviceID generates
device identifiers from TAG values, and the TAG filter API also uses
string-based semantics. To avoid unstable identity and inconsistent filter
semantics, the first implementation rejects non-string TAG values and null
TAG values.
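The type rules above could be checked with a lookup along the following lines. This is only a sketch: TypeMapping is a hypothetical class, type and category names are plain strings so that the example needs neither Spark nor TsFile on the classpath, and for DATE and TIMESTAMP it arbitrarily picks the first Spark type listed in the table, which is an assumption and not a decision made in this proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup; strings stand in for TSDataType, ColumnCategory, and Spark DataType.
final class TypeMapping {
    private static final Map<String, String> FIELD_TYPES = new HashMap<>();

    static {
        FIELD_TYPES.put("BOOLEAN", "BooleanType");
        FIELD_TYPES.put("INT32", "IntegerType");
        FIELD_TYPES.put("INT64", "LongType");
        FIELD_TYPES.put("FLOAT", "FloatType");
        FIELD_TYPES.put("DOUBLE", "DoubleType");
        FIELD_TYPES.put("TEXT", "StringType");
        FIELD_TYPES.put("STRING", "StringType");
        FIELD_TYPES.put("DATE", "DateType");           // assumption: first listed alternative
        FIELD_TYPES.put("TIMESTAMP", "TimestampType"); // assumption: first listed alternative
        FIELD_TYPES.put("BLOB", "BinaryType");
        // VECTOR, UNKNOWN, and OBJECT are deliberately absent, so they are rejected.
    }

    private TypeMapping() {}

    static String sparkTypeFor(String category, String tsFileType) {
        switch (category) {
            case "TAG":
                if (!"STRING".equals(tsFileType)) {
                    throw new IllegalArgumentException(
                        "TAG columns must be STRING, found " + tsFileType);
                }
                return "StringType";
            case "FIELD":
                String sparkType = FIELD_TYPES.get(tsFileType);
                if (sparkType == null) {
                    throw new IllegalArgumentException(
                        "Unsupported TsFile type for FIELD column: " + tsFileType);
                }
                return sparkType;
            default:
                // ATTRIBUTE and TIME categories fail with a clear error.
                throw new IllegalArgumentException("Unsupported column category: " + category);
        }
    }
}
```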
7. Read Semantics
When reading, the connector should:
- Support a single file, a directory, or a glob path.
- Use TsFileSequenceReader#getTableSchemaMap to read table metadata.
- If a file contains only one table, infer the table automatically.
- If a file contains multiple tables, require the user to specify
  option("table", "...").
- By default, require strict schema compatibility across multiple files:
  table name, column name, ColumnCategory, and TSDataType must match.
- When mergeSchema=true, only allow FIELD column union. TAG columns must
  still have the same order and type.
- Fail clearly when encountering ATTRIBUTE, TIME, non-string TAG columns,
  or unsupported types.
- Return Spark SQL null for missing FIELD values.
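The multi-file compatibility rules can be sketched as follows. SchemaMerge is a hypothetical helper that reduces each file's schema to an ordered TAG name list plus a FIELD name-to-type map (types as plain strings); the real connector would compare TableSchema objects, so treat this as an illustration of the rules only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; schemas are reduced to TAG names and FIELD name -> type.
final class SchemaMerge {
    private SchemaMerge() {}

    static Map<String, String> mergeFields(
            List<String> tagsA, Map<String, String> fieldsA,
            List<String> tagsB, Map<String, String> fieldsB,
            boolean mergeSchema) {
        // TAG columns must match in name and order in both modes.
        if (!tagsA.equals(tagsB)) {
            throw new IllegalArgumentException("TAG columns differ: " + tagsA + " vs " + tagsB);
        }
        if (!mergeSchema) {
            // Strict mode: the FIELD sets and their types must be identical.
            if (!fieldsA.equals(fieldsB)) {
                throw new IllegalArgumentException("FIELD columns differ and mergeSchema=false");
            }
            return new LinkedHashMap<>(fieldsA);
        }
        // Merge mode: union of FIELD columns, with no numeric widening.
        Map<String, String> merged = new LinkedHashMap<>(fieldsA);
        for (Map.Entry<String, String> entry : fieldsB.entrySet()) {
            String existing = merged.putIfAbsent(entry.getKey(), entry.getValue());
            if (existing != null && !existing.equals(entry.getValue())) {
                throw new IllegalArgumentException(
                    "FIELD " + entry.getKey() + " has conflicting types: "
                        + existing + " vs " + entry.getValue());
            }
        }
        return merged;
    }
}
```

Folding this function over all input files yields either one merged schema or the first incompatibility, which gives users a single clear error message.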
Column Pruning Risk
The current table reader may depend on at least one FIELD/measurement
column when generating rows. Therefore, a query such as:
    df.select("time", "city")
may require special handling:
- Either improve the reader to support time/TAG-only projection.
- Or let the connector internally read one minimal FIELD column and drop
  it before returning Spark rows.
This should be covered by an integration test.
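The second workaround (read one extra FIELD column and drop it) might look like the sketch below. ProjectionPlan is a hypothetical class, and picking the first FIELD column of the table as the helper is an assumption; a real implementation might prefer the cheapest column to decode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plan separating what the reader is asked for from what Spark receives.
final class ProjectionPlan {
    final List<String> columnsToRead;   // passed to the TsFile reader
    final List<String> columnsToReturn; // returned to Spark, in the requested order

    private ProjectionPlan(List<String> columnsToRead, List<String> columnsToReturn) {
        this.columnsToRead = columnsToRead;
        this.columnsToReturn = columnsToReturn;
    }

    static ProjectionPlan plan(List<String> requested, List<String> fieldColumns) {
        List<String> toRead = new ArrayList<>(requested);
        boolean hasField = false;
        for (String column : requested) {
            if (fieldColumns.contains(column)) {
                hasField = true;
                break;
            }
        }
        // Time/TAG-only projection: add one helper FIELD column for the reader.
        // Assumption: the first FIELD column is an acceptable helper.
        if (!hasField && !fieldColumns.isEmpty()) {
            toRead.add(fieldColumns.get(0));
        }
        return new ProjectionPlan(toRead, new ArrayList<>(requested));
    }
}
```

The partition reader would then build each Spark row from columnsToReturn only, so the helper column never reaches the user.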
8. Predicate Pushdown
The first implementation guarantees the following pushdown support:
Spark predicate                           TsFile pushdown
----------------------------------------  -------------------
time = c                                  time filter
time > c, time >= c                       time filter
time < c, time <= c                       time filter
time BETWEEN a AND b                      time filter
tag = 'x'                                 TAG equality filter
AND combinations of supported predicates  combined pushdown
The following predicates remain Spark residual filters:
- OR.
- Complex functions.
- FIELD value predicates.
- TAG !=, IN, range comparison, and null check.
- Non-string TAG comparisons.
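The split between pushed and residual predicates can be sketched as below. FilterSplit and Pred are hypothetical stand-ins for the connector's filter translator and Spark's filter classes; the sketch assumes the input is already a list of top-level AND conjuncts and that BETWEEN arrives as a pair of >= and <= comparisons, which is how I understand Spark to deliver filters to a V2 source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical translator; Pred stands in for Spark's filter classes.
final class FilterSplit {
    static final class Pred {
        final String column;
        final String op;
        final Object value;

        Pred(String column, String op, Object value) {
            this.column = column;
            this.op = op;
            this.value = value;
        }
    }

    private static final Set<String> TIME_OPS =
        new HashSet<>(Arrays.asList("=", ">", ">=", "<", "<="));

    final List<Pred> pushed = new ArrayList<>();
    final List<Pred> residual = new ArrayList<>();

    // Each conjunct is either pushed to TsFile or left for Spark to evaluate.
    static FilterSplit split(List<Pred> conjuncts, String timeColumn, Set<String> tagColumns) {
        FilterSplit result = new FilterSplit();
        for (Pred pred : conjuncts) {
            if (isPushable(pred, timeColumn, tagColumns)) {
                result.pushed.add(pred);
            } else {
                result.residual.add(pred);
            }
        }
        return result;
    }

    static boolean isPushable(Pred pred, String timeColumn, Set<String> tagColumns) {
        if (pred.column.equals(timeColumn)) {
            // time = c and time range comparisons against a long literal.
            return TIME_OPS.contains(pred.op) && pred.value instanceof Long;
        }
        if (tagColumns.contains(pred.column)) {
            // Only string equality on TAG columns is pushed down.
            return pred.op.equals("=") && pred.value instanceof String;
        }
        // FIELD predicates and everything else stay in Spark.
        return false;
    }
}
```

Because the conjuncts are ANDed, pushing any subset is safe as long as Spark still evaluates the residual ones.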
9. Write Semantics
Spark writes must follow these rules:
- Each Spark task/partition writes an independent part-*.tsfile file.
- Output files are committed through Spark's commit protocol.
- Multiple tasks must not append concurrently to the same physical TsFile.
- append means appending new TsFile files to the target directory, not
  appending data into existing TsFile files.
Write flow:
1. Call TsFileWriter#registerTableSchema before writing any rows.
2. Read timeColumn. If it is Spark TimestampType, convert it to a raw
   TsFile long using timestampPrecision. If it is LongType, use it directly.
3. Read TAG columns in the configured order. Reject non-string TAG values
   or null TAG values immediately.
4. Build a Tablet containing TAG and FIELD columns.
5. Write null FIELD values as null/sparse values.
6. Call TsFileWriter#writeTable when maxRowsPerTablet is reached or when
   the task finishes.
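For the timestamp step of the write flow, a conversion sketch is shown below. It relies on Spark representing TimestampType values internally as microseconds since the epoch. TimestampConversion is a hypothetical helper, and rounding toward negative infinity when precision is lost is a choice made for this example, not something the proposal specifies.

```java
// Hypothetical helper for converting between Spark micros and raw TsFile timestamps.
final class TimestampConversion {
    private TimestampConversion() {}

    // Spark TimestampType (microseconds since epoch) -> raw TsFile long.
    static long fromSparkMicros(long micros, String timestampPrecision) {
        switch (timestampPrecision) {
            case "ms":
                // floorDiv keeps pre-epoch values consistent (rounds toward negative infinity).
                return Math.floorDiv(micros, 1000L);
            case "us":
                return micros;
            case "ns":
                // multiplyExact fails loudly instead of overflowing silently.
                return Math.multiplyExact(micros, 1000L);
            default:
                throw new IllegalArgumentException(
                    "Invalid timestampPrecision: " + timestampPrecision);
        }
    }

    // Raw TsFile long -> Spark TimestampType (microseconds since epoch).
    static long toSparkMicros(long raw, String timestampPrecision) {
        switch (timestampPrecision) {
            case "ms":
                return Math.multiplyExact(raw, 1000L);
            case "us":
                return raw;
            case "ns":
                return Math.floorDiv(raw, 1000L);
            default:
                throw new IllegalArgumentException(
                    "Invalid timestampPrecision: " + timestampPrecision);
        }
    }
}
```

Note that a round trip through a coarser precision is lossy: writing with ms drops the sub-millisecond part of a Spark timestamp.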
10. Module and Components
A new module is proposed:
java/spark-tsfile
It should be registered under the Java aggregator in java/pom.xml.
Dependency principles:
- Spark/Hadoop dependencies should only live in the java/spark-tsfile
  submodule.
- Mark Spark/Hadoop dependencies as provided where possible.
- Avoid polluting the core TsFile library.
- Avoid dependency convergence or duplicate-class issues in the root POM.
Main components:
Component                    Responsibility
---------------------------  -------------------------------------------------
TsFileTableProvider          Spark DataSource V2 entry point. Implements
                             TableProvider and registers the short name
                             tsfile.
TsFileTableOptions           Parses and validates connector options.
TsFileTableSchemaInferer     Infers Spark StructType from TsFile metadata.
TsFileTableTypeConverter     Converts between TsFile TSDataType and Spark SQL
                             types.
TsFileTableScanBuilder       Spark V2 ScanBuilder; handles column pruning and
                             pushdown.
TsFileTablePartitionReader   Reads TsFile data and converts it to Spark rows.
TsFileTableFilterTranslator  Converts Spark filters to TsFile time/TAG
                             filters and records residual filters.
TsFileTableWriteBuilder      Spark V2 write builder; validates write schema.
TsFileTableBatchWrite        Spark V2 BatchWrite; coordinates task writers
                             and commit.
TsFileTablePartitionWriter   Converts Spark rows to Tablet and writes
                             independent part-*.tsfile files.
11. Implementation Phases
1. *Read path*
   - Add the java/spark-tsfile module.
   - Add Spark 3.x V2 dependencies. Keep Spark/Hadoop dependencies provided
     where possible.
   - Implement TableProvider.
   - Implement option parsing.
   - Implement single-file, single-table schema inference.
   - Implement basic DataFrame read.
   - Reject unsupported schemas, such as non-string TAG columns, ATTRIBUTE,
     and TIME.
2. *Directory and multi-file read*
   - Support directories and glob paths.
   - Validate schema compatibility across multiple files.
   - Support strict mergeSchema=false mode.
   - Add tests for compatible and incompatible schemas.
3. *Projection and pushdown*
   - Implement column pruning.
   - Handle the time/TAG-only projection risk.
   - Implement time range, time = c, and TAG equality pushdown.
   - Implement AND combinations of supported predicates.
   - Keep unsupported filters as residual filters.
4. *Write path*
   - Implement DataFrame write.
   - Build and register TableSchema.
   - Implement Spark V2 BatchWrite.
   - Make each task write independent part-*.tsfile files.
   - Support FIELD null values.
   - Reject non-string TAG values and null TAG values.
   - Add Spark DataFrame -> TsFile -> Spark DataFrame round-trip tests.
5. *Examples and documentation*
   - Add Scala DataFrame read/write examples.
   - Add Spark SQL read/CTAS examples.
   - Document options and type mapping.
   - Add a small table-model TsFile test fixture.
12. Test Plan
The tests should cover:
- Option parsing:
  - Missing table fails when writing or when reading files containing
    multiple tables.
  - Missing tagColumns fails.
  - Duplicate column names after lower-case normalization fail.
  - Invalid timestampPrecision fails.
  - Non-string TAG columns fail.
- Type mapping:
  - Supported types map correctly.
  - Unsupported types fail with clear errors.
  - ATTRIBUTE / TIME categories fail with clear errors.
- Filter translation:
  - Time comparisons.
  - TAG equality.
  - AND combinations of supported predicates.
  - Unsupported predicates remain residual filters.
- Integration tests:
  - Read a single file with a single table.
  - Read a multi-table file with a specified table.
  - Read multiple files from a directory.
  - Sparse FIELD values return null.
  - Selected columns only.
  - Time/TAG-only projection.
  - Pushdown correctness.
  - Each Spark task writes an independent part-*.tsfile.
  - Round-trip write/read correctness.
13. Initial Implementation Decisions
The first implementation uses conservative defaults:
- Timestamp values are exposed as raw LongType by default. Spark
  TimestampType is only used when users explicitly set
  timestampAs=timestamp.
- mergeSchema=true only unions compatible FIELD columns. It does not
  perform numeric widening; same-name columns with different TsFile types
  fail fast.
- The first implementation only supports global encoding and compression
  options. Per-column encoding/compression can be added later if needed.
Feedback on Spark version compatibility, connector API semantics, and
implementation scope is welcome.
Best,
Bryan Yang (杨易达)