This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a6658  [IoTDB-1989] IoTDB support insert data from Spark (#4477)
45a6658 is described below

commit 45a665866ed830456d0d44ece0c02c37166e22fe
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Mon Dec 6 11:30:43 2021 +0800

    [IoTDB-1989] IoTDB support insert data from Spark (#4477)
    
    1. fix the bug of count statement.
    2. update the ITs of spark connector.
---
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  53 ++++++-
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  58 +++++++-
 spark-iotdb-connector/pom.xml                      |  36 +----
 .../org/apache/iotdb/spark/db/Converter.scala      |   6 +-
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |  30 +++-
 .../org/apache/iotdb/spark/db/IoTDBOptions.scala   |   2 +-
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |   7 +-
 .../iotdb/spark/db/tools/DataFrameTools.java       | 162 +++++++++++++++++++++
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      |  32 ++--
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 117 +++++++++++++++
 10 files changed, 435 insertions(+), 68 deletions(-)

diff --git a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md 
b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
index a911801..36388b8 100644
--- a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md       
+++ b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md       
@@ -18,7 +18,7 @@
     under the License.
 
 -->
-## Spark-IoTDB
+# Spark-IoTDB
 ### version
 
 The versions required for Spark and Java are as follow:
@@ -45,8 +45,9 @@ mvn clean scala:compile compile install
     </dependency>
 ```
 
+## Read Data from IoTDB
 
-#### spark-shell user guide
+### spark-shell user guide
 
 ```
 spark-shell --jars 
spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -76,7 +77,7 @@ df.printSchema()
 df.show()
 ```
 
-#### Schema Inference
+### Schema Inference
 
 Take the following TsFile structure as an example: There are three 
Measurements in the TsFile schema: status, temperature, and hardware. The basic 
information of these three measurements is as follows:
 
@@ -114,7 +115,7 @@ You can also use narrow table form which as follows: (You 
can see part 4 about h
 |    5 | root.ln.wf02.wt01             | false                    | null       
                | null                          |
 |    6 | root.ln.wf02.wt02             | null                     | ccc        
                | null                          |
 
-#### Get narrow form of data
+### Get narrow form of data
 ```
 spark-shell --jars 
spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
 
@@ -127,7 +128,7 @@ df.printSchema()
 df.show()
 ```
 
-#### Java user guide
+### Java user guide
 
 ```
 import org.apache.spark.sql.Dataset;
@@ -158,3 +159,45 @@ public class Example {
 }
 ```
 
+## Write Data to IoTDB
+
+### User Guide
+``` scala
+// import narrow table
+val df = spark.createDataFrame(List(
+      (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+    .withColumnRenamed("_2", "device_name")
+    .withColumnRenamed("_3", "s0")
+    .withColumnRenamed("_4", "s1")
+    .withColumnRenamed("_5", "s2")
+    .withColumnRenamed("_6", "s3")
+    .withColumnRenamed("_7", "s4")
+    .withColumnRenamed("_8", "s5")
+dfWithColumn
+    .write
+    .format("org.apache.iotdb.spark.db")
+    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+    .save
+    
+// import wide table
+val df = spark.createDataFrame(List(
+      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+    .withColumnRenamed("_2", "root.test.d0.s0")
+    .withColumnRenamed("_3", "root.test.d0.s1")
+    .withColumnRenamed("_4", "root.test.d0.s2")
+    .withColumnRenamed("_5", "root.test.d0.s3")
+    .withColumnRenamed("_6", "root.test.d0.s4")
+    .withColumnRenamed("_7", "root.test.d0.s5")
+dfWithColumn.write.format("org.apache.iotdb.spark.db")
+    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+    .save
+```
+
+### Notes
+1. You can directly write data to IoTDB whatever the dataframe contains a wide 
table or a narrow table.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md 
b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
index f28d82a..21b0cda 100644
--- a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md    
+++ b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md    
@@ -19,9 +19,9 @@
 
 -->
 
-## Spark-IoTDB
+# Spark-IoTDB
 
-### 版本
+## 版本
 
 Spark 和 Java 所需的版本如下:
 
@@ -29,7 +29,7 @@ Spark 和 Java 所需的版本如下:
 | ------------- | ------------- | ------------ | -------- |
 | `2.4.3`       | `2.11`        | `1.8`        | `0.13.0-SNAPSHOT` |
 
-### 安装
+## 安装
 
 mvn clean scala:compile compile install
 
@@ -43,7 +43,9 @@ mvn clean scala:compile compile install
     </dependency>
 ```
 
-#### Spark-shell 用户指南
+## 从IoTDB读取数据
+
+### Spark-shell 用户指南
 
 ```
 spark-shell --jars 
spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
@@ -73,7 +75,7 @@ df.printSchema()
 df.show()
 ```
 
-#### 模式推断
+### 模式推断
 
 以下 TsFile 结构为例:TsFile 模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:
 
@@ -118,7 +120,7 @@ time|d1.status|time|d1.temperature |time    | d2.hardware   
|time|d2.status
 | 5    | root.ln.wf02.wt01 | false | null | null |
 | 6    | root.ln.wf02.wt02 | null  | ccc  | null |
 
-#### 获取窄表格式的数据
+### 获取窄表格式的数据
 ```
 spark-shell --jars 
spark-iotdb-connector-0.13.0-SNAPSHOT.jar,iotdb-jdbc-0.13.0-SNAPSHOT-jar-with-dependencies.jar
 
@@ -131,7 +133,7 @@ df.printSchema()
 df.show()
 ```
 
-#### Java 用户指南
+### Java 用户指南
 
 ```
 import org.apache.spark.sql.Dataset;
@@ -161,3 +163,45 @@ public class Example {
   }
 }
 ```
+
+## 写数据到IoTDB
+### 用户指南
+``` scala
+// import narrow table
+val df = spark.createDataFrame(List(
+      (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+    .withColumnRenamed("_2", "device_name")
+    .withColumnRenamed("_3", "s0")
+    .withColumnRenamed("_4", "s1")
+    .withColumnRenamed("_5", "s2")
+    .withColumnRenamed("_6", "s3")
+    .withColumnRenamed("_7", "s4")
+    .withColumnRenamed("_8", "s5")
+dfWithColumn
+    .write
+    .format("org.apache.iotdb.spark.db")
+    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+    .save
+    
+// import wide table
+val df = spark.createDataFrame(List(
+      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+val dfWithColumn = df.withColumnRenamed("_1", "Time")
+    .withColumnRenamed("_2", "root.test.d0.s0")
+    .withColumnRenamed("_3", "root.test.d0.s1")
+    .withColumnRenamed("_4", "root.test.d0.s2")
+    .withColumnRenamed("_5", "root.test.d0.s3")
+    .withColumnRenamed("_6", "root.test.d0.s4")
+    .withColumnRenamed("_7", "root.test.d0.s5")
+dfWithColumn.write.format("org.apache.iotdb.spark.db")
+    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+    .save
+```
+
+### 注意
+1. 无论dataframe中存放的是窄表还是宽表,都可以直接将数据写到IoTDB中。
\ No newline at end of file
diff --git a/spark-iotdb-connector/pom.xml b/spark-iotdb-connector/pom.xml
index f10ec0f..c37cb7b 100644
--- a/spark-iotdb-connector/pom.xml
+++ b/spark-iotdb-connector/pom.xml
@@ -32,6 +32,8 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <compile.version>1.8</compile.version>
+        <scala.library.version>2.11</scala.library.version>
+        <scala.version>2.11.12</scala.version>
     </properties>
     <dependencies>
         <dependency>
@@ -52,35 +54,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <!--commons-configuration uses  commons-lang:commons-lang:2.4
-                while others use commons-lang 2.6-->
-                <exclusion>
-                    <groupId>commons-configuration</groupId>
-                    <artifactId>commons-configuration</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <!-- this is just used by hadoop-common-->
-        <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
-            <version>1.6</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
         </dependency>
-        <!-- many of hadoop dependencies use guava11, but org.apache.curator 
from hadoop-common uses
-        guava16 -->
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -97,16 +74,19 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
         </dependency>
         <dependency>
             <groupId>org.scalatest</groupId>
diff --git 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
index da3435d..023e4f8 100644
--- 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
+++ 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Converter.scala
@@ -65,7 +65,11 @@ object Converter {
       }
 
       val colCount = resultSetMetaData.getColumnCount
-      for (i <- 2 to colCount) {
+      var startIndex = 2
+      if (!"Time".equals(resultSetMetaData.getColumnName(1))) {
+        startIndex = 1
+      }
+      for (i <- startIndex to colCount) {
         fields += StructField(resultSetMetaData.getColumnLabel(i), 
resultSetMetaData.getColumnType(i) match {
           case Types.BOOLEAN => BooleanType
           case Types.INTEGER => IntegerType
diff --git 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
index 60ec74a..9812b14 100644
--- 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
+++ 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.spark.db
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+
+import org.apache.iotdb.spark.db.tools.DataFrameTools
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister, RelationProvider}
 import org.slf4j.LoggerFactory
 
-private[iotdb] class DefaultSource extends RelationProvider with 
DataSourceRegister {
+
+private[iotdb] class DefaultSource extends RelationProvider with 
DataSourceRegister with CreatableRelationProvider {
   private final val logger = LoggerFactory.getLogger(classOf[DefaultSource])
 
   override def shortName(): String = "tsfile"
@@ -34,10 +37,27 @@ private[iotdb] class DefaultSource extends RelationProvider 
with DataSourceRegis
 
     val iotdbOptions = new IoTDBOptions(parameters)
 
-    if (iotdbOptions.url == null || iotdbOptions.sql == null) {
-      sys.error("IoTDB url or sql not specified")
+    if ("".equals(iotdbOptions.sql)) {
+      sys.error("sql not specified")
     }
+
     new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
+  }
+
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode, 
parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    if (!data.columns.contains("Time")) {
+      sys.error("No `Time` column")
+    }
+    val iotdbOptions = new IoTDBOptions(parameters)
 
+    if (!data.columns.contains("device_name")) {
+      data.columns.foreach(column => if (!column.startsWith("root.") && column 
!= "Time") sys.error("Invalidate column: " + column))
+      val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data)
+      DataFrameTools.insertDataFrame(iotdbOptions, narrowDf)
+    } else {
+      DataFrameTools.insertDataFrame(iotdbOptions, data)
+    }
+
+    new IoTDBRelation(iotdbOptions)(sqlContext.sparkSession)
   }
 }
\ No newline at end of file
diff --git 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
index c4aaa3d..62bb59e 100644
--- 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
+++ 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBOptions.scala
@@ -29,7 +29,7 @@ class IoTDBOptions(
 
   val password = parameters.getOrElse("password", "root")
 
-  val sql = parameters.getOrElse("sql", sys.error("Option 'sql' not 
specified"))
+  val sql = parameters.getOrElse("sql", "")
 
   val numPartition = parameters.getOrElse("numPartition", "1")
 
diff --git 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
index c5d2cb3..7c811e4 100644
--- 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
+++ 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
@@ -60,11 +60,10 @@ class IoTDBRDD private[iotdb](
     val part = split.asInstanceOf[IoTDBPartition]
 
     var taskInfo: String = _
-    Option(TaskContext.get()).foreach { taskContext => {
+    Option(TaskContext.get()).foreach { taskContext =>
       taskContext.addTaskCompletionListener { _ => conn.close() }
       taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " 
+ taskContext.partitionId()
     }
-    }
 
     Class.forName("org.apache.iotdb.jdbc.IoTDBDriver")
     val conn: Connection = DriverManager.getConnection(options.url, 
options.user, options.password)
@@ -112,8 +111,8 @@ class IoTDBRDD private[iotdb](
 
     override def hasNext: Boolean = {
       if (!finished && !gotNext) {
-          nextValue = getNext
-          gotNext = true
+        nextValue = getNext
+        gotNext = true
       }
       !finished
     }
diff --git 
a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
new file mode 100644
index 0000000..aee647d
--- /dev/null
+++ 
b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/tools/DataFrameTools.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.db.tools;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.spark.db.IoTDBOptions;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.thrift.annotation.Nullable;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT;
+
+public class DataFrameTools {
+  /**
+   * * insert a narrow dataframe into IoTDB
+   *
+   * @param options the options to create a IoTDB session
+   * @param dataFrame a dataframe of narrow table
+   * @return
+   */
+  public static void insertDataFrame(IoTDBOptions options, Dataset<Row> 
dataFrame) {
+    List<Tuple2<String, String>> sensorTypes = new 
ArrayList<>(Arrays.asList(dataFrame.dtypes()));
+    sensorTypes.remove(0);
+    sensorTypes.remove(0);
+
+    List<Row> devices = 
dataFrame.select("device_name").distinct().collectAsList();
+
+    Dataset<Row> repartition = 
dataFrame.repartition(dataFrame.col("device_name"));
+
+    for (Row row : devices) {
+      String device = row.get(0).toString();
+      repartition
+          .where(String.format("device_name == '%s'", device))
+          .foreachPartition(
+              partition -> {
+                String[] hostPort = options.url().split("//")[1].replace("/", 
"").split(":");
+                Session session =
+                    new Session.Builder()
+                        .host(hostPort[0])
+                        .port(Integer.valueOf(hostPort[1]))
+                        .username(String.valueOf(options.user()))
+                        .password(String.valueOf(options.password()))
+                        .build();
+                session.open();
+                partition.forEachRemaining(
+                    record -> {
+                      ArrayList<String> measurements = new ArrayList<>();
+                      ArrayList<TSDataType> types = new ArrayList<>();
+                      ArrayList<Object> values = new ArrayList<>();
+                      for (int i = 2; i < record.length(); i++) {
+                        Object value = record.get(i);
+                        if (value == null) {
+                          continue;
+                        }
+                        value =
+                            typeTrans(record.get(i).toString(), 
getType(sensorTypes.get(i - 2)._2));
+
+                        values.add(value);
+                        measurements.add(sensorTypes.get(i - 2)._1);
+                        types.add(getType(sensorTypes.get(i - 2)._2));
+                      }
+                      try {
+                        session.insertRecord(
+                            record.get(1).toString(),
+                            (Long) record.get(0),
+                            measurements,
+                            types,
+                            values);
+                      } catch (IoTDBConnectionException e) {
+                        e.printStackTrace();
+                      } catch (StatementExecutionException e) {
+                        e.printStackTrace();
+                      }
+                    });
+                session.close();
+              });
+    }
+  }
+  /**
+   * @param value
+   * @param type
+   * @return
+   */
+  private static Object typeTrans(String value, TSDataType type) {
+    try {
+      switch (type) {
+        case TEXT:
+          return value;
+        case BOOLEAN:
+          return Boolean.valueOf(value);
+        case INT32:
+          return Integer.valueOf(value);
+        case INT64:
+          return Long.valueOf(value);
+        case FLOAT:
+          return Float.valueOf(value);
+        case DOUBLE:
+          return Double.valueOf(value);
+        default:
+          return null;
+      }
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  /**
+   * return the TSDataType
+   *
+   * @param typeStr
+   * @return
+   */
+  private static TSDataType getType(String typeStr) {
+    switch (typeStr) {
+      case "StringType":
+        return TEXT;
+      case "BooleanType":
+        return BOOLEAN;
+      case "IntegerType":
+        return INT32;
+      case "LongType":
+        return INT64;
+      case "FloatType":
+        return FLOAT;
+      case "DoubleType":
+        return DOUBLE;
+      default:
+        return null;
+    }
+  }
+}
diff --git 
a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
 
b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
index 2e84ac9..26ba987 100644
--- 
a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
+++ 
b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBTest.scala
@@ -20,7 +20,6 @@
 package org.apache.iotdb.spark.db
 
 import java.io.ByteArrayOutputStream
-
 import org.apache.iotdb.db.conf.IoTDBConstant
 import org.apache.iotdb.db.service.IoTDB
 import org.apache.iotdb.jdbc.Config
@@ -70,32 +69,32 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
   }
 
   test("test show data") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
-      .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * 
from root.**").load
+    val df = spark.read.format("org.apache.iotdb.spark.db")
+      .option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select ** 
from root").load
     Assert.assertEquals(7505, df.count())
   }
 
   test("test show data with partition") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.**")
+      .option("sql", "select ** from root")
       .option("lowerBound", 1).option("upperBound", System.nanoTime() / 1000 / 
1000)
       .option("numPartition", 10).load
     Assert.assertEquals(7505, df.count())
   }
 
   test("test filter data") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.** where time < 2000 and time > 
1000").load
+      .option("sql", "select ** from root where time < 2000 and time > 
1000").load
 
     Assert.assertEquals(499, df.count())
   }
 
   test("test filter data with partition") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.** where time < 2000 and time > 1000")
+      .option("sql", "select ** from root where time < 2000 and time > 1000")
       .option("lowerBound", 1)
       .option("upperBound", 10000).option("numPartition", 10).load
 
@@ -103,17 +102,17 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
   }
 
   test("test transform to narrow") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.** where time < 1100 and time > 
1000").load
+      .option("sql", "select ** from root where time < 1100 and time > 
1000").load
     val narrow_df = Transformer.toNarrowForm(spark, df)
     Assert.assertEquals(198, narrow_df.count())
   }
 
   test("test transform to narrow with partition") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.** where time < 1100 and time > 1000")
+      .option("sql", "select ** from root where time < 1100 and time > 1000")
       .option("lowerBound", 1).option("upperBound", 10000)
       .option("numPartition", 10).load
     val narrow_df = Transformer.toNarrowForm(spark, df)
@@ -121,16 +120,16 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
   }
 
   test("test transform back to wide") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .option("sql", "select * from root.** where time < 1100 and time > 
1000").load
+      .option("sql", "select ** from root where time < 1100 and time > 
1000").load
     val narrow_df = Transformer.toNarrowForm(spark, df)
     val wide_df = Transformer.toWideForm(spark, narrow_df)
     Assert.assertEquals(99, wide_df.count())
   }
 
   test("test aggregate sql") {
-    val df = spark.read.format("org.apache.iotdb.sparkdb")
+    val df = spark.read.format("org.apache.iotdb.spark.db")
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
       .option("sql", "select count(d0.s0),count(d0.s1) from root.vehicle").load
 
@@ -139,7 +138,6 @@ class IoTDBTest extends FunSuite with BeforeAndAfterAll {
       df.show(df.count.toInt, false)
     }
     val actual = outCapture.toByteArray.map(_.toChar)
-
     val expect =
       "+-------------------------+-------------------------+\n" +
         "|count(root.vehicle.d0.s0)|count(root.vehicle.d0.s1)|\n" +
diff --git 
a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
 
b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
new file mode 100644
index 0000000..0c5928d
--- /dev/null
+++ 
b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.spark.db
+
+import org.apache.iotdb.db.conf.IoTDBConstant
+import org.apache.iotdb.db.service.IoTDB
+import org.apache.iotdb.jdbc.Config
+import org.apache.iotdb.session.Session
+import org.apache.spark.sql.SparkSession
+import org.junit.{AfterClass, Before}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
+  private var daemon: IoTDB = _
+  private var spark: SparkSession = _
+  private var session: Session = _
+
+  @Before
+  override protected def beforeAll(): Unit = {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/")
+    super.beforeAll()
+
+    EnvironmentUtils.closeStatMonitor()
+    daemon = IoTDB.getInstance
+    daemon.active()
+    EnvironmentUtils.envSetUp()
+    Class.forName(Config.JDBC_DRIVER_NAME)
+
+    spark = SparkSession
+      .builder()
+      .config("spark.master", "local")
+      .appName("TSFile test")
+      .getOrCreate()
+
+    session = new Session("127.0.0.1", 6667, "root", "root")
+    session.open()
+  }
+
+  @AfterClass
+  override protected def afterAll(): Unit = {
+    if (spark != null) {
+      spark.sparkContext.stop()
+    }
+
+    daemon.stop()
+    EnvironmentUtils.cleanEnv()
+
+    session.close()
+    super.afterAll()
+  }
+
+  test("test insert wide data") {
+    val df = spark.createDataFrame(List(
+      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
+
+    val dfWithColumn = df.withColumnRenamed("_1", "Time")
+      .withColumnRenamed("_2", "root.test.d0.int")
+      .withColumnRenamed("_3", "root.test.d0.long")
+      .withColumnRenamed("_4", "root.test.d0.float")
+      .withColumnRenamed("_5", "root.test.d0.double")
+      .withColumnRenamed("_6", "root.test.d0.boolean")
+      .withColumnRenamed("_7", "root.test.d0.text")
+    dfWithColumn.write.format("org.apache.iotdb.spark.db")
+      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+      .save
+
+    val result = session.executeQueryStatement("select ** from root")
+    var size = 0
+    while (result.hasNext) {
+      size += 1
+    }
+    assertResult(2)(size)
+  }
+
+  test("test insert narrow data") {
+    val df = spark.createDataFrame(List(
+      (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
+      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
+
+    val dfWithColumn = df.withColumnRenamed("_1", "Time")
+      .withColumnRenamed("_2", "device_name")
+      .withColumnRenamed("_3", "int")
+      .withColumnRenamed("_4", "long")
+      .withColumnRenamed("_5", "float")
+      .withColumnRenamed("_6", "double")
+      .withColumnRenamed("_7", "boolean")
+      .withColumnRenamed("_8", "text")
+    dfWithColumn.write.format("org.apache.iotdb.spark.db")
+      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+      .save
+
+    val result = session.executeQueryStatement("select ** from root")
+    var size = 0
+    while (result.hasNext) {
+      size += 1
+    }
+    assertResult(2)(size)
+  }
+}

Reply via email to