This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
The following commit(s) were added to refs/heads/master by this push:
new 4785b88 IoTDB Table Spark Connector
4785b88 is described below
commit 4785b88c37447b54eb5f656bb0fb7fc069e1a864
Author: shuwenwei <[email protected]>
AuthorDate: Fri Mar 14 09:24:08 2025 +0800
IoTDB Table Spark Connector
---
connectors/hive-connector/pom.xml | 1 +
connectors/pom.xml | 1 +
.../iotdb-table-connector-3.3/pom.xml | 76 +++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 18 +++
.../iotdb/spark/table/db/IoTDBTableProvider.scala | 28 ++++
.../iotdb-table-connector-3.4/pom.xml | 76 +++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 18 +++
.../iotdb/spark/table/db/IoTDBTableProvider.scala | 28 ++++
.../iotdb-table-connector-3.5/pom.xml | 76 +++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 18 +++
.../iotdb/spark/table/db/IoTDBTableProvider.scala | 28 ++++
connectors/spark-iotdb-table-connector/pom.xml | 152 ++++++++++++++++++
.../spark-iotdb-table-common/pom.xml | 118 ++++++++++++++
.../table/db/AbstractIoTDBTableProvider.scala | 45 ++++++
.../apache/iotdb/spark/table/db/IoTDBOptions.scala | 50 ++++++
.../apache/iotdb/spark/table/db/IoTDBTable.scala | 64 ++++++++
.../apache/iotdb/spark/table/db/IoTDBUtils.scala | 171 +++++++++++++++++++++
.../table/db/read/IoTDBExpressionSQLBuilder.scala | 161 +++++++++++++++++++
.../spark/table/db/read/IoTDBInputPartition.scala | 27 ++++
.../spark/table/db/read/IoTDBPartitionReader.scala | 76 +++++++++
.../db/read/IoTDBPartitionReaderFactory.scala | 32 ++++
.../iotdb/spark/table/db/read/IoTDBScan.scala | 57 +++++++
.../spark/table/db/read/IoTDBScanBuilder.scala | 99 ++++++++++++
.../spark/table/db/write/IoTDBDataWriter.scala | 131 ++++++++++++++++
.../iotdb/spark/table/db/write/IoTDBWrite.scala | 35 +++++
.../spark/table/db/write/IoTDBWriteBuilder.scala | 28 ++++
.../spark/table/db/write/IoTDBWriteFactory.scala | 32 ++++
.../apache/iotdb/spark/table/db/UtilsTest.scala | 36 +++++
.../db/read/PushDownPredicateSQLBuilderTest.scala | 76 +++++++++
examples/pom.xml | 1 +
examples/spark-table/README.md | 103 +++++++++++++
{connectors => examples/spark-table}/pom.xml | 66 ++++----
.../spark/table/SparkConnectorReadExample.scala | 44 ++++++
.../spark/table/SparkConnectorSQLExample.scala | 59 +++++++
.../spark/table/SparkConnectorWriteExample.scala | 51 ++++++
iotdb-collector/collector-core/pom.xml | 4 +
pom.xml | 10 ++
37 files changed, 2066 insertions(+), 30 deletions(-)
diff --git a/connectors/hive-connector/pom.xml
b/connectors/hive-connector/pom.xml
index c71ebcd..ba5442a 100644
--- a/connectors/hive-connector/pom.xml
+++ b/connectors/hive-connector/pom.xml
@@ -42,6 +42,7 @@
<groupId>org.apache.tsfile</groupId>
<artifactId>common</artifactId>
<version>${tsfile.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tsfile</groupId>
diff --git a/connectors/pom.xml b/connectors/pom.xml
index ad2c299..7742633 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -37,6 +37,7 @@
<module>hadoop</module>
<module>hive-connector</module>
<module>spark-iotdb-connector</module>
+ <module>spark-iotdb-table-connector</module>
<module>spark-tsfile</module>
<module>zeppelin-interpreter</module>
</modules>
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml
new file mode 100644
index 0000000..2392d53
--- /dev/null
+++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-iotdb-table-connector-3.3</artifactId>
+ <name>IoTDB: Table Connector: Apache Spark 3.3 (Scala 2.12)</name>
+ <properties>
+ <spark.version>3.3.0</spark.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..1f46523
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.iotdb.spark.table.db.IoTDBTableProvider
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
new file mode 100644
index 0000000..4c98589
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.spark.sql.sources.DataSourceRegister
+
+
+class IoTDBTableProvider extends AbstractIoTDBTableProvider with
DataSourceRegister {
+
+ override def shortName(): String = "iotdb"
+}
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml
new file mode 100644
index 0000000..4f546cb
--- /dev/null
+++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-iotdb-table-connector-3.4</artifactId>
+ <name>IoTDB: Table Connector: Apache Spark 3.4 (Scala 2.12)</name>
+ <properties>
+ <spark.version>3.4.0</spark.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..1f46523
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.iotdb.spark.table.db.IoTDBTableProvider
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
new file mode 100644
index 0000000..4c98589
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.spark.sql.sources.DataSourceRegister
+
+
+class IoTDBTableProvider extends AbstractIoTDBTableProvider with
DataSourceRegister {
+
+ override def shortName(): String = "iotdb"
+}
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml
new file mode 100644
index 0000000..3ce7f1d
--- /dev/null
+++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-iotdb-table-connector-3.5</artifactId>
+ <name>IoTDB: Table Connector: Apache Spark 3.5 (Scala 2.12)</name>
+ <properties>
+ <spark.version>3.5.0</spark.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..1f46523
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.iotdb.spark.table.db.IoTDBTableProvider
diff --git
a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
new file mode 100644
index 0000000..4c98589
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.spark.sql.sources.DataSourceRegister
+
+
+class IoTDBTableProvider extends AbstractIoTDBTableProvider with
DataSourceRegister {
+
+ override def shortName(): String = "iotdb"
+}
diff --git a/connectors/spark-iotdb-table-connector/pom.xml
b/connectors/spark-iotdb-table-connector/pom.xml
new file mode 100644
index 0000000..7b7a4b1
--- /dev/null
+++ b/connectors/spark-iotdb-table-connector/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>connectors</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-iotdb-table-connector</artifactId>
+ <!-- NOTE: This module actually only contains the sources and the common
settings.
+ These sources are compiled once per supported Spark version (all on
Scala 2.12) in the sub-
+ modules.
+ -->
+ <packaging>pom</packaging>
+ <name>IoTDB: Table Connector: Apache Spark</name>
+ <modules>
+ <module>spark-iotdb-table-common</module>
+ <module>iotdb-table-connector-3.5</module>
+ <module>iotdb-table-connector-3.4</module>
+ <module>iotdb-table-connector-3.3</module>
+ </modules>
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <spark.version>3.5.0</spark.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-api_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-common-utils_${scala.version}</artifactId>
+ <version>3.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.15.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>common</artifactId>
+ <version>${tsfile.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${iotdb.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${tsfile.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>isession</artifactId>
+ <version>${iotdb.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <build>
+ <!--
+ Defining the settings in pluginManagement as we don't actually
want to run the plugins in this module
+ but want to have the plugins defined in the child modules to
inherit these settings.
+ -->
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <configuration>
+ <scalaVersion>${scala.library.version}</scalaVersion>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+
<finalName>spark-iotdb-table-connector_${spark.version}_${scala.version}-${project.version}</finalName>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.thrift</pattern>
+
<shadedPattern>shade.org.apache.thrift</shadedPattern>
+ </relocation>
+ </relocations>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
+
<createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml
new file mode 100644
index 0000000..abe34db
--- /dev/null
+++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-iotdb-table-common</artifactId>
+ <name>IoTDB: Table Connector: Apache Spark Common</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-api_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-common-utils_${scala.version}</artifactId>
+ <version>3.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>tsfile</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>isession</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalactic</groupId>
+ <artifactId>scalactic_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala
new file mode 100644
index 0000000..1434e7d
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.spark.sql.connector.catalog.{Identifier, Table,
TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+/**
+ * IoTDBTableProvider is a Spark DataSource V2 provider for IoTDB.
+ * It supports schema inference and table access.
+ */
+abstract class AbstractIoTDBTableProvider extends TableProvider {
+
+ override def inferSchema(caseInsensitiveStringMap:
CaseInsensitiveStringMap): StructType = {
+
IoTDBUtils.getSchema(IoTDBOptions.fromMap(caseInsensitiveStringMap.asCaseSensitiveMap().asScala.toMap))
+ }
+
+ override def getTable(structType: StructType, transforms: Array[Transform],
map: util.Map[String, String]): Table = {
+ val db = map.get(IoTDBOptions.IOTDB_DATABASE)
+ val table = map.get(IoTDBOptions.IOTDB_TABLE)
+ new IoTDBTable(Identifier.of(Array[String](db), table), structType,
IoTDBOptions.fromMap(map.asScala.toMap))
+ }
+}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala
new file mode 100644
index 0000000..e03fef3
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
/**
 * Connection and target-table options for the IoTDB Spark connector.
 *
 * All values are resolved eagerly at construction time, so a missing
 * mandatory option fails fast via sys.error with a descriptive message.
 *
 * @param properties option map whose keys have been lower-cased
 *                   (see [[IoTDBOptions.fromMap]])
 */
class IoTDBOptions(
    @transient private val properties: Map[String, String])
  extends Serializable {

  // Fails fast when a mandatory option is absent.
  private def required(key: String): String =
    properties.getOrElse(key, sys.error(s"Option '$key' not specified"))

  // Comma-separated node URLs, exposed as a Java list for the session builder.
  val urls = required(IoTDBOptions.IOTDB_URLS).split(",").toList.asJava

  val username = properties.getOrElse(IoTDBOptions.IOTDB_USERNAME, "root")

  val password = properties.getOrElse(IoTDBOptions.IOTDB_PASSWORD, "root")

  val database = required(IoTDBOptions.IOTDB_DATABASE)

  val table = required(IoTDBOptions.IOTDB_TABLE)

}

object IoTDBOptions {
  val IOTDB_USERNAME = "iotdb.username"
  val IOTDB_PASSWORD = "iotdb.password"
  val IOTDB_URLS = "iotdb.urls"
  val IOTDB_DATABASE = "iotdb.database"
  val IOTDB_TABLE = "iotdb.table"

  /** Normalizes option keys to lower case and wraps them in [[IoTDBOptions]]. */
  def fromMap(sparkMap: Map[String, String]): IoTDBOptions = {
    new IoTDBOptions(sparkMap.map { case (k, v) => (k.toLowerCase, v) })
  }
}
\ No newline at end of file
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala
new file mode 100644
index 0000000..6139e0b
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.iotdb.spark.table.db.read.IoTDBScanBuilder
+import org.apache.iotdb.spark.table.db.write.IoTDBWriteBuilder
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead,
SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import scala.collection.JavaConverters.{mapAsScalaMapConverter,
setAsJavaSetConverter}
+import scala.language.implicitConversions
+
/**
 * Represents an IoTDB table in Spark, supporting read and write operations.
 *
 * @param identifier The unique identifier of the table.
 * @param schema The schema of the table.
 * @param iotdbOptions Configuration options for IoTDB.
 */
class IoTDBTable(identifier: Identifier, schema: StructType, iotdbOptions: IoTDBOptions) extends Table with SupportsRead with SupportsWrite {

  // Table name reported to Spark (rendered from the identifier, e.g. "db.table").
  override def name(): String = identifier.toString

  // Exposes the constructor-supplied schema to Spark.
  override def schema(): StructType = schema

  // ACCEPT_ANY_SCHEMA lets Spark hand us writes whose schema is a subset of
  // the table's columns; newWriteBuilder below validates the column count.
  override def capabilities(): util.Set[TableCapability] = {
    Set(TableCapability.BATCH_READ,
      TableCapability.BATCH_WRITE,
      TableCapability.ACCEPT_ANY_SCHEMA).asJava
  }

  // Read path entry point; option keys are lower-cased via IoTDBOptions.fromMap.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new IoTDBScanBuilder(IoTDBOptions.fromMap(options.asCaseSensitiveMap().asScala.toMap), schema())

  // Write path entry point. Rejects inputs carrying more columns than the
  // table has; finer-grained column validation happens downstream in the
  // write builder / data writer.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    val incomingSchema = info.schema()
    if (incomingSchema.fields.length > schema.fields.length) {
      throw new IllegalArgumentException(
        s"The incoming schema has more fields (${incomingSchema.fields.length}) than the table schema (${schema.fields.length})."
      )
    }
    new IoTDBWriteBuilder(iotdbOptions, incomingSchema, schema)
  }
}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
new file mode 100644
index 0000000..97ea02e
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.apache.iotdb.isession.SessionDataSet
+import org.apache.iotdb.session.TableSessionBuilder
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.tsfile.enums.TSDataType
+import org.apache.tsfile.read.common.RowRecord
+import org.apache.tsfile.utils.{Binary, DateUtils}
+import org.apache.tsfile.write.record.Tablet.ColumnCategory
+
+import java.util
+
object IoTDBUtils {

  // Reserved time column name; it is the only column marked non-nullable.
  val TIME = "time"
  // Metadata key under which a column's IoTDB category (TAG/ATTRIBUTE/FIELD) is kept.
  val COLUMN_CATEGORY = "category"

  /**
   * Retrieves the schema of an IoTDB table by executing `DESC <table>`.
   *
   * Each result row contributes one field: column name, Spark type mapped
   * from the IoTDB type name, and the IoTDB column category stored in the
   * field metadata under [[COLUMN_CATEGORY]].
   *
   * @param options IoTDB options.
   * @return The schema as a Spark `StructType`.
   * @throws org.apache.spark.SparkException if the DESC query fails.
   */
  def getSchema(options: IoTDBOptions): StructType = {
    val session = new TableSessionBuilder()
      .username(options.username)
      .password(options.password)
      .nodeUrls(options.urls)
      .database(options.database)
      .build()
    val structFields = new util.ArrayList[StructField]()
    var dataSet: SessionDataSet = null
    try {
      dataSet = session.executeQueryStatement(s"DESC ${options.table}")
      while (dataSet.hasNext) {
        val row: RowRecord = dataSet.next()
        val columnName = row.getField(0).getStringValue
        val dataType = row.getField(1).getStringValue
        val columnType = row.getField(2).getStringValue
        structFields.add(StructField(columnName, getSparkDataType(dataType),
          nullable = !TIME.equals(columnName),
          metadata = new MetadataBuilder().putString(COLUMN_CATEGORY, columnType).build()))
      }
    } catch {
      case e: Exception => throw SparkException.internalError(s"Failed to get schema of table ${options.table}.", e)
    } finally {
      // Close the result set first, then the session, even on failure.
      if (dataSet != null) {
        dataSet.close()
      }
      session.close()
    }
    new StructType(structFields.toArray(Array[StructField]()))
  }

  // Maps an IoTDB type name (as printed by DESC) to the Spark type used to
  // expose it. TIMESTAMP is surfaced as LongType (epoch millis as stored by
  // IoTDB); unknown names fall back to StringType.
  private def getSparkDataType(iotdbDataTypeStr: String): DataType = {
    iotdbDataTypeStr.toUpperCase match {
      case "BOOLEAN" => BooleanType
      case "INT32" => IntegerType
      case "DATE" => DateType
      case "INT64" => LongType
      case "TIMESTAMP" => LongType
      case "FLOAT" => FloatType
      case "DOUBLE" => DoubleType
      case "TEXT" => StringType
      case "BLOB" => BinaryType
      case "STRING" => StringType
      case _ => StringType
    }
  }

  /**
   * Reads column `columnIdx` (1-based, per IoTDB's DataIterator) from the
   * iterator and converts it to Spark's internal representation for
   * `sparkDataType`.
   *
   * The match intentionally covers only the types produced by
   * [[getSparkDataType]]; any other Spark type raises a MatchError.
   */
  def getSparkValue(sparkDataType: DataType, dataSetIterator: SessionDataSet#DataIterator, columnIdx: Int): Any = {
    sparkDataType match {
      case BooleanType => dataSetIterator.getBoolean(columnIdx)
      case IntegerType => dataSetIterator.getInt(columnIdx)
      case DateType => DateTimeUtils.fromJavaDate(DateUtils.parseIntToDate(dataSetIterator.getInt(columnIdx)))
      case LongType => dataSetIterator.getLong(columnIdx)
      case FloatType => dataSetIterator.getFloat(columnIdx)
      case DoubleType => dataSetIterator.getDouble(columnIdx)
      case StringType => UTF8String.fromString(dataSetIterator.getString(columnIdx))
      case BinaryType => getByteArrayFromHexString(dataSetIterator.getString(columnIdx))
      case TimestampType => dataSetIterator.getLong(columnIdx)
    }
  }

  // Decodes a BLOB rendered by IoTDB as a hex string with a 2-character
  // prefix (the leading 2 characters are stripped before decoding).
  // Fix: the original computed an empty array for "" but discarded the
  // result and fell through to "".substring(2), which throws
  // StringIndexOutOfBoundsException; an empty input now decodes to an
  // empty byte array.
  private def getByteArrayFromHexString(value: String): Array[Byte] = {
    if (value.isEmpty) {
      new Array[Byte](0)
    } else {
      require(value.length % 2 == 0, "The length of the hex string must be even.")
      value.substring(2).sliding(2, 2).map(Integer.parseInt(_, 16).toByte).toArray
    }
  }

  /** Renders a byte array as an IoTDB hex literal, e.g. X'0A1B'. */
  def getIoTDBHexStringFromByteArray(value: Array[Byte]): String = {
    s"X'${value.map(b => f"$b%02X").mkString("")}'"
  }

  // Maps a Spark type to the IoTDB/TsFile data type used when writing.
  // Byte/Short widen to INT32; TimestampType and unknown types are written
  // as STRING (see getIoTDBValue, which stringifies timestamps).
  def getIoTDBDataType(sparkDataType: DataType): TSDataType = {
    sparkDataType match {
      case BooleanType => TSDataType.BOOLEAN
      case ByteType => TSDataType.INT32
      case ShortType => TSDataType.INT32
      case IntegerType => TSDataType.INT32
      case LongType => TSDataType.INT64
      case FloatType => TSDataType.FLOAT
      case DoubleType => TSDataType.DOUBLE
      case StringType => TSDataType.STRING
      case BinaryType => TSDataType.BLOB
      case DateType => TSDataType.DATE
      case TimestampType => TSDataType.STRING
      case _ => TSDataType.STRING
    }
  }

  /**
   * Converts a Spark internal value to the representation IoTDB's write
   * API expects for the given Spark type (e.g. UTF8String -> String,
   * Array[Byte] -> Binary, DateType int -> LocalDate).
   */
  def getIoTDBValue(sparkDataType: DataType, value: Any): Any = {
    sparkDataType match {
      case BooleanType => value.asInstanceOf[Boolean]
      case ByteType => value.asInstanceOf[Byte].toInt
      case ShortType => value.asInstanceOf[Short].toInt
      case IntegerType => value.asInstanceOf[Int]
      case LongType => value.asInstanceOf[Long]
      case FloatType => value.asInstanceOf[Float]
      case DoubleType => value.asInstanceOf[Double]
      case StringType => value.asInstanceOf[UTF8String].toString
      case BinaryType => new Binary(value.asInstanceOf[Array[Byte]])
      case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Integer]).toLocalDate
      case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]).toString
      case _ => value.toString
    }
  }

  // Maps the category string from DESC to the Tablet column category;
  // anything unrecognized is treated as a FIELD.
  def getIoTDBColumnCategory(columnCategoryStr: String): ColumnCategory = {
    columnCategoryStr.toUpperCase match {
      case "TAG" => ColumnCategory.TAG
      case "ATTRIBUTE" => ColumnCategory.ATTRIBUTE
      case _ => ColumnCategory.FIELD
    }
  }

  /**
   * Renders a column name as a double-quoted IoTDB SQL identifier.
   *
   * For Spark named references the backtick quoting is first undone
   * (doubled backticks collapsed, then surrounding backticks stripped);
   * embedded double quotes are escaped by doubling before wrapping the
   * result in double quotes.
   */
  def getIoTDBColumnIdentifierInSQL(sparkColumnIdentifier: String, isSparkNamedReference: Boolean): String = {
    var str = sparkColumnIdentifier
    if (isSparkNamedReference) {
      str = sparkColumnIdentifier.replaceAll("``", "`")
      if (str.startsWith("`") && str.endsWith("`")) {
        str = str.substring(1, str.length - 1)
      }
    }
    str = str.replaceAll("\"", "\"\"")
    s""""$str""""
  }

}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala
new file mode 100644
index 0000000..554ae6f
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.iotdb.spark.table.db.IoTDBUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or,
Predicate}
+import org.apache.spark.sql.connector.expressions.{Expression,
GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.types._
+
/**
 * Translates Spark DataSource V2 predicate/expression trees into IoTDB SQL
 * predicate strings for filter push-down.
 *
 * Any expression that cannot be translated raises
 * UnsupportedOperationException; the caller treats such predicates as not
 * pushable and leaves them for Spark to evaluate.
 */
class IoTDBExpressionSQLBuilder {

  /** Builds the SQL text for one predicate, wrapped in parentheses. */
  def build(predicate: Predicate): String = {
    s"(${buildIoTDBExpressionSQL(predicate)})"
  }

  // Dispatches on the concrete expression type. And/Or/Not and comparison
  // predicates arrive here as GeneralScalarExpression subtypes.
  private def buildIoTDBExpressionSQL(expression: Expression): String = {
    expression match {
      case literal: Literal[_] => visitLiteral(literal)
      case namedReference: NamedReference => visitNamedReference(namedReference)
      case expr: GeneralScalarExpression => visitGeneralScalarExpression(expr)
      case _ => throw new UnsupportedOperationException("Unsupported push down expression: " + expression)
    }
  }

  // Renders a constant. Strings get single quotes with embedded quotes
  // doubled; binary becomes an X'...' hex literal; DateType (days since
  // epoch) becomes a CAST('yyyy-MM-dd' as DATE); numeric/boolean types are
  // emitted verbatim.
  private def visitLiteral(literal: Literal[_]): String = {
    literal.dataType() match {
      case StringType => s"'${literal.value().toString.replace("'", "''")}'"
      case BinaryType => IoTDBUtils.getIoTDBHexStringFromByteArray(literal.value().asInstanceOf[Array[Byte]])
      case DateType => s"CAST('${DateTimeUtils.toJavaDate(Integer.parseInt(literal.value().toString))}' as DATE)"
      case ShortType | IntegerType | ByteType | LongType | BooleanType | FloatType | DoubleType => literal.value().toString
      case _ => throw new UnsupportedOperationException("Unsupported push down literal type: " + literal.dataType())
    }
  }

  // Column reference: converted to a double-quoted IoTDB identifier.
  private def visitNamedReference(namedRef: NamedReference): String = {
    IoTDBUtils.getIoTDBColumnIdentifierInSQL(namedRef.toString, true)
  }

  private def visitAlwaysFalse(): String = {
    "FALSE"
  }

  private def visitAlwaysTrue(): String = {
    "TRUE"
  }

  private def visitOr(or: Or): String = {
    s"(${buildIoTDBExpressionSQL(or.left())}) OR (${buildIoTDBExpressionSQL(or.right())})"
  }

  private def visitAnd(and: And): String = {
    s"(${buildIoTDBExpressionSQL(and.left())}) AND (${buildIoTDBExpressionSQL(and.right())})"
  }

  private def visitNot(not: Not): String = {
    s"NOT (${buildIoTDBExpressionSQL(not.child())})"
  }

  // Maps the scalar expression name onto the matching SQL construct.
  private def visitGeneralScalarExpression(expr: GeneralScalarExpression): String = {
    // <=> is unsupported
    expr.name() match {
      case "IS_NULL" => visitIsNull(expr)
      case "IS_NOT_NULL" => visitIsNotNull(expr)
      case "STARTS_WITH" => visitStartsWith(expr)
      case "ENDS_WITH" => visitEndsWith(expr)
      case "CONTAINS" => visitContains(expr)
      case "IN" => visitIn(expr)
      case "=" => visitEqualTo(expr)
      case "<>" => visitNotEqualTo(expr)
      case "<" => visitLess(expr)
      case "<=" => visitLessOrEqual(expr)
      case ">" => visitGreater(expr)
      case ">=" => visitGreaterOrEqual(expr)
      case "AND" => visitAnd(expr.asInstanceOf[And])
      case "OR" => visitOr(expr.asInstanceOf[Or])
      case "NOT" => visitNot(expr.asInstanceOf[Not])
      case "ALWAYS_TRUE" => visitAlwaysTrue()
      case "ALWAYS_FALSE" => visitAlwaysFalse()
      case _ => throw new UnsupportedOperationException("Unsupported push down expression: " + expr)
    }
  }

  private def visitIsNull(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} IS NULL"
  }

  private def visitIsNotNull(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} IS NOT NULL"
  }

  // Rendered via IoTDB's starts_with scalar function.
  private def visitStartsWith(expr: Expression): String = {
    val leftExpr = buildIoTDBExpressionSQL(expr.children()(0))
    val rightExpr = buildIoTDBExpressionSQL(expr.children()(1))
    s"starts_with(${leftExpr}, ${rightExpr})"
  }

  // Rendered via IoTDB's ends_with scalar function.
  private def visitEndsWith(expr: Expression): String = {
    val leftExpr = buildIoTDBExpressionSQL(expr.children()(0))
    val rightExpr = buildIoTDBExpressionSQL(expr.children()(1))
    s"ends_with(${leftExpr}, ${rightExpr})"
  }

  // CONTAINS is rendered as LIKE '%<constant>%'; the needle must be a
  // constant (a column reference on the right is rejected). The substring
  // strips the single quotes visitLiteral added around the constant.
  // NOTE(review): '%' or '_' inside the constant are not escaped and would
  // act as LIKE wildcards — confirm intended.
  private def visitContains(expr: Expression): String = {
    if (expr.children()(1).isInstanceOf[NamedReference]) {
      throw new UnsupportedOperationException("Unsupported push down expression: contains non constant string")
    }
    val leftExpr = buildIoTDBExpressionSQL(expr.children()(0))
    val rightExpr = buildIoTDBExpressionSQL(expr.children()(1))
    s"$leftExpr LIKE '%${rightExpr.substring(1, rightExpr.length - 1)}%'"
  }

  // First child is the tested expression; the remaining children are the
  // candidate values.
  private def visitIn(expr: Expression): String = {
    val expressions = expr.children()
    val leftExpr = buildIoTDBExpressionSQL(expressions(0))
    val rightExpr = expressions.slice(1, expressions.length).map(buildIoTDBExpressionSQL).mkString(",")
    s"$leftExpr IN ($rightExpr)"
  }

  private def visitEqualTo(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} = ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

  private def visitNotEqualTo(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} != ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

  private def visitLess(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} < ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

  private def visitLessOrEqual(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} <= ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

  private def visitGreater(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} > ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

  private def visitGreaterOrEqual(expr: Expression): String = {
    s"${buildIoTDBExpressionSQL(expr.children()(0))} >= ${buildIoTDBExpressionSQL(expr.children()(1))}"
  }

}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala
new file mode 100644
index 0000000..4d45b9c
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.spark.sql.connector.read.InputPartition
+
/**
 * Input partition carrying the complete SQL statement that a partition
 * reader executes against IoTDB.
 */
class IoTDBInputPartition(sql: String) extends InputPartition {

  /** Returns the SQL statement for this partition. */
  def getSQL: String = sql
}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala
new file mode 100644
index 0000000..4497586
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.iotdb.isession.ITableSession
+import org.apache.iotdb.session.TableSessionBuilder
+import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.types._
+
/**
 * IoTDBPartitionReader is responsible for reading data from IoTDB and converting it into Spark's InternalRow format.
 *
 * @param inputPartition The partition containing query information.
 * @param schema The schema of the resulting data.
 * @param options IoTDB connection and query options.
 */
class IoTDBPartitionReader(inputPartition: InputPartition, schema: StructType, options: IoTDBOptions) extends PartitionReader[InternalRow] with Logging {

  // Session is created lazily on the executor, on first access.
  private lazy val session: ITableSession = {
    new TableSessionBuilder()
      .username(options.username)
      .password(options.password)
      .nodeUrls(options.urls)
      .database(options.database)
      .build()
  }

  // Executes this partition's SQL once and iterates the result set.
  private lazy val dataSetIterator = session.executeQueryStatement(inputPartition.asInstanceOf[IoTDBInputPartition].getSQL).iterator()

  // Advances to the next row; false when the result set is exhausted.
  override def next(): Boolean = dataSetIterator.next()

  // Converts the iterator's current row into a GenericInternalRow.
  // IoTDB's DataIterator columns are 1-based, hence the i + 1 offsets.
  override def get(): InternalRow = {
    val row = new GenericInternalRow(schema.length)
    for (i <- 0 until schema.length) {
      if (dataSetIterator.isNull(i + 1)) {
        row.setNullAt(i)
      } else {
        val dataType = schema.fields(i).dataType
        row.update(i, IoTDBUtils.getSparkValue(dataType, dataSetIterator, i + 1))
      }
    }
    row
  }

  // Closes the session; errors are logged rather than rethrown so task
  // teardown is never failed by a close problem.
  // NOTE(review): accessing the lazy `session` here forces its creation
  // even if the reader was never used — confirm this is acceptable.
  override def close(): Unit = {
    try {
      if (session != null) {
        session.close()
      }
    } catch {
      case e: Exception =>
        logError(s"Error closing IoTDB session: ${e.getMessage}")
    }
  }
}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala
new file mode 100644
index 0000000..46c2179
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.iotdb.spark.table.db.IoTDBOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+
/**
 * Factory that builds an [[IoTDBPartitionReader]] for each input
 * partition of an IoTDB scan.
 */
class IoTDBPartitionReaderFactory(schema: StructType, options: IoTDBOptions) extends PartitionReaderFactory {

  /** Creates the reader that executes the partition's SQL against IoTDB. */
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new IoTDBPartitionReader(partition, schema, options)
}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala
new file mode 100644
index 0000000..cd480d6
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Batch, InputPartition,
PartitionReaderFactory, Scan}
+import org.apache.spark.sql.types.StructType
+
+import scala.language.postfixOps
+
/**
 * Single-partition batch scan over an IoTDB table.
 *
 * Compiles the required columns, pushed-down predicates and pushed-down
 * OFFSET/LIMIT into one SQL statement, executed by a single partition
 * reader.
 */
class IoTDBScan(options: IoTDBOptions, requiredSchema: StructType, pushedFilters: Array[String], pushDownOffset: Int, pushDownLimit: Int) extends Scan with Batch with Logging {

  override def readSchema(): StructType = requiredSchema

  override def toBatch: Batch = this

  // All data is read through one partition carrying the generated SQL.
  override def planInputPartitions(): Array[InputPartition] = {
    val statement = buildSQL()
    logDebug(s"SQL: $statement")
    Array(new IoTDBInputPartition(statement))
  }

  // Assembles: SELECT <cols> FROM <table> [WHERE <f1> AND ...] [OFFSET n] [LIMIT n]
  private def buildSQL(): String = {
    val projection = requiredSchema.fieldNames
      .map(IoTDBUtils.getIoTDBColumnIdentifierInSQL(_, isSparkNamedReference = false))
      .mkString(", ")
    val clauses = Seq(
      Some(s"SELECT $projection FROM ${options.table}"),
      if (pushedFilters.nonEmpty) Some(s"WHERE ${pushedFilters.mkString(" AND ")}") else None,
      if (pushDownOffset > 0) Some(s"OFFSET $pushDownOffset") else None,
      if (pushDownLimit > 0) Some(s"LIMIT $pushDownLimit") else None
    )
    clauses.flatten.mkString(" ")
  }

  override def createReaderFactory(): PartitionReaderFactory = new IoTDBPartitionReaderFactory(requiredSchema, options)
}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala
new file mode 100644
index 0000000..2c5d161
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.iotdb.spark.table.db.IoTDBOptions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
/**
 * IoTDBScanBuilder is responsible for constructing an IoTDBScan with
 * support for predicate push-down, column pruning, offset, and limit.
 *
 * @param options The IoTDB connection and query options.
 * @param schema The full schema of the table.
 */
class IoTDBScanBuilder(options: IoTDBOptions, schema: StructType) extends ScanBuilder
  with SupportsPushDownRequiredColumns
  with SupportsPushDownV2Filters
  with SupportsPushDownOffset
  with SupportsPushDownLimit
  with Logging {

  // Predicates successfully compiled to IoTDB SQL (reported to Spark).
  private var supportedFilters: Array[Predicate] = Array.empty
  // SQL text of the compiled predicates, same order as supportedFilters.
  private var pushDownFilterStrings: Array[String] = Array.empty
  // Pruned read schema; defaults to the full table schema.
  private var requiredColumns: StructType = schema
  // -1 means "not pushed down".
  private var pushDownOffset: Int = -1
  private var pushDownLimit: Int = -1

  override def build(): Scan = {
    new IoTDBScan(options, requiredColumns, pushDownFilterStrings, pushDownOffset, pushDownLimit)
  }

  // Keeps only the requested columns, preserving the table schema's field
  // order (not the order in requiredSchema).
  override def pruneColumns(requiredSchema: StructType): Unit = {
    if (requiredSchema.nonEmpty) {
      val fields = schema.fields.filter(
        field => requiredSchema.fieldNames.contains(field.name)
      )
      requiredColumns = StructType(fields)
    } else {
      requiredColumns = schema
    }
  }

  override def pushOffset(offset: Int): Boolean = {
    pushDownOffset = offset
    true
  }

  override def pushLimit(limit: Int): Boolean = {
    pushDownLimit = limit
    true
  }

  // Tries to compile each predicate to IoTDB SQL. The partition's test
  // function side-effectfully appends the compiled SQL for each supported
  // predicate; predicates whose compilation throws are left to Spark.
  // Per the DSv2 contract the RETURNED array is the post-scan filters
  // (the unsupported ones Spark must still evaluate).
  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val compiledPredicates = new util.ArrayList[String]()
    val builder = new IoTDBExpressionSQLBuilder
    val (supported, unsupported) = predicates.partition(predicate => {
      try {
        val sql = builder.build(predicate)
        compiledPredicates.add(sql)
        true
      } catch {
        case e: Exception => {
          logDebug(s"Predicate push-down failed for: $predicate, reason: ${e.getMessage}")
          false
        }
      }
    })
    pushDownFilterStrings = compiledPredicates.toArray(new Array[String](compiledPredicates.size()))
    supportedFilters = supported
    unsupported
  }

  override def pushedPredicates(): Array[Predicate] = {
    supportedFilters
  }

}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
new file mode 100644
index 0000000..91f921d
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.write
+
+import org.apache.iotdb.session.TableSessionBuilder
+import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils}
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.apache.tsfile.enums.TSDataType
+import org.apache.tsfile.write.record.Tablet
+import org.apache.tsfile.write.record.Tablet.ColumnCategory
+
+class IoTDBDataWriter(options: IoTDBOptions, writeSchema: StructType,
tableSchema: StructType) extends DataWriter[InternalRow] with Logging {
+
+ private lazy val session =
+ new TableSessionBuilder()
+ .username(options.username)
+ .password(options.password)
+ .database(options.database)
+ .nodeUrls(options.urls)
+ .build()
+
+ private val tableSchemaMap = tableSchema.fields.map(f => f.name -> f).toMap
+
+ private val isWriteSchemaValid = writeSchema.fields.forall(f =>
tableSchemaMap.contains(f.name))
+
+ private lazy val tablet = {
+ val tableName = options.table
+ val columnNameList = new java.util.ArrayList[String]()
+ val dataTypeList = new java.util.ArrayList[TSDataType]()
+ val columnCategoryList = new java.util.ArrayList[ColumnCategory]()
+
+ for (i <- writeSchema.indices) {
+ val writeSchemaField = writeSchema.fields(i)
+ val fieldInTableSchema = if (isWriteSchemaValid) {
+ writeSchema.fields(i)
+ } else {
+ tableSchema.fields(i)
+ }
+ val columnCategoryStr =
tableSchemaMap.getOrElse(fieldInTableSchema.name,
tableSchema.fields(i)).metadata.getString(IoTDBUtils.COLUMN_CATEGORY)
+ val columnCategory = IoTDBUtils.getIoTDBColumnCategory(columnCategoryStr)
+ if (fieldInTableSchema.name != IoTDBUtils.TIME) {
+ val dataType = writeSchemaField.dataType
+ columnNameList.add(fieldInTableSchema.name)
+ dataTypeList.add(IoTDBUtils.getIoTDBDataType(dataType))
+ columnCategoryList.add(columnCategory)
+ }
+ }
+ new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList)
+ }
+
+ override def write(record: InternalRow): Unit = {
+ if (tablet.getRowSize == tablet.getMaxRowNumber) {
+ writeTabletToIoTDB()
+ }
+ val currentRow = tablet.getRowSize
+ try {
+ for (i <- writeSchema.fields.indices) {
+ if (!record.isNullAt(i)) {
+ val column = if (isWriteSchemaValid) {
+ writeSchema.fields(i).name
+ } else {
+ tableSchema.fields(i).name
+ }
+ val dataType = writeSchema.fields(i).dataType
+ val value = IoTDBUtils.getIoTDBValue(dataType, record.get(i,
dataType))
+ if (column == IoTDBUtils.TIME) {
+ tablet.addTimestamp(currentRow, value.asInstanceOf[Long])
+ } else {
+ tablet.addValue(column, currentRow, value)
+ }
+ }
+ }
+ } catch {
+ case e: Exception =>
+ throw SparkException.internalError("Error writing data to Tablet", e)
+ }
+ }
+
+ override def commit(): WriterCommitMessage = {
+ if (tablet.getRowSize > 0) {
+ writeTabletToIoTDB()
+ }
+ new IoTDBWriterCommitMessage()
+ }
+
+ private def writeTabletToIoTDB(): Unit = {
+ try {
+ session.insert(tablet)
+ tablet.reset()
+ } catch {
+ case e: Exception =>
+ throw SparkException.internalError("Error writing tablet to IoTDB", e)
+ }
+ }
+
+ override def abort(): Unit = {}
+
+ override def close(): Unit = {
+ if (session != null) {
+ try {
+ session.close()
+ } catch {
+ case e: Exception =>
+ logError(s"Error closing IoTDB session: ${e.getMessage}")
+ }
+ }
+ }
+}
+
+class IoTDBWriterCommitMessage extends WriterCommitMessage {}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala
new file mode 100644
index 0000000..efa5f8b
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.write
+
+import org.apache.iotdb.spark.table.db.IoTDBOptions
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.types.StructType
+
+class IoTDBWrite(options: IoTDBOptions, writeSchema: StructType, tableSchema:
StructType) extends Write with BatchWrite {
+
+ override def toBatch: BatchWrite = this
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = new IoTDBWriteFactory(options, writeSchema, tableSchema)
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala
new file mode 100644
index 0000000..4ea34dd
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.write
+
+import org.apache.iotdb.spark.table.db.IoTDBOptions
+import org.apache.spark.sql.connector.write.{Write, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+
+class IoTDBWriteBuilder(options: IoTDBOptions, writeSchema: StructType,
tableSchema: StructType) extends WriteBuilder {
+ override def build(): Write = new IoTDBWrite(options, writeSchema,
tableSchema)
+}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala
new file mode 100644
index 0000000..8219206
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.write
+
+import org.apache.iotdb.spark.table.db.IoTDBOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+class IoTDBWriteFactory(options: IoTDBOptions, writeSchema: StructType,
tableSchema: StructType) extends DataWriterFactory {
+
+ override def createWriter(partitionId: Int, taskId: Long):
DataWriter[InternalRow] = {
+ new IoTDBDataWriter(options, writeSchema, tableSchema)
+ }
+}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala
new file mode 100644
index 0000000..1ab390a
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db
+
+import org.junit.Assert
+import org.scalatest.FunSuite
+
+
+class UtilsTest extends FunSuite {
+ test("testConvertIdentifier") {
+ var str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("tag1", false)
+ Assert.assertEquals("\"tag1\"", str)
+ str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("`ta``g1`", true)
+ Assert.assertEquals("\"ta`g1\"", str)
+ str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("`ta\"g1`", true)
+ Assert.assertEquals("\"ta\"\"g1\"", str)
+ }
+
+}
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala
new file mode 100644
index 0000000..7ebdf11
--- /dev/null
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table.db.read
+
+import org.apache.spark.sql.connector.expressions.Expressions
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse,
AlwaysTrue, And, Not, Or, Predicate}
+import org.apache.spark.sql.sources.EqualTo
+import org.junit.Assert
+import org.scalatest.FunSuite
+
+import java.sql.Date
+
+class PushDownPredicateSQLBuilderTest extends FunSuite {
+ private val builder = new IoTDBExpressionSQLBuilder
+ test("testBuildIoTDBSQL") {
+ Assert.assertEquals("(\"s1\" IS NULL)", builder.build(new
Predicate("IS_NULL", Array(Expressions.column("s1")))))
+ Assert.assertEquals("(\"s`1\" IS NULL)", builder.build(new
Predicate("IS_NULL", Array(Expressions.column("`s``1`")))))
+ Assert.assertEquals("(\"s\"\"1\" IS NULL)", builder.build(new
Predicate("IS_NULL", Array(Expressions.column("`s\"1`")))))
+
+ Assert.assertEquals("(\"s1\" IS NOT NULL)", builder.build(new
Predicate("IS_NOT_NULL", Array(Expressions.column("s1")))))
+
+ Assert.assertEquals("(ends_with(\"s1\", \"s2\"))", builder.build(new
Predicate("ENDS_WITH", Array(Expressions.column("s1"),
Expressions.column("s2")))))
+ Assert.assertEquals("(ends_with(\"s1\", 'value1'))", builder.build(new
Predicate("ENDS_WITH", Array(Expressions.column("s1"),
Expressions.literal("value1")))))
+ Assert.assertEquals("(ends_with(\"s1\", 'va''lue1'))", builder.build(new
Predicate("ENDS_WITH", Array(Expressions.column("s1"),
Expressions.literal("va'lue1")))))
+
+ Assert.assertEquals("(starts_with(\"s1\", \"s2\"))", builder.build(new
Predicate("STARTS_WITH", Array(Expressions.column("s1"),
Expressions.column("s2")))))
+ Assert.assertEquals("(starts_with(\"s1\", 'value1'))", builder.build(new
Predicate("STARTS_WITH", Array(Expressions.column("s1"),
Expressions.literal("value1")))))
+ Assert.assertEquals("(starts_with(\"s1\", 'va''lue1'))", builder.build(new
Predicate("STARTS_WITH", Array(Expressions.column("s1"),
Expressions.literal("va'lue1")))))
+
+ Assert.assertThrows(classOf[UnsupportedOperationException], () =>
builder.build(new Predicate("CONTAINS", Array(Expressions.column("s1"),
Expressions.column("s2")))))
+ Assert.assertEquals("(\"s1\" LIKE '%value1%')", builder.build(new
Predicate("CONTAINS", Array(Expressions.column("s1"),
Expressions.literal("value1")))))
+ Assert.assertEquals("(\"s1\" LIKE '%va''lue1%')", builder.build(new
Predicate("CONTAINS", Array(Expressions.column("s1"),
Expressions.literal("va'lue1")))))
+
+ Assert.assertEquals("(\"s1\" IN (1,2,3))", builder.build(new
Predicate("IN", Array(Expressions.column("s1"), Expressions.literal(1),
Expressions.literal(2), Expressions.literal(3)))))
+ Assert.assertEquals("(\"s1\" IN ('value1','value2','val''ue3'))",
builder.build(new Predicate("IN", Array(Expressions.column("s1"),
Expressions.literal("value1"), Expressions.literal("value2"),
Expressions.literal("val\'ue3")))))
+
+ Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=",
Array(Expressions.column("s1"), Expressions.literal(1.toShort)))))
+ Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=",
Array(Expressions.column("s1"), Expressions.literal(1.toByte)))))
+ Assert.assertEquals("(\"s1\" = 'val''ue1')", builder.build(new
Predicate("=", Array(Expressions.column("s1"),
Expressions.literal("val'ue1")))))
+ Assert.assertEquals("(\"s1\" = X'010101')", builder.build(new
Predicate("=", Array(Expressions.column("s1"),
Expressions.literal(Array(1.toByte, 1.toByte, 1.toByte))))))
+ // If you meet error on jdk17, add
'--add-opens=java.base/sun.util.calendar=ALL-UNNAMED' to VM options
+ Assert.assertEquals("(\"s1\" = CAST('2025-01-01' as DATE))",
builder.build(EqualTo("s1", Date.valueOf("2025-01-01")).toV2))
+
+ Assert.assertEquals("(\"s1\" != 1)", builder.build(new Predicate("<>",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertEquals("(\"s1\" < 1)", builder.build(new Predicate("<",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertEquals("(\"s1\" <= 1)", builder.build(new Predicate("<=",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertEquals("(\"s1\" > 1)", builder.build(new Predicate(">",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertEquals("(\"s1\" >= 1)", builder.build(new Predicate(">=",
Array(Expressions.column("s1"), Expressions.literal(1)))))
+ Assert.assertThrows(classOf[UnsupportedOperationException], () =>
builder.build(new Predicate("<=>", Array(Expressions.column("s1"),
Expressions.literal(1)))))
+
+ Assert.assertEquals("((\"time\" = 1) AND (\"s1\" = 1))", builder.build(new
And(new Predicate("=", Array(Expressions.column("time"),
Expressions.literal(1L))), new Predicate("=", Array(Expressions.column("s1"),
Expressions.literal(1))))))
+ Assert.assertEquals("((\"time\" = 1) OR (\"s1\" = 1))", builder.build(new
Or(new Predicate("=", Array(Expressions.column("time"),
Expressions.literal(1L))), new Predicate("=", Array(Expressions.column("s1"),
Expressions.literal(1))))))
+ Assert.assertEquals("(NOT (\"s1\" = 1))", builder.build(new Not(new
Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1))))))
+ Assert.assertEquals("(true)", builder.build(new AlwaysTrue))
+ Assert.assertEquals("(false)", builder.build(new AlwaysFalse))
+ }
+
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index 2935b50..f743e7b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -37,6 +37,7 @@
<module>pulsar</module>
<module>rabbitmq</module>
<module>rocketmq</module>
+ <module>spark-table</module>
</modules>
<build>
<pluginManagement>
diff --git a/examples/spark-table/README.md b/examples/spark-table/README.md
new file mode 100644
index 0000000..b986b66
--- /dev/null
+++ b/examples/spark-table/README.md
@@ -0,0 +1,103 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# IoTDB-Table-Spark-Connector Example
+## Introduction
+This example demonstrates how to use the IoTDB-Table-Spark-Connector to read
and write data from/to IoTDB in Spark.
+## Version
+* Scala 2.12
+* Spark 3.3 or later
+## Usage
+Import the IoTDB-Table-Spark-Connector dependency in your project.
+```
+<dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector-3.5</artifactId>
+</dependency>
+```
+## Options
+| Key | Default Value | Comment
| Required |
+|----------------|----------------|-----------------------------------------------------------------------------------------------------------|----------|
+| iotdb.database | -- | The database name in IoTDB, which must be a database that already exists in IoTDB | true |
+| iotdb.table | -- | The table name in IoTDB, which must be a table that already exists in IoTDB | true |
+| iotdb.username | root | The username used to access IoTDB | false |
+| iotdb.password | root | The password used to access IoTDB | false |
+| iotdb.urls | 127.0.0.1:6667 | The URL(s) the client uses to connect to the DataNode RPC endpoint. If there are multiple URLs, separate them with ',' | false |
+
+
+## Read
+### DataFrame
+```scala
+val df =
spark.read.format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
+ .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
+ .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
+ .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
+ .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
+ .option("iotdb.urls", "$YOUR_IOTDB_URL")
+ .load()
+```
+### Spark SQL
+```
+CREATE TEMPORARY VIEW spark_iotdb
+ USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
+ OPTIONS(
+ "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
+ "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
+ "iotdb.username"="$YOUR_IOTDB_USERNAME",
+ "iotdb.password"="$YOUR_IOTDB_PASSWORD",
+ "iotdb.urls"="$YOUR_IOTDB_URL"
+);
+
+SELECT * FROM spark_iotdb;
+```
+
+## Write
+### DataFrame
+```scala
+val df = spark.createDataFrame(List(
+ (1L, "tag1_value1", "tag2_value1", "attribute1_value1", 1, true),
+ (1L, "tag1_value1", "tag2_value2", "attribute1_value1", 2, false)))
+ .toDF("time", "tag1", "tag2", "attribute1", "s1", "s2")
+
+df
+ .write
+ .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
+ .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
+ .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
+ .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
+ .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
+ .option("iotdb.urls", "$YOUR_IOTDB_URL")
+ .save()
+```
+### Spark SQL
+```
+CREATE TEMPORARY VIEW spark_iotdb
+ USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
+ OPTIONS(
+ "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
+ "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
+ "iotdb.username"="$YOUR_IOTDB_USERNAME",
+ "iotdb.password"="$YOUR_IOTDB_PASSWORD",
+ "iotdb.urls"="$YOUR_IOTDB_URL"
+);
+
+INSERT INTO spark_iotdb VALUES ("VALUE1", "VALUE2", ...);
+INSERT INTO spark_iotdb SELECT * FROM YOUR_TABLE
+```
\ No newline at end of file
diff --git a/connectors/pom.xml b/examples/spark-table/pom.xml
similarity index 50%
copy from connectors/pom.xml
copy to examples/spark-table/pom.xml
index ad2c299..762f318 100644
--- a/connectors/pom.xml
+++ b/examples/spark-table/pom.xml
@@ -23,44 +23,50 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-extras-parent</artifactId>
+ <artifactId>examples</artifactId>
<version>2.0.2-SNAPSHOT</version>
</parent>
- <artifactId>connectors</artifactId>
- <packaging>pom</packaging>
- <name>IoTDB Extras: Connectors</name>
- <modules>
- <module>flink-iotdb-connector</module>
- <module>flink-sql-iotdb-connector</module>
- <module>flink-tsfile-connector</module>
- <module>grafana-connector</module>
- <module>hadoop</module>
- <module>hive-connector</module>
- <module>spark-iotdb-connector</module>
- <module>spark-tsfile</module>
- <module>zeppelin-interpreter</module>
- </modules>
+ <artifactId>table-spark-connector-example</artifactId>
+ <name>IoTDB: Example: IoTDB Table Spark Connector</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>spark-iotdb-table-connector-3.5</artifactId>
+ <version>2.0.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>3.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ <version>2.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
<build>
<plugins>
<plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <version>0.15</version>
- <inherited>false</inherited>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
<configuration>
- <excludes combine.children="append">
- <exclude>**/grafana-plugin/**</exclude>
- </excludes>
+ <scalaVersion>${scala.version}</scalaVersion>
</configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
</plugins>
</build>
- <profiles>
- <profile>
- <id>with-grafana-plugin</id>
- <modules>
- <module>grafana-plugin</module>
- </modules>
- </profile>
- </profiles>
</project>
diff --git
a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala
new file mode 100644
index 0000000..89a9f5f
--- /dev/null
+++
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table
+
+import org.apache.spark.sql.SparkSession
+
+object SparkConnectorReadExample {
+
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder()
+ .appName("IoTDB Spark Demo")
+ .config("spark.sql.shuffle.partitions", "1")
+ .config("spark.master", "local[*]")
+ .getOrCreate()
+ val df =
spark.read.format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
+ .option("iotdb.database", "test")
+ .option("iotdb.table", "table1")
+ .option("iotdb.username", "root")
+ .option("iotdb.password", "root")
+ .option("iotdb.urls", "127.0.0.1:6667")
+ .load()
+ df.createTempView("iotdb_table1")
+ df.printSchema()
+ spark.sql("select * from iotdb_table1").show()
+ spark.close()
+ }
+}
diff --git
a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala
new file mode 100644
index 0000000..fc9249f
--- /dev/null
+++
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table
+
+import org.apache.spark.sql.SparkSession
+
+object SparkConnectorSQLExample {
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder()
+ .appName("IoTDB Spark Demo")
+ .config("spark.sql.shuffle.partitions", "1")
+ .config("spark.master", "local[*]")
+ .getOrCreate()
+ spark.sql(
+ """
+ CREATE TEMPORARY VIEW spark_iotdb1
+ USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
+ OPTIONS(
+ "iotdb.database"="test",
+ "iotdb.table"="table1",
+ "iotdb.username"="root",
+ "iotdb.password"="root",
+ "iotdb.url"="127.0.0.1:6667");
+ """)
+ spark.sql(
+ """
+ CREATE TEMPORARY VIEW spark_iotdb2
+ USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
+ OPTIONS(
+ "iotdb.database"="test",
+ "iotdb.table"="table2",
+ "iotdb.username"="root",
+ "iotdb.password"="root",
+ "iotdb.urls"="127.0.0.1:6667");
+ """)
+ spark.sql("select * from spark_iotdb1").show
+ spark.sql("insert into spark_iotdb2 select time,tag1, s0, s1 from
spark_iotdb1")
+ spark.sql("select * from spark_iotdb1").show
+ spark.close()
+
+ }
+}
diff --git
a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala
new file mode 100644
index 0000000..7e3deb6
--- /dev/null
+++
b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.table
+
+import org.apache.spark.sql.SparkSession
+
+object SparkConnectorWriteExample {
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder()
+ .appName("IoTDB Spark Demo")
+ .config("spark.sql.shuffle.partitions", "1")
+ .config("spark.master", "local[*]")
+ .getOrCreate()
+ // time, tag1 string tag,tag2 string tag, s0 int32, s1 boolean
+ val df = spark.createDataFrame(List(
+ (1L, "tag1_value1","tag2_value1", 1, false),
+ (1L, "tag1_value1","tag2_value1", 1, true),
+ (2L, "tag1_value2","tag2_value1")), 2, true)
+ .toDF("time", "tag1", "tag2", "s0", "s1")
+
+
+ df
+ .write
+ .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
+ .option("iotdb.database", "test")
+ .option("iotdb.table", "spark_table1")
+ .option("iotdb.username", "root")
+ .option("iotdb.password", "root")
+ .option("iotdb.urls", "127.0.0.1:6667")
+ .mode("append")
+ .save()
+ spark.close()
+ }
+}
diff --git a/iotdb-collector/collector-core/pom.xml
b/iotdb-collector/collector-core/pom.xml
index e8e6b5b..7b68e68 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -103,6 +103,10 @@
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pom.xml b/pom.xml
index f3e32df..3620804 100644
--- a/pom.xml
+++ b/pom.xml
@@ -693,6 +693,16 @@
<artifactId>scalatest_2.11</artifactId>
<version>${scalatest.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.12</artifactId>
+ <version>${scalatest.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalactic</groupId>
+ <artifactId>scalactic_2.12</artifactId>
+ <version>3.0.9</version>
+ </dependency>
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_2.11</artifactId>