This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new b9f4a45 [To rel/0.13] Add SessionPool and example (#75)
b9f4a45 is described below
commit b9f4a45d5c31a1b16bd5644e7dee3c7bbbbae625
Author: Liwen Fu <[email protected]>
AuthorDate: Fri Feb 17 22:52:18 2023 +0800
[To rel/0.13] Add SessionPool and example (#75)
---
client/sessionpool.go | 130 +++++
example/session_pool/session_pool_example.go | 775 +++++++++++++++++++++++++++
2 files changed, 905 insertions(+)
diff --git a/client/sessionpool.go b/client/sessionpool.go
new file mode 100644
index 0000000..dbcb7bb
--- /dev/null
+++ b/client/sessionpool.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "errors"
+ "log"
+ "runtime"
+ "time"
+)
+
+var errTimeout = errors.New("get session timeout")
+var errPoolClosed = errors.New("sessionPool has closed")
+var defaultMultiple = 5
+
+type SessionPool struct {
+ config *PoolConfig
+ maxSize int
+ waitToGetSessionTimeoutInMs int
+ enableCompression bool
+ connectionTimeoutInMs int
+ ch chan Session
+ sem chan int8
+}
+
+type PoolConfig struct {
+ Host string
+ Port string
+ NodeUrls []string
+ UserName string
+ Password string
+ FetchSize int32
+ TimeZone string
+ ConnectRetryMax int
+}
+
+func NewSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs,
waitToGetSessionTimeoutInMs int,
+ enableCompression bool) SessionPool {
+ if maxSize <= 0 {
+ maxSize = runtime.NumCPU() * defaultMultiple
+ }
+ var ch = make(chan Session, maxSize)
+ var sem = make(chan int8, maxSize)
+ return SessionPool{
+ config: conf,
+ maxSize: maxSize,
+ waitToGetSessionTimeoutInMs: waitToGetSessionTimeoutInMs,
+ connectionTimeoutInMs: connectionTimeoutInMs,
+ enableCompression: enableCompression,
+ ch: ch,
+ sem: sem,
+ }
+}
+
+func (spool *SessionPool) GetSession() (session Session, err error) {
+ for {
+ select {
+ case spool.sem <- 1:
+ select {
+ case session, ok := <-spool.ch:
+ if ok {
+ return session, nil
+ } else {
+ log.Println("sessionpool has closed")
+ return session, errPoolClosed
+ }
+ default:
+ config := spool.config
+ session, err := spool.ConstructSession(config)
+ return session, err
+ }
+ case <-time.After(time.Millisecond *
time.Duration(spool.waitToGetSessionTimeoutInMs)):
+ log.Println("get session timeout")
+ return session, errTimeout
+ }
+ }
+}
+
+func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session,
error) {
+ session := NewSession(getSessionConfig(config))
+ if err := session.Open(spool.enableCompression,
spool.connectionTimeoutInMs); err != nil {
+ log.Print(err)
+ return session, err
+ }
+ return session, nil
+}
+
+func getSessionConfig(config *PoolConfig) *Config {
+ return &Config{
+ Host: config.Host,
+ Port: config.Port,
+ UserName: config.UserName,
+ Password: config.Password,
+ FetchSize: config.FetchSize,
+ TimeZone: config.TimeZone,
+ ConnectRetryMax: config.ConnectRetryMax,
+ }
+}
+
+func (spool *SessionPool) PutBack(session Session) {
+ if session.trans.IsOpen() {
+ spool.ch <- session
+ }
+ <-spool.sem
+}
+
+func (spool *SessionPool) Close() {
+ close(spool.ch)
+ for s := range spool.ch {
+ s.Close()
+ }
+ close(spool.sem)
+}
diff --git a/example/session_pool/session_pool_example.go
b/example/session_pool/session_pool_example.go
new file mode 100644
index 0000000..26a7fb7
--- /dev/null
+++ b/example/session_pool/session_pool_example.go
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/apache/iotdb-client-go/client"
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+var (
+ host string
+ port string
+ user string
+ password string
+)
+var sessionPool client.SessionPool
+
+func main() {
+ flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
+ flag.StringVar(&port, "port", "6667", "--port=6667")
+ flag.StringVar(&user, "user", "root", "--user=root")
+ flag.StringVar(&password, "password", "root", "--password=root")
+ flag.Parse()
+ config := &client.PoolConfig{
+ Host: host,
+ Port: port,
+ UserName: user,
+ Password: password,
+ }
+ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+ defer sessionPool.Close()
+ var wg sync.WaitGroup
+ for i := 0; i < 10000; i++ {
+ var j = i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ setStorageGroup(fmt.Sprintf("root.ln%d", j))
+ deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
+
+ }()
+
+ }
+ setStorageGroup("root.ln1")
+ setStorageGroup("root.ln2")
+ deleteStorageGroups("root.ln1", "root.ln2")
+
+ createTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.sg1.dev1.status")
+
+ createMultiTimeseries()
+ deleteTimeseries("root.sg1.dev1.temperature")
+
+ createAlignedTimeseries("root.sg1.dev1", []string{"status",
"temperature"}, []string{"sts", "temp"})
+ deleteTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.sg1.dev1.temperature")
+
+ insertStringRecord()
+ deleteTimeseries("root.ln.wf02.wt02.hardware")
+
+ insertRecord()
+ deleteTimeseries("root.sg1.dev1.status")
+
+ insertRecords()
+ deleteTimeseries("root.sg1.dev1.status")
+
+ insertTablet()
+
+ deleteTimeseries("root.ln.device1.restart_count",
"root.ln.device1.price", "root.ln.device1.tick_count",
"root.ln.device1.temperature", "root.ln.device1.description",
"root.ln.device1.status")
+ insertTablets()
+ deleteTimeseries("root.ln.device1.restart_count",
"root.ln.device1.price", "root.ln.device1.tick_count",
"root.ln.device1.temperature", "root.ln.device1.description",
"root.ln.device1.status")
+
+ insertRecord()
+ deleteData()
+
+ setTimeZone()
+ if tz, err := getTimeZone(); err == nil {
+ fmt.Printf("TimeZone: %s\n", tz)
+ } else {
+ fmt.Printf("getTimeZone ERROR: %v\n", err)
+ }
+
+ executeStatement()
+ executeQueryStatement("select count(s3) from root.sg1.dev1")
+ executeRawDataQuery()
+ executeBatchStatement()
+
+ deleteTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.ln.wf02.wt02.s5")
+
+ //0.12.x and newer
+ insertRecordsOfOneDevice()
+ deleteTimeseries("root.sg1.dev0.*")
+
+ insertAlignedRecord()
+ deleteTimeseries("root.al1.dev1.*")
+
+ insertAlignedRecords()
+ deleteTimeseries("root.al1.**")
+
+ insertAlignedRecordsOfOneDevice()
+ deleteTimeseries("root.al1.dev4.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablet()
+ deleteTimeseries("root.ln.device1.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablets()
+ deleteTimeseries("root.ln.device1.*")
+ executeQueryStatement("show timeseries root.**")
+ for i := 0; i < 10000; i++ {
+ var j = i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ setStorageGroup(fmt.Sprintf("root.ln%d", j))
+ deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
+
+ }()
+
+ }
+ wg.Wait()
+
+}
+
+func setStorageGroup(sg string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ session.SetStorageGroup(sg)
+ }
+}
+
+func deleteStorageGroup(sg string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteStorageGroup(sg))
+ }
+}
+
+func deleteStorageGroups(sgs ...string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteStorageGroups(sgs...))
+ }
+}
+
+func createTimeseries(path string) {
+ var (
+ dataType = client.FLOAT
+ encoding = client.PLAIN
+ compressor = client.SNAPPY
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateTimeseries(path, dataType, encoding,
compressor, nil, nil))
+ }
+}
+
+func createAlignedTimeseries(prefixPath string, measurements, measurementAlias
[]string) {
+ var (
+ dataTypes = []client.TSDataType{
+ client.FLOAT,
+ client.FLOAT,
+ }
+ encodings = []client.TSEncoding{
+ client.PLAIN,
+ client.PLAIN,
+ }
+ compressors = []client.TSCompressionType{
+ client.LZ4,
+ client.LZ4,
+ }
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateAlignedTimeseries(prefixPath,
measurements, dataTypes, encodings, compressors, measurementAlias))
+ }
+
+}
+
+func createMultiTimeseries() {
+ var (
+ paths = []string{"root.sg1.dev1.temperature"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ encodings = []client.TSEncoding{client.PLAIN}
+ compressors = []client.TSCompressionType{client.SNAPPY}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateMultiTimeseries(paths, dataTypes,
encodings, compressors))
+ }
+
+}
+
+func deleteTimeseries(paths ...string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteTimeseries(paths))
+ }
+
+}
+
+func insertStringRecord() {
+ var (
+ deviceId = "root.ln.wf02.wt02"
+ measurements = []string{"hardware"}
+ values = []string{"123"}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertStringRecord(deviceId, measurements,
values, timestamp))
+ }
+
+}
+
+func insertRecord() {
+ var (
+ deviceId = "root.sg1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecord(deviceId, measurements,
dataTypes, values, timestamp))
+ }
+
+}
+
+func insertAlignedRecord() {
+ var (
+ deviceId = "root.al1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecord(deviceId, measurements,
dataTypes, values, timestamp))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func insertRecords() {
+ var (
+ deviceId = []string{"root.sg1.dev1"}
+ measurements = [][]string{{"status"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}}
+ values = [][]interface{}{{"123"}}
+ timestamp = []int64{12}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecords(deviceId, measurements,
dataTypes, values, timestamp))
+ }
+
+}
+
+func insertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT},
{client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecords(deviceIds,
measurements, dataTypes, values, timestamps))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func insertRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.sg1.dev0"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecordsOfOneDevice(deviceId,
timestamps, measurementsSlice, dataTypes, values, false))
+ }
+
+}
+
+func insertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecordsOfOneDevice(deviceId,
timestamps, measurementsSlice, dataTypes, values, false))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ sessionDataSetNew, err := session.ExecuteStatement("select
restart_count,tick_count,price,temperature,description,status from
root.al1.dev4")
+ if err == nil {
+ printDataSet0(sessionDataSetNew)
+ sessionDataSetNew.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func deleteData() {
+ var (
+ paths = []string{"root.sg1.dev1.status"}
+ startTime int64 = 0
+ endTime int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteData(paths, startTime, endTime))
+ }
+
+}
+
+func insertTablet() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertTablet(tablet, false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+}
+
+func insertAlignedTablet() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertAlignedTablet(tablet,
false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ if ds, err := session.ExecuteQueryStatement("select * from
root.ln.device1", &timeout); err == nil {
+ printDevice1(ds)
+ ds.Close()
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+}
+
+func createTablet(rowCount int) (*client.Tablet, error) {
+ tablet, err := client.NewTablet("root.ln.device1",
[]*client.MeasurementSchema{
+ {
+ Measurement: "restart_count",
+ DataType: client.INT32,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "price",
+ DataType: client.DOUBLE,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "tick_count",
+ DataType: client.INT64,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "temperature",
+ DataType: client.FLOAT,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "description",
+ DataType: client.TEXT,
+ Encoding: client.PLAIN,
+ Compressor: client.SNAPPY,
+ },
+ {
+ Measurement: "status",
+ DataType: client.BOOLEAN,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ },
+ }, rowCount)
+
+ if err != nil {
+ return nil, err
+ }
+ ts := time.Now().UTC().UnixNano() / 1000000
+ for row := 0; row < int(rowCount); row++ {
+ ts++
+ tablet.SetTimestamp(ts, row)
+ tablet.SetValueAt(rand.Int31(), 0, row)
+ tablet.SetValueAt(rand.Float64(), 1, row)
+ tablet.SetValueAt(rand.Int63(), 2, row)
+ tablet.SetValueAt(rand.Float32(), 3, row)
+ tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+ tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+ }
+ return tablet, nil
+}
+
+func insertTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertTablets(tablets, false))
+ }
+
+}
+
+func insertAlignedTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedTablets(tablets, false))
+ }
+
+}
+
+func setTimeZone() {
+ var timeZone = "GMT"
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ session.SetTimeZone(timeZone)
+ }
+
+}
+
+func getTimeZone() (string, error) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ return session.GetTimeZone()
+ }
+ return "", err
+}
+
+func executeQueryStatement(sql string) {
+ var timeout int64 = 1000
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+ if err == nil {
+ defer sessionDataSet.Close()
+ printDataSet1(sessionDataSet)
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeStatement() {
+ var sql = "show storage group"
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ sessionDataSet, err := session.ExecuteStatement(sql)
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeRawDataQuery() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5)
values(1,true)")
+ var (
+ paths []string = []string{"root.ln.wf02.wt02.s5"}
+ startTime int64 = 1
+ endTime int64 = 200
+ )
+ sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime,
endTime)
+ if err == nil {
+ printDataSet2(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeBatchStatement() {
+ var sqls = []string{"insert into root.ln.wf02.wt02(time,s5)
values(1,true)",
+ "insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.ExecuteBatchStatement(sqls))
+ }
+
+}
+
+func printDevice1(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for _, columnName := range sds.GetColumnNames() {
+ fmt.Printf("%s\t", columnName)
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
+ if showTimestamp {
+ fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
+ }
+
+ var restartCount int32
+ var price float64
+ var tickCount int64
+ var temperature float32
+ var description string
+ var status bool
+
+ // All of iotdb datatypes can be scan into string variables
+ // var restartCount string
+ // var price string
+ // var tickCount string
+ // var temperature string
+ // var description string
+ // var status string
+
+ if err := sds.Scan(&restartCount, &price, &tickCount,
&temperature, &description, &status); err != nil {
+ log.Fatal(err)
+ }
+
+ whitespace := "\t\t"
+ fmt.Printf("%v%s", restartCount, whitespace)
+ fmt.Printf("%v%s", price, whitespace)
+ fmt.Printf("%v%s", tickCount, whitespace)
+ fmt.Printf("%v%s", temperature, whitespace)
+ fmt.Printf("%v%s", description, whitespace)
+ fmt.Printf("%v%s", status, whitespace)
+
+ fmt.Println()
+ }
+}
+
+func printDataSet0(sessionDataSet *client.SessionDataSet) {
+ showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sessionDataSet.Next(); err == nil && next; next, err =
sessionDataSet.Next() {
+ if showTimestamp {
+ fmt.Printf("%s\t",
sessionDataSet.GetText(client.TimestampColumnName))
+ }
+ for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+ columnName := sessionDataSet.GetColumnName(i)
+ switch sessionDataSet.GetColumnDataType(i) {
+ case client.BOOLEAN:
+ fmt.Print(sessionDataSet.GetBool(columnName))
+ case client.INT32:
+ fmt.Print(sessionDataSet.GetInt32(columnName))
+ case client.INT64:
+ fmt.Print(sessionDataSet.GetInt64(columnName))
+ case client.FLOAT:
+ fmt.Print(sessionDataSet.GetFloat(columnName))
+ case client.DOUBLE:
+ fmt.Print(sessionDataSet.GetDouble(columnName))
+ case client.TEXT:
+ fmt.Print(sessionDataSet.GetText(columnName))
+ default:
+ }
+ fmt.Print("\t\t")
+ }
+ fmt.Println()
+ }
+}
+
+func printDataSet1(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sds.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
+ if showTimestamp {
+ fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
+ }
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ columnName := sds.GetColumnName(i)
+ v := sds.GetValue(columnName)
+ if v == nil {
+ v = "null"
+ }
+ fmt.Printf("%v\t\t", v)
+ }
+ fmt.Println()
+ }
+}
+
+func printDataSet2(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sds.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
+ if showTimestamp {
+ fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
+ }
+
+ if record, err := sds.GetRowRecord(); err == nil {
+ for _, field := range record.GetFields() {
+ v := field.GetValue()
+ if field.IsNull() {
+ v = "null"
+ }
+ fmt.Printf("%v\t\t", v)
+ }
+ fmt.Println()
+ }
+ }
+}
+
+func checkError(status *rpc.TSStatus, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if status != nil {
+ if err = client.VerifySuccess(status); err != nil {
+ log.Println(err)
+ }
+ }
+}