This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 45a6658 [IoTDB-1989] IoTDB support insert data from Spark (#4477)
45a6658 is described below
commit 45a665866ed830456d0d44ece0c02c37166e22fe
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Mon Dec 6 11:30:43 2021 +0800
[IoTDB-1989] IoTDB support insert data from Spark (#4477)
1. fix the bug of the count statement.
2. update the ITs of the spark connector.
---
.../UserGuide/Ecosystem Integration/Spark IoTDB.md | 53 ++++++-
.../UserGuide/Ecosystem Integration/Spark IoTDB.md | 58 +++++++-
spark-iotdb-connector/pom.xml | 36 +----
.../org/apache/iotdb/spark/db/Converter.scala | 6 +-
.../org/apache/iotdb/spark/db/DefaultSource.scala | 30 +++-
.../org/apache/iotdb/spark/db/IoTDBOptions.scala | 2 +-
.../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala | 7 +-
.../iotdb/spark/db/tools/DataFrameTools.java | 162 +++++++++++++++++++++
.../org/apache/iotdb/spark/db/IoTDBTest.scala | 32 ++--
.../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 +++++++++++++++
10 files changed, 435 insertions(+), 68 deletions(-)
diff --git a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
index a911801..36388b8 100644
--- a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
+++ b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
@@ -18,7 +18,7 @@
under the License.
-->
-## Spark-IoTDB
+# Spark-IoTDB
### version
The versions required for Spark and Java are as follows:
@@ -45,8 +45,9 @@ mvn clean scala:compile compile install
</dependency>
```
+## Read Data from IoTDB
-#### spark-shell user guide
+### spark-shell user guide
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -76,7 +77,7 @@ df.printSchema()
df.show()
```
-#### Schema Inference
+### Schema Inference
Take the following TsFile structure as an example: there are three measurements in the TsFile schema: status, temperature, and hardware. The basic information of these three measurements is as follows:
@@ -114,7 +115,7 @@ You can also use narrow table form as follows: (You can see part 4 about h
| 5 | root.ln.wf02.wt01 | false | null | null |
| 6 | root.ln.wf02.wt02 | null | ccc | null |
-#### Get narrow form of data
+### Get narrow form of data
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -127,7 +128,7 @@ df.printSchema()
df.show()
```
-#### Java user guide
+### Java user guide
```
import org.apache.spark.sql.Dataset;
@@ -158,3 +159,45 @@ public class Example {
}
```
+## Write Data to IoTDB
+
+### User Guide
+``` scala
+// import narrow table
+val df = spark.createDataFrame(List(
+ (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "device_name")
+ .withColumnRenamed("_3", "s0")
+ .withColumnRenamed("_4", "s1")
+ .withColumnRenamed("_5", "s2")
+ .withColumnRenamed("_6", "s3")
+ .withColumnRenamed("_7", "s4")
+ .withColumnRenamed("_8", "s5")
+dfWithColumn
+ .write
+ .format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+
+// import wide table
+val df = spark.createDataFrame(List(
+ (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "root.test.d0.s0")
+ .withColumnRenamed("_3", "root.test.d0.s1")
+ .withColumnRenamed("_4", "root.test.d0.s2")
+ .withColumnRenamed("_5", "root.test.d0.s3")
+ .withColumnRenamed("_6", "root.test.d0.s4")
+ .withColumnRenamed("_7", "root.test.d0.s5")
+dfWithColumn.write.format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+```
+
+### Notes
+1. You can write data directly to IoTDB whether the dataframe contains a wide table or a narrow table.
\ No newline at end of file
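For context on the two shapes: a wide table has one column per full timeseries path, while a narrow table has a `device_name` column plus one short column per sensor. A minimal round-trip sketch, assuming a local IoTDB at 127.0.0.1:6667, the connector jars on the classpath, and the connector's own `Transformer` helper (also used in the tests below):
``` scala
import org.apache.iotdb.spark.db.Transformer

// Read in wide form, convert to narrow form, and write back.
// The url and sql values here are illustrative.
val wideDf = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root")
  .load

val narrowDf = Transformer.toNarrowForm(spark, wideDf) // yields Time, device_name, sensor columns

narrowDf.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save
```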
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
index f28d82a..21b0cda 100644
--- a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
+++ b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
@@ -19,9 +19,9 @@
-->
-## Spark-IoTDB
+# Spark-IoTDB
-### Version
+## Version
The versions required for Spark and Java are as follows:
@@ -29,7 +29,7 @@ The versions required for Spark and Java are as follows:
| ------------- | ------------- | ------------ | -------- |
| `2.4.3` | `2.11` | `1.8` | `0.13.0-SNAPSHOT` |
-### Installation
+## Installation
mvn clean scala:compile compile install
@@ -43,7 +43,9 @@ mvn clean scala:compile compile install
</dependency>
```
-#### Spark-shell User Guide
+## Read Data from IoTDB
+
+### Spark-shell User Guide
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -73,7 +75,7 @@ df.printSchema()
df.show()
```
-#### Schema Inference
+### Schema Inference
Take the following TsFile structure as an example: there are three measurements in the TsFile schema: status, temperature, and hardware. The basic information of these three measurements is as follows:
@@ -118,7 +120,7 @@ time|d1.status|time|d1.temperature |time | d2.hardware |time|d2.status
| 5 | root.ln.wf02.wt01 | false | null | null |
| 6 | root.ln.wf02.wt02 | null | ccc | null |
-#### Get narrow form of data
+### Get narrow form of data
```
spark-shell --jars spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -131,7 +133,7 @@ df.printSchema()
df.show()
```
-#### Java User Guide
+### Java User Guide
```
import org.apache.spark.sql.Dataset;
@@ -161,3 +163,45 @@ public class Example {
}
}
```
+
+## Write Data to IoTDB
+### User Guide
+``` scala
+// import narrow table
+val df = spark.createDataFrame(List(
+ (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "device_name")
+ .withColumnRenamed("_3", "s0")
+ .withColumnRenamed("_4", "s1")
+ .withColumnRenamed("_5", "s2")
+ .withColumnRenamed("_6", "s3")
+ .withColumnRenamed("_7", "s4")
+ .withColumnRenamed("_8", "s5")
+dfWithColumn
+ .write
+ .format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+
+// import wide table
+val df = spark.createDataFrame(List(
+ (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "root.test.d0.s0")
+ .withColumnRenamed("_3", "root.test.d0.s1")
+ .withColumnRenamed("_4", "root.test.d0.s2")
+ .withColumnRenamed("_5", "root.test.d0.s3")
+ .withColumnRenamed("_6", "root.test.d0.s4")
+ .withColumnRenamed("_7", "root.test.d0.s5")
+dfWithColumn.write.format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+```
+
+### Notes
+1. You can write data directly to IoTDB whether the dataframe contains a narrow table or a wide table.
\ No newline at end of file
diff --git a/spark-iotdb-connector/pom.xml b/spark-iotdb-connector/pom.xml
index f10ec0f..c37cb7b 100644
--- a/spark-iotdb-connector/pom.xml
+++ b/spark-iotdb-connector/pom.xml
@@ -32,6 +32,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compile.version>1.8</compile.version>
+ <scala.library.version>2.11</scala.library.version>
+ <scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
@@ -52,35 +54,10 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <!--commons-configuration uses commons-lang:commons-lang:2.4 while others use commons-lang 2.6-->
- <exclusion>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- this is just used by hadoop-common-->
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- <version>1.6</version>
- <exclusions>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
</dependency>
- <!-- many of hadoop dependencies use guava11, but org.apache.curator from hadoop-common uses
- guava16 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -97,16 +74,19 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
index da3435d..023e4f8 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
@@ -65,7 +65,11 @@ object Converter {
}
val colCount = resultSetMetaData.getColumnCount
- for (i <- 2 to colCount) {
+ var startIndex = 2
+ if (!"Time".equals(resultSetMetaData.getColumnName(1))) {
+ startIndex = 1
+ }
+ for (i <- startIndex to colCount) {
fields += StructField(resultSetMetaData.getColumnLabel(i), resultSetMetaData.getColumnType(i) match {
case Types.BOOLEAN => BooleanType
case Types.INTEGER => IntegerType
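This is the count-statement fix from the commit message: an aggregation result set comes back without a leading Time column, so schema conversion now starts at column 1 instead of unconditionally skipping the first column. A sketch of the query shape it fixes, assuming a running instance with the root.vehicle data used in IoTDBTest:
``` scala
// Aggregate queries return no Time column; before this fix the first
// count column was silently dropped by the schema converter.
val countDf = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select count(d0.s0),count(d0.s1) from root.vehicle")
  .load
countDf.printSchema() // two count columns, no Time field
```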
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
index 60ec74a..9812b14 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
@@ -19,11 +19,14 @@
package org.apache.iotdb.spark.db
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+
+import org.apache.iotdb.spark.db.tools.DataFrameTools
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.slf4j.LoggerFactory
-private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister {
+
+private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegister with CreatableRelationProvider {
private final val logger = LoggerFactory.getLogger(classOf[DefaultSource])
override def shortName(): String = "tsfile"
@@ -34,10 +37,27 @@ private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegis
val iotdbOptions = new IoTDBOptions(parameters)
- if (iotdbOptions.url == null || iotdbOptions.sql == null) {
- sys.error("IoTDB url or sql not specified")
+ if ("".equals(iotdbOptions.sql)) {
+ sys.error("sql not specified")
}
+
new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
+ }
+
+ override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+ if (!data.columns.contains("Time")) {
+ sys.error("No `Time` column")
+ }
+ val iotdbOptions = new IoTDBOptions(parameters)
+ if (!data.columns.contains("device_name")) {
+ data.columns.foreach(column => if (!column.startsWith("root.") && column != "Time") sys.error("Invalid column: " + column))
+ val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data)
+ DataFrameTools.insertDataFrame(iotdbOptions, narrowDf)
+ } else {
+ DataFrameTools.insertDataFrame(iotdbOptions, data)
+ }
+
+ new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
}
}
\ No newline at end of file
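createRelation accepts both shapes: a frame with a `device_name` column is inserted as-is, while otherwise every column except `Time` must be a full `root.`-prefixed path and the frame is first converted to narrow form. A sketch of the wide path, with illustrative series names:
``` scala
// Wide form: each non-Time column is a full timeseries path. A column
// named, say, "s0" (no "root." prefix) would hit sys.error("Invalid column: s0").
val wide = spark.createDataFrame(List((1L, 1.0F)))
  .withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
wide.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save
```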
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
index c4aaa3d..62bb59e 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
@@ -29,7 +29,7 @@ class IoTDBOptions(
val password = parameters.getOrElse("password", "root")
- val sql = parameters.getOrElse("sql", sys.error("Option 'sql' not specified"))
+ val sql = parameters.getOrElse("sql", "")
val numPartition = parameters.getOrElse("numPartition", "1")
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
index c5d2cb3..7c811e4 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
@@ -60,11 +60,10 @@ class IoTDBRDD private[iotdb](
val part = split.asInstanceOf[IoTDBPartition]
var taskInfo: String = _
- Option(TaskContext.get()).foreach { taskContext => {
+ Option(TaskContext.get()).foreach { taskContext =>
taskContext.addTaskCompletionListener { _ => conn.close() }
taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId()
}
- }
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver")
val conn: Connection = DriverManager.getConnection(options.url, options.user, options.password)
@@ -112,8 +111,8 @@ class IoTDBRDD private[iotdb](
override def hasNext: Boolean = {
if (!finished && !gotNext) {
- nextValue = getNext
- gotNext = true
+ nextValue = getNext
+ gotNext = true
}
!finished
}
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
new file mode 100644
index 0000000..aee647d
--- /dev/null
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.db.tools;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.spark.db.IoTDBOptions;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.thrift.annotation.Nullable;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT;
+
+public class DataFrameTools {
+ /**
+ * insert a narrow dataframe into IoTDB.
+ *
+ * @param options the options used to create an IoTDB session
+ * @param dataFrame a dataframe in narrow-table form
+ */
+ public static void insertDataFrame(IoTDBOptions options, Dataset<Row> dataFrame) {
+ List<Tuple2<String, String>> sensorTypes = new ArrayList<>(Arrays.asList(dataFrame.dtypes()));
+ sensorTypes.remove(0);
+ sensorTypes.remove(0);
+
+ List<Row> devices = dataFrame.select("device_name").distinct().collectAsList();
+
+ Dataset<Row> repartition = dataFrame.repartition(dataFrame.col("device_name"));
+
+ for (Row row : devices) {
+ String device = row.get(0).toString();
+ repartition
+ .where(String.format("device_name == '%s'", device))
+ .foreachPartition(
+ partition -> {
+ String[] hostPort = options.url().split("//")[1].replace("/", "").split(":");
+ Session session =
+ new Session.Builder()
+ .host(hostPort[0])
+ .port(Integer.valueOf(hostPort[1]))
+ .username(String.valueOf(options.user()))
+ .password(String.valueOf(options.password()))
+ .build();
+ session.open();
+ partition.forEachRemaining(
+ record -> {
+ ArrayList<String> measurements = new ArrayList<>();
+ ArrayList<TSDataType> types = new ArrayList<>();
+ ArrayList<Object> values = new ArrayList<>();
+ for (int i = 2; i < record.length(); i++) {
+ Object value = record.get(i);
+ if (value == null) {
+ continue;
+ }
+ value = typeTrans(record.get(i).toString(), getType(sensorTypes.get(i - 2)._2));
+
+ values.add(value);
+ measurements.add(sensorTypes.get(i - 2)._1);
+ types.add(getType(sensorTypes.get(i - 2)._2));
+ }
+ try {
+ session.insertRecord(
+ record.get(1).toString(),
+ (Long) record.get(0),
+ measurements,
+ types,
+ values);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ }
+ });
+ session.close();
+ });
+ }
+ }
+ /**
+ * convert a string value to the Java object matching the given TSDataType.
+ *
+ * @param value the string representation of the value
+ * @param type the target TSDataType
+ * @return the converted value, or null if parsing fails or the type is unsupported
+ */
+ private static Object typeTrans(String value, TSDataType type) {
+ try {
+ switch (type) {
+ case TEXT:
+ return value;
+ case BOOLEAN:
+ return Boolean.valueOf(value);
+ case INT32:
+ return Integer.valueOf(value);
+ case INT64:
+ return Long.valueOf(value);
+ case FLOAT:
+ return Float.valueOf(value);
+ case DOUBLE:
+ return Double.valueOf(value);
+ default:
+ return null;
+ }
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ /**
+ * return the TSDataType corresponding to a Spark SQL type name.
+ *
+ * @param typeStr the Spark SQL type name, e.g. "IntegerType"
+ * @return the matching TSDataType, or null if the type is unsupported
+ */
+ private static TSDataType getType(String typeStr) {
+ switch (typeStr) {
+ case "StringType":
+ return TEXT;
+ case "BooleanType":
+ return BOOLEAN;
+ case "IntegerType":
+ return INT32;
+ case "LongType":
+ return INT64;
+ case "FloatType":
+ return FLOAT;
+ case "DoubleType":
+ return DOUBLE;
+ default:
+ return null;
+ }
+ }
+}
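For reference, the write path can also be driven directly: insertDataFrame repartitions the frame by device_name, opens one Session per partition, and inserts record by record. A hypothetical direct call from Scala (normally DefaultSource invokes this for you; the exact IoTDBOptions constructor shape is assumed here):
``` scala
import org.apache.iotdb.spark.db.IoTDBOptions
import org.apache.iotdb.spark.db.tools.DataFrameTools

// narrowDf must start with Time and device_name, followed by sensor columns.
val opts = new IoTDBOptions(Map("url" -> "jdbc:iotdb://127.0.0.1:6667/"))
DataFrameTools.insertDataFrame(opts, narrowDf)
```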
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
index 2e84ac9..26ba987 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
@@ -20,7 +20,6 @@
package org.apache.iotdb.spark.db
import java.io.ByteArrayOutputStream
-
import org.apache.iotdb.db.conf.IoTDBConstant
import org.apache.iotdb.db.service.IoTDB
import org.apache.iotdb.jdbc.Config
@@ -70,32 +69,32 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test show data") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
- .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root.**").load
+ val df = spark.read.format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select ** from root").load
Assert.assertEquals(7505, df.count())
}
test("test show data with partition") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.**")
+ .option("sql", "select ** from root")
.option("lowerBound", 1).option("upperBound", System.nanoTime() / 1000 /
1000)
.option("numPartition", 10).load
Assert.assertEquals(7505, df.count())
}
test("test filter data") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.** where time < 2000 and time >
1000").load
+ .option("sql", "select ** from root where time < 2000 and time >
1000").load
Assert.assertEquals(499, df.count())
}
test("test filter data with partition") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.** where time < 2000 and time > 1000")
+ .option("sql", "select ** from root where time < 2000 and time > 1000")
.option("lowerBound", 1)
.option("upperBound", 10000).option("numPartition", 10).load
@@ -103,17 +102,17 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test transform to narrow") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.** where time < 1100 and time >
1000").load
+ .option("sql", "select ** from root where time < 1100 and time >
1000").load
val narrow_df = Transformer.toNarrowForm(spark, df)
Assert.assertEquals(198, narrow_df.count())
}
test("test transform to narrow with partition") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.** where time < 1100 and time > 1000")
+ .option("sql", "select ** from root where time < 1100 and time > 1000")
.option("lowerBound", 1).option("upperBound", 10000)
.option("numPartition", 10).load
val narrow_df = Transformer.toNarrowForm(spark, df)
@@ -121,16 +120,16 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
}
test("test transform back to wide") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .option("sql", "select * from root.** where time < 1100 and time >
1000").load
+ .option("sql", "select ** from root where time < 1100 and time >
1000").load
val narrow_df = Transformer.toNarrowForm(spark, df)
val wide_df = Transformer.toWideForm(spark, narrow_df)
Assert.assertEquals(99, wide_df.count())
}
test("test aggregate sql") {
- val df = spark.read.format("org.apache.iotdb.sparkdb")
+ val df = spark.read.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("sql", "select count(d0.s0),count(d0.s1) from root.vehicle").load
@@ -139,7 +138,6 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
df.show(df.count.toInt, false)
}
val actual = outCapture.toByteArray.map(_.toChar)
-
val expect =
"+-------------------------+-------------------------+\n" +
"|count(root.vehicle.d0.s0)|count(root.vehicle.d0.s1)|\n" +
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
new file mode 100644
index 0000000..0c5928d
--- /dev/null
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.db
+
+import org.apache.iotdb.db.conf.IoTDBConstant
+import org.apache.iotdb.db.service.IoTDB
+import org.apache.iotdb.jdbc.Config
+import org.apache.iotdb.session.Session
+import org.apache.spark.sql.SparkSession
+import org.junit.{AfterClass, Before}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
+ private var daemon: IoTDB = _
+ private var spark: SparkSession = _
+ private var session: Session = _
+
+ @Before
+ override protected def beforeAll(): Unit = {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/")
+ super.beforeAll()
+
+ EnvironmentUtils.closeStatMonitor()
+ daemon = IoTDB.getInstance
+ daemon.active()
+ EnvironmentUtils.envSetUp()
+ Class.forName(Config.JDBC_DRIVER_NAME)
+
+ spark = SparkSession
+ .builder()
+ .config("spark.master", "local")
+ .appName("TSFile test")
+ .getOrCreate()
+
+ session = new Session("127.0.0.1", 6667, "root", "root")
+ session.open()
+ }
+
+ @AfterClass
+ override protected def afterAll(): Unit = {
+ if (spark != null) {
+ spark.sparkContext.stop()
+ }
+
+ daemon.stop()
+ EnvironmentUtils.cleanEnv()
+
+ session.close()
+ super.afterAll()
+ }
+
+ test("test insert wide data") {
+ val df = spark.createDataFrame(List(
+ (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+ val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "root.test.d0.int")
+ .withColumnRenamed("_3", "root.test.d0.long")
+ .withColumnRenamed("_4", "root.test.d0.float")
+ .withColumnRenamed("_5", "root.test.d0.double")
+ .withColumnRenamed("_6", "root.test.d0.boolean")
+ .withColumnRenamed("_7", "root.test.d0.text")
+ dfWithColumn.write.format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+
+ val result = session.executeQueryStatement("select ** from root")
+ var size = 0
+ while (result.hasNext) {
+ result.next()
+ size += 1
+ }
+ assertResult(2)(size)
+ }
+
+ test("test insert narrow data") {
+ val df = spark.createDataFrame(List(
+ (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+ (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+ val dfWithColumn = df.withColumnRenamed("_1", "Time")
+ .withColumnRenamed("_2", "device_name")
+ .withColumnRenamed("_3", "int")
+ .withColumnRenamed("_4", "long")
+ .withColumnRenamed("_5", "float")
+ .withColumnRenamed("_6", "double")
+ .withColumnRenamed("_7", "boolean")
+ .withColumnRenamed("_8", "text")
+ dfWithColumn.write.format("org.apache.iotdb.spark.db")
+ .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .save
+
+ val result = session.executeQueryStatement("select ** from root")
+ var size = 0
+ while (result.hasNext) {
+ result.next()
+ size += 1
+ }
+ assertResult(2)(size)
+ }
+}
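A round-trip check in the same spirit as the tests above, as a sketch (assumes the spark session and the local instance set up in beforeAll):
``` scala
// Read back through the connector what a write test inserted.
val readBack = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select ** from root")
  .load
assertResult(2)(readBack.count())
```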