This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch opt_go_client
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git

commit e6a37915a50abdfedc1c8bb24b1f25908e61f6cb
Author: HTHou <[email protected]>
AuthorDate: Mon Nov 18 21:09:13 2024 +0800

    format
---
 client/session.go | 1819 +++++++++++++++++++++++++++--------------------------
 1 file changed, 910 insertions(+), 909 deletions(-)

diff --git a/client/session.go b/client/session.go
index 26ebff0..36d0792 100644
--- a/client/session.go
+++ b/client/session.go
@@ -20,166 +20,167 @@
 package client
 
 import (
-       "bytes"
-       "container/list"
-       "context"
-       "encoding/binary"
-       "errors"
-       "fmt"
-       "github.com/apache/iotdb-client-go/common"
-       "log"
-       "net"
-       "reflect"
-       "sort"
-       "strings"
-       "time"
-
-       "github.com/apache/iotdb-client-go/rpc"
-       "github.com/apache/thrift/lib/go/thrift"
+    "bytes"
+    "container/list"
+    "context"
+    "encoding/binary"
+    "errors"
+    "fmt"
+    "github.com/apache/iotdb-client-go/common"
+    "log"
+    "net"
+    "reflect"
+    "sort"
+    "strings"
+    "time"
+
+    "github.com/apache/iotdb-client-go/rpc"
+    "github.com/apache/thrift/lib/go/thrift"
 )
 
 const (
-       DefaultTimeZone        = "Asia/Shanghai"
-       DefaultFetchSize       = 1024
-       DefaultConnectRetryMax = 3
+    DefaultTimeZone        = "Asia/Shanghai"
+    DefaultFetchSize       = 1024
+    DefaultConnectRetryMax = 3
 )
 
 var errLength = errors.New("deviceIds, times, measurementsList and 
valuesList's size should be equal")
 
 type Config struct {
-       Host                 string
-       Port                 string
-       UserName             string
-       Password             string
-       FetchSize            int32
-       TimeZone             string
-       ConnectRetryMax      int
-       enableRPCCompression bool
+    Host                 string
+    Port                 string
+    UserName             string
+    Password             string
+    FetchSize            int32
+    TimeZone             string
+    ConnectRetryMax      int
+    enableRPCCompression bool
 }
 
 type Session struct {
-       config             *Config
-       client             *rpc.IClientRPCServiceClient
-       sessionId          int64
-       trans              thrift.TTransport
-       requestStatementId int64
+    config             *Config
+    client             *rpc.IClientRPCServiceClient
+    sessionId          int64
+    trans              thrift.TTransport
+    requestStatementId int64
 }
 
 type endPoint struct {
-       Host string
-       Port string
+    Host string
+    Port string
 }
 
 var endPointList = list.New()
 
 func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) 
error {
-       if s.config.FetchSize <= 0 {
-               s.config.FetchSize = DefaultFetchSize
-       }
-       if s.config.TimeZone == "" {
-               s.config.TimeZone = DefaultTimeZone
-       }
-
-       if s.config.ConnectRetryMax <= 0 {
-               s.config.ConnectRetryMax = DefaultConnectRetryMax
-       }
-
-       var protocolFactory thrift.TProtocolFactory
-       var err error
-
-       // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it 
returns one.
-       s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, 
s.config.Port), &thrift.TConfiguration{
-               ConnectTimeout: time.Duration(connectionTimeoutInMs) * 
time.Millisecond, // Use 0 for no timeout
-       })
-       // s.trans = thrift.NewTFramedTransport(s.trans)        // deprecated
-       var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-       s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
-       if !s.trans.IsOpen() {
-               err = s.trans.Open()
-               if err != nil {
-                       return err
-               }
-       }
-       s.config.enableRPCCompression = enableRPCCompression
-       protocolFactory = getProtocolFactory(enableRPCCompression)
-       iprot := protocolFactory.GetProtocol(s.trans)
-       oprot := protocolFactory.GetProtocol(s.trans)
-       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
-       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-               Password: &s.config.Password}
-       resp, err := s.client.OpenSession(context.Background(), &req)
-       if err != nil {
-               return err
-       }
-       s.sessionId = resp.GetSessionId()
-       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-       return err
+    if s.config.FetchSize <= 0 {
+        s.config.FetchSize = DefaultFetchSize
+    }
+    if s.config.TimeZone == "" {
+        s.config.TimeZone = DefaultTimeZone
+    }
+
+    if s.config.ConnectRetryMax <= 0 {
+        s.config.ConnectRetryMax = DefaultConnectRetryMax
+    }
+
+    var protocolFactory thrift.TProtocolFactory
+    var err error
+
+    // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it 
returns one.
+    s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, 
s.config.Port), &thrift.TConfiguration{
+        ConnectTimeout: time.Duration(connectionTimeoutInMs) * 
time.Millisecond, // Use 0 for no timeout
+    })
+    // s.trans = thrift.NewTFramedTransport(s.trans)   // deprecated
+    var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+    s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
+    if !s.trans.IsOpen() {
+        err = s.trans.Open()
+        if err != nil {
+            return err
+        }
+    }
+    s.config.enableRPCCompression = enableRPCCompression
+    protocolFactory = getProtocolFactory(enableRPCCompression)
+    iprot := protocolFactory.GetProtocol(s.trans)
+    oprot := protocolFactory.GetProtocol(s.trans)
+    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
+    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+        Password: &s.config.Password}
+    resp, err := s.client.OpenSession(context.Background(), &req)
+    if err != nil {
+        return err
+    }
+    s.sessionId = resp.GetSessionId()
+    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+    return err
 }
 
 type ClusterConfig struct {
-       NodeUrls        []string //ip:port
-       UserName        string
-       Password        string
-       FetchSize       int32
-       TimeZone        string
-       ConnectRetryMax int
+    NodeUrls        []string //ip:port
+    UserName        string
+    Password        string
+    FetchSize       int32
+    TimeZone        string
+    ConnectRetryMax int
 }
 
 type ClusterSession struct {
-       config             *ClusterConfig
-       client             *rpc.IClientRPCServiceClient
-       sessionId          int64
-       trans              thrift.TTransport
-       requestStatementId int64
+    config             *ClusterConfig
+    client             *rpc.IClientRPCServiceClient
+    sessionId          int64
+    trans              thrift.TTransport
+    requestStatementId int64
 }
 
 func (s *Session) OpenCluster(enableRPCCompression bool) error {
-       if s.config.FetchSize <= 0 {
-               s.config.FetchSize = DefaultFetchSize
-       }
-       if s.config.TimeZone == "" {
-               s.config.TimeZone = DefaultTimeZone
-       }
-
-       if s.config.ConnectRetryMax <= 0 {
-               s.config.ConnectRetryMax = DefaultConnectRetryMax
-       }
-
-       var protocolFactory thrift.TProtocolFactory
-       var err error
-
-       s.config.enableRPCCompression = enableRPCCompression
-       protocolFactory = getProtocolFactory(enableRPCCompression)
-       iprot := protocolFactory.GetProtocol(s.trans)
-       oprot := protocolFactory.GetProtocol(s.trans)
-       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
-       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-               Password: &s.config.Password}
-
-       resp, err := s.client.OpenSession(context.Background(), &req)
-       if err != nil {
-               return err
-       }
-       s.sessionId = resp.GetSessionId()
-       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-       return err
+    if s.config.FetchSize <= 0 {
+        s.config.FetchSize = DefaultFetchSize
+    }
+    if s.config.TimeZone == "" {
+        s.config.TimeZone = DefaultTimeZone
+    }
+
+    if s.config.ConnectRetryMax <= 0 {
+        s.config.ConnectRetryMax = DefaultConnectRetryMax
+    }
+
+    var protocolFactory thrift.TProtocolFactory
+    var err error
+
+    s.config.enableRPCCompression = enableRPCCompression
+    protocolFactory = getProtocolFactory(enableRPCCompression)
+    iprot := protocolFactory.GetProtocol(s.trans)
+    oprot := protocolFactory.GetProtocol(s.trans)
+    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
+    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+        Password: &s.config.Password}
+
+    resp, err := s.client.OpenSession(context.Background(), &req)
+    if err != nil {
+        return err
+    }
+    s.sessionId = resp.GetSessionId()
+    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+    return err
 }
 
 func getProtocolFactory(enableRPCCompression bool) thrift.TProtocolFactory {
-       if enableRPCCompression {
-               return 
thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
-       }
-       return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
+    if enableRPCCompression {
+        return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
+    } else {
+        return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
+    }
 }
 
 func (s *Session) Close() (r *common.TSStatus, err error) {
-       req := rpc.NewTSCloseSessionReq()
-       req.SessionId = s.sessionId
-       _, err = s.client.CloseSession(context.Background(), req)
-       if err != nil {
-               return nil, err
-       }
-       return nil, s.trans.Close()
+    req := rpc.NewTSCloseSessionReq()
+    req.SessionId = s.sessionId
+    _, err = s.client.CloseSession(context.Background(), req)
+    if err != nil {
+        return nil, err
+    }
+    return nil, s.trans.Close()
 }
 
 /*
@@ -190,13 +191,13 @@ func (s *Session) Close() (r *common.TSStatus, err error) 
{
  *error: correctness of operation
  */
 func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, 
err error) {
-       r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       r, err = s.client.SetStorageGroup(context.Background(), 
s.sessionId, storageGroupId)
-               }
-       }
-       return r, err
+    r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            r, err = s.client.SetStorageGroup(context.Background(), 
s.sessionId, storageGroupId)
+        }
+    }
+    return r, err
 }
 
 /*
@@ -207,13 +208,13 @@ func (s *Session) SetStorageGroup(storageGroupId string) 
(r *common.TSStatus, er
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroup(storageGroupId string) (r 
*common.TSStatus, err error) {
-       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, []string{storageGroupId})
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       r, err = 
s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
[]string{storageGroupId})
-               }
-       }
-       return r, err
+    r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
[]string{storageGroupId})
+    if err != nil && r == nil {
+        if s.reconnect() {
+            r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, []string{storageGroupId})
+        }
+    }
+    return r, err
 }
 
 /*
@@ -224,13 +225,13 @@ func (s *Session) DeleteStorageGroup(storageGroupId 
string) (r *common.TSStatus,
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r 
*common.TSStatus, err error) {
-       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, storageGroupIds)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       r, err = 
s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
-               }
-       }
-       return r, err
+    r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
storageGroupIds)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, storageGroupIds)
+        }
+    }
+    return r, err
 }
 
 /*
@@ -244,16 +245,16 @@ func (s *Session) DeleteStorageGroups(storageGroupIds 
...string) (r *common.TSSt
  *error: correctness of operation
  */
 func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding 
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags 
map[string]string) (r *common.TSStatus, err error) {
-       request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: 
path, DataType: int32(dataType), Encoding: int32(encoding),
-               Compressor: int32(compressor), Attributes: attributes, Tags: 
tags}
-       status, err := s.client.CreateTimeseries(context.Background(), &request)
-       if err != nil && status == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       status, err = 
s.client.CreateTimeseries(context.Background(), &request)
-               }
-       }
-       return status, err
+    request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, 
DataType: int32(dataType), Encoding: int32(encoding),
+        Compressor: int32(compressor), Attributes: attributes, Tags: tags}
+    status, err := s.client.CreateTimeseries(context.Background(), &request)
+    if err != nil && status == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            status, err = s.client.CreateTimeseries(context.Background(), 
&request)
+        }
+    }
+    return status, err
 }
 
 /*
@@ -269,38 +270,38 @@ func (s *Session) CreateTimeseries(path string, dataType 
TSDataType, encoding TS
  *error: correctness of operation
  */
 func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements 
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors 
[]TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) 
{
-       destTypes := make([]int32, len(dataTypes))
-       for i, t := range dataTypes {
-               destTypes[i] = int32(t)
-       }
-
-       destEncodings := make([]int32, len(encodings))
-       for i, e := range encodings {
-               destEncodings[i] = int32(e)
-       }
-
-       destCompressions := make([]int32, len(compressors))
-       for i, e := range compressors {
-               destCompressions[i] = int32(e)
-       }
-
-       request := rpc.TSCreateAlignedTimeseriesReq{
-               SessionId:        s.sessionId,
-               PrefixPath:       prefixPath,
-               Measurements:     measurements,
-               DataTypes:        destTypes,
-               Encodings:        destEncodings,
-               Compressors:      destCompressions,
-               MeasurementAlias: measurementAlias,
-       }
-       status, err := s.client.CreateAlignedTimeseries(context.Background(), 
&request)
-       if err != nil && status == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       status, err = 
s.client.CreateAlignedTimeseries(context.Background(), &request)
-               }
-       }
-       return status, err
+    destTypes := make([]int32, len(dataTypes))
+    for i, t := range dataTypes {
+        destTypes[i] = int32(t)
+    }
+
+    destEncodings := make([]int32, len(encodings))
+    for i, e := range encodings {
+        destEncodings[i] = int32(e)
+    }
+
+    destCompressions := make([]int32, len(compressors))
+    for i, e := range compressors {
+        destCompressions[i] = int32(e)
+    }
+
+    request := rpc.TSCreateAlignedTimeseriesReq{
+        SessionId:        s.sessionId,
+        PrefixPath:       prefixPath,
+        Measurements:     measurements,
+        DataTypes:        destTypes,
+        Encodings:        destEncodings,
+        Compressors:      destCompressions,
+        MeasurementAlias: measurementAlias,
+    }
+    status, err := s.client.CreateAlignedTimeseries(context.Background(), 
&request)
+    if err != nil && status == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            status, err = 
s.client.CreateAlignedTimeseries(context.Background(), &request)
+        }
+    }
+    return status, err
 }
 
 /*
@@ -314,33 +315,33 @@ func (s *Session) CreateAlignedTimeseries(prefixPath 
string, measurements []stri
  *error: correctness of operation
  */
 func (s *Session) CreateMultiTimeseries(paths []string, dataTypes 
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r 
*common.TSStatus, err error) {
-       destTypes := make([]int32, len(dataTypes))
-       for i, t := range dataTypes {
-               destTypes[i] = int32(t)
-       }
-
-       destEncodings := make([]int32, len(encodings))
-       for i, e := range encodings {
-               destEncodings[i] = int32(e)
-       }
-
-       destCompressions := make([]int32, len(compressors))
-       for i, e := range compressors {
-               destCompressions[i] = int32(e)
-       }
-
-       request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, 
Paths: paths, DataTypes: destTypes,
-               Encodings: destEncodings, Compressors: destCompressions}
-       r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = 
s.client.CreateMultiTimeseries(context.Background(), &request)
-               }
-       }
-
-       return r, err
+    destTypes := make([]int32, len(dataTypes))
+    for i, t := range dataTypes {
+        destTypes[i] = int32(t)
+    }
+
+    destEncodings := make([]int32, len(encodings))
+    for i, e := range encodings {
+        destEncodings[i] = int32(e)
+    }
+
+    destCompressions := make([]int32, len(compressors))
+    for i, e := range compressors {
+        destCompressions[i] = int32(e)
+    }
+
+    request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: 
paths, DataTypes: destTypes,
+        Encodings: destEncodings, Compressors: destCompressions}
+    r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.CreateMultiTimeseries(context.Background(), 
&request)
+        }
+    }
+
+    return r, err
 }
 
 /*
@@ -351,13 +352,13 @@ func (s *Session) CreateMultiTimeseries(paths []string, 
dataTypes []TSDataType,
  *error: correctness of operation
  */
 func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err 
error) {
-       r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, 
paths)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       r, err = 
s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
-               }
-       }
-       return r, err
+    r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, 
paths)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            r, err = s.client.DeleteTimeseries(context.Background(), 
s.sessionId, paths)
+        }
+    }
+    return r, err
 }
 
 /*
@@ -370,15 +371,15 @@ func (s *Session) DeleteTimeseries(paths []string) (r 
*common.TSStatus, err erro
  *error: correctness of operation
  */
 func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) 
(r *common.TSStatus, err error) {
-       request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, 
StartTime: startTime, EndTime: endTime}
-       r, err = s.client.DeleteData(context.Background(), &request)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.DeleteData(context.Background(), 
&request)
-               }
-       }
-       return r, err
+    request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, 
StartTime: startTime, EndTime: endTime}
+    r, err = s.client.DeleteData(context.Background(), &request)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.DeleteData(context.Background(), &request)
+        }
+    }
+    return r, err
 }
 
 /*
@@ -392,224 +393,224 @@ func (s *Session) DeleteData(paths []string, startTime 
int64, endTime int64) (r
  *error: correctness of operation
  */
 func (s *Session) InsertStringRecord(deviceId string, measurements []string, 
values []string, timestamp int64) (r *common.TSStatus, err error) {
-       request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, 
PrefixPath: deviceId, Measurements: measurements,
-               Values: values, Timestamp: timestamp}
-       r, err = s.client.InsertStringRecord(context.Background(), &request)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = 
s.client.InsertStringRecord(context.Background(), &request)
-               }
-       }
-       return r, err
+    request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, PrefixPath: 
deviceId, Measurements: measurements,
+        Values: values, Timestamp: timestamp}
+    r, err = s.client.InsertStringRecord(context.Background(), &request)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertStringRecord(context.Background(), 
&request)
+        }
+    }
+    return r, err
 }
 
 func (s *Session) GetTimeZone() (string, error) {
-       resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
-       if err != nil {
-               return DefaultTimeZone, err
-       }
-       return resp.TimeZone, nil
+    resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
+    if err != nil {
+        return DefaultTimeZone, err
+    }
+    return resp.TimeZone, nil
 }
 
 func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) 
{
-       request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: 
timeZone}
-       r, err = s.client.SetTimeZone(context.Background(), &request)
-       s.config.TimeZone = timeZone
-       return r, err
+    request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone}
+    r, err = s.client.SetTimeZone(context.Background(), &request)
+    s.config.TimeZone = timeZone
+    return r, err
 }
 
 func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
-       request := rpc.TSExecuteStatementReq{
-               SessionId:   s.sessionId,
-               Statement:   sql,
-               StatementId: s.requestStatementId,
-               FetchSize:   &s.config.FetchSize,
-       }
-       resp, err := s.client.ExecuteStatement(context.Background(), &request)
-
-       if err != nil && resp == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       request.StatementId = s.requestStatementId
-                       resp, err = 
s.client.ExecuteStatement(context.Background(), &request)
-               }
-       }
-
-       return s.genDataSet(sql, resp), err
+    request := rpc.TSExecuteStatementReq{
+        SessionId:   s.sessionId,
+        Statement:   sql,
+        StatementId: s.requestStatementId,
+        FetchSize:   &s.config.FetchSize,
+    }
+    resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+    if err != nil && resp == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            request.StatementId = s.requestStatementId
+            resp, err = s.client.ExecuteStatement(context.Background(), 
&request)
+        }
+    }
+
+    return s.genDataSet(sql, resp), err
 }
 
 func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, 
err error) {
-       request := rpc.TSExecuteStatementReq{
-               SessionId:   s.sessionId,
-               Statement:   sql,
-               StatementId: s.requestStatementId,
-               FetchSize:   &s.config.FetchSize,
-       }
-       resp, err := s.client.ExecuteStatement(context.Background(), &request)
-
-       if err != nil && resp == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       request.StatementId = s.requestStatementId
-                       resp, err = 
s.client.ExecuteStatement(context.Background(), &request)
-               }
-       }
-
-       return resp.Status, err
+    request := rpc.TSExecuteStatementReq{
+        SessionId:   s.sessionId,
+        Statement:   sql,
+        StatementId: s.requestStatementId,
+        FetchSize:   &s.config.FetchSize,
+    }
+    resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+    if err != nil && resp == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            request.StatementId = s.requestStatementId
+            resp, err = s.client.ExecuteStatement(context.Background(), 
&request)
+        }
+    }
+
+    return resp.Status, err
 }
 
 func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) 
(*SessionDataSet, error) {
-       request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: 
sql, StatementId: s.requestStatementId,
-               FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
-       if resp, err := s.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
-               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                       return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-               } else {
-                       return nil, statusErr
-               }
-       } else {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       request.StatementId = s.requestStatementId
-                       resp, err = 
s.client.ExecuteQueryStatement(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-                       } else {
-                               return nil, statusErr
-                       }
-               }
-               return nil, err
-       }
+    request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: 
sql, StatementId: s.requestStatementId,
+        FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
+    if resp, err := s.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
+        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+            return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+        } else {
+            return nil, statusErr
+        }
+    } else {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            request.StatementId = s.requestStatementId
+            resp, err = s.client.ExecuteQueryStatement(context.Background(), 
&request)
+            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+            } else {
+                return nil, statusErr
+            }
+        }
+        return nil, err
+    }
 }
 
 func (s *Session) ExecuteAggregationQuery(paths []string, aggregations 
[]common.TAggregationType,
-       startTime *int64, endTime *int64, interval *int64,
-       timeoutMs *int64) (*SessionDataSet, error) {
-
-       request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, 
StatementId: s.requestStatementId, Paths: paths,
-               Aggregations: aggregations, StartTime: startTime, EndTime: 
endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
-       if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
-               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                       return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-               } else {
-                       return nil, statusErr
-               }
-       } else {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       resp, err = 
s.client.ExecuteAggregationQuery(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-                       } else {
-                               return nil, statusErr
-                       }
-               }
-               return nil, err
-       }
+    startTime *int64, endTime *int64, interval *int64,
+    timeoutMs *int64) (*SessionDataSet, error) {
+
+    request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: 
s.requestStatementId, Paths: paths,
+        Aggregations: aggregations, StartTime: startTime, EndTime: endTime, 
Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
+    if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
+        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+            return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+        } else {
+            return nil, statusErr
+        }
+    } else {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            resp, err = s.client.ExecuteAggregationQuery(context.Background(), 
&request)
+            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+            } else {
+                return nil, statusErr
+            }
+        }
+        return nil, err
+    }
 }
 
 func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, 
aggregations []common.TAggregationType,
-       startTime *int64, endTime *int64, interval *int64,
-       timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) {
-
-       request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, 
StatementId: s.requestStatementId, Paths: paths,
-               Aggregations: aggregations, StartTime: startTime, EndTime: 
endTime, Interval: interval, FetchSize: &s.config.FetchSize,
-               Timeout: timeoutMs, LegalPathNodes: legalNodes}
-       if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
-               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                       return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-               } else {
-                       return nil, statusErr
-               }
-       } else {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       resp, err = 
s.client.ExecuteAggregationQuery(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
-                       } else {
-                               return nil, statusErr
-                       }
-               }
-               return nil, err
-       }
+    startTime *int64, endTime *int64, interval *int64,
+    timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) {
+
+    request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: 
s.requestStatementId, Paths: paths,
+        Aggregations: aggregations, StartTime: startTime, EndTime: endTime, 
Interval: interval, FetchSize: &s.config.FetchSize,
+        Timeout: timeoutMs, LegalPathNodes: legalNodes}
+    if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
+        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+            return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+        } else {
+            return nil, statusErr
+        }
+    } else {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            resp, err = s.client.ExecuteAggregationQuery(context.Background(), 
&request)
+            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
+            } else {
+                return nil, statusErr
+            }
+        }
+        return nil, err
+    }
 }
 
 func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
-       measurements []string,
-       types []TSDataType,
-       values []interface{},
-       isAligned bool) (*rpc.TSInsertRecordReq, error) {
-       request := &rpc.TSInsertRecordReq{}
-       request.SessionId = s.sessionId
-       request.PrefixPath = deviceId
-       request.Timestamp = time
-       request.Measurements = measurements
-       request.IsAligned = &isAligned
-       if bys, err := valuesToBytes(types, values); err == nil {
-               request.Values = bys
-       } else {
-               return nil, err
-       }
-       return request, nil
+    measurements []string,
+    types []TSDataType,
+    values []interface{},
+    isAligned bool) (*rpc.TSInsertRecordReq, error) {
+    request := &rpc.TSInsertRecordReq{}
+    request.SessionId = s.sessionId
+    request.PrefixPath = deviceId
+    request.Timestamp = time
+    request.Measurements = measurements
+    request.IsAligned = &isAligned
+    if bys, err := valuesToBytes(types, values); err == nil {
+        request.Values = bys
+    } else {
+        return nil, err
+    }
+    return request, nil
 }
 
 func (s *Session) InsertRecord(deviceId string, measurements []string, 
dataTypes []TSDataType, values []interface{}, timestamp int64) (r 
*common.TSStatus, err error) {
-       request, err := s.genTSInsertRecordReq(deviceId, timestamp, 
measurements, dataTypes, values, false)
-       if err != nil {
-               return nil, err
-       }
-       r, err = s.client.InsertRecord(context.Background(), request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertRecord(context.Background(), 
request)
-               }
-       }
-
-       return r, err
+    request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, 
dataTypes, values, false)
+    if err != nil {
+        return nil, err
+    }
+    r, err = s.client.InsertRecord(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertRecord(context.Background(), request)
+        }
+    }
+
+    return r, err
 }
 
 func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, 
dataTypes []TSDataType, values []interface{}, timestamp int64) (r 
*common.TSStatus, err error) {
-       request, err := s.genTSInsertRecordReq(deviceId, timestamp, 
measurements, dataTypes, values, true)
-       if err != nil {
-               return nil, err
-       }
-       r, err = s.client.InsertRecord(context.Background(), request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertRecord(context.Background(), 
request)
-               }
-       }
-
-       return r, err
+    request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, 
dataTypes, values, true)
+    if err != nil {
+        return nil, err
+    }
+    r, err = s.client.InsertRecord(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertRecord(context.Background(), request)
+        }
+    }
+
+    return r, err
 }
 
 type deviceData struct {
-       timestamps        []int64
-       measurementsSlice [][]string
-       dataTypesSlice    [][]TSDataType
-       valuesSlice       [][]interface{}
-       isAligned         bool
+    timestamps        []int64
+    measurementsSlice [][]string
+    dataTypesSlice    [][]TSDataType
+    valuesSlice       [][]interface{}
+    isAligned         bool
 }
 
 func (d *deviceData) Len() int {
-       return len(d.timestamps)
+    return len(d.timestamps)
 }
 
 func (d *deviceData) Less(i, j int) bool {
-       return d.timestamps[i] < d.timestamps[j]
+    return d.timestamps[i] < d.timestamps[j]
 }
 
 func (d *deviceData) Swap(i, j int) {
-       d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
-       d.measurementsSlice[i], d.measurementsSlice[j] = 
d.measurementsSlice[j], d.measurementsSlice[i]
-       d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], 
d.dataTypesSlice[i]
-       d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i]
+    d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
+    d.measurementsSlice[i], d.measurementsSlice[j] = d.measurementsSlice[j], 
d.measurementsSlice[i]
+    d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], 
d.dataTypesSlice[i]
+    d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i]
 }
 
 // InsertRecordsOfOneDevice Insert multiple rows, which can reduce the 
overhead of network. This method is just like jdbc
@@ -617,88 +618,88 @@ func (d *deviceData) Swap(i, j int) {
 // your performance, please see insertTablet method
 // Each row is independent, which could have different deviceId, time, number 
of measurements
 func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps 
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, 
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
-       length := len(timestamps)
-       if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
-               return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
-       }
-
-       if !sorted {
-               sort.Sort(&deviceData{
-                       timestamps:        timestamps,
-                       measurementsSlice: measurementsSlice,
-                       dataTypesSlice:    dataTypesSlice,
-                       valuesSlice:       valuesSlice,
-               })
-       }
-
-       valuesList := make([][]byte, length)
-       for i := 0; i < length; i++ {
-               if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
-                       return nil, err
-               }
-       }
-
-       request := &rpc.TSInsertRecordsOfOneDeviceReq{
-               SessionId:        s.sessionId,
-               PrefixPath:       deviceId,
-               Timestamps:       timestamps,
-               MeasurementsList: measurementsSlice,
-               ValuesList:       valuesList,
-       }
-
-       r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = 
s.client.InsertRecordsOfOneDevice(context.Background(), request)
-               }
-       }
-
-       return r, err
+    length := len(timestamps)
+    if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
+        return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
+    }
+
+    if !sorted {
+        sort.Sort(&deviceData{
+            timestamps:        timestamps,
+            measurementsSlice: measurementsSlice,
+            dataTypesSlice:    dataTypesSlice,
+            valuesSlice:       valuesSlice,
+        })
+    }
+
+    valuesList := make([][]byte, length)
+    for i := 0; i < length; i++ {
+        if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
+            return nil, err
+        }
+    }
+
+    request := &rpc.TSInsertRecordsOfOneDeviceReq{
+        SessionId:        s.sessionId,
+        PrefixPath:       deviceId,
+        Timestamps:       timestamps,
+        MeasurementsList: measurementsSlice,
+        ValuesList:       valuesList,
+    }
+
+    r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
+        }
+    }
+
+    return r, err
 }
 
 func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps 
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, 
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
-       length := len(timestamps)
-       if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
-               return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
-       }
-
-       if !sorted {
-               sort.Sort(&deviceData{
-                       timestamps:        timestamps,
-                       measurementsSlice: measurementsSlice,
-                       dataTypesSlice:    dataTypesSlice,
-                       valuesSlice:       valuesSlice,
-               })
-       }
-
-       valuesList := make([][]byte, length)
-       for i := 0; i < length; i++ {
-               if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
-                       return nil, err
-               }
-       }
-       var isAligned = true
-       request := &rpc.TSInsertRecordsOfOneDeviceReq{
-               SessionId:        s.sessionId,
-               PrefixPath:       deviceId,
-               Timestamps:       timestamps,
-               MeasurementsList: measurementsSlice,
-               ValuesList:       valuesList,
-               IsAligned:        &isAligned,
-       }
-
-       r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = 
s.client.InsertRecordsOfOneDevice(context.Background(), request)
-               }
-       }
-
-       return r, err
+    length := len(timestamps)
+    if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
+        return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
+    }
+
+    if !sorted {
+        sort.Sort(&deviceData{
+            timestamps:        timestamps,
+            measurementsSlice: measurementsSlice,
+            dataTypesSlice:    dataTypesSlice,
+            valuesSlice:       valuesSlice,
+        })
+    }
+
+    valuesList := make([][]byte, length)
+    for i := 0; i < length; i++ {
+        if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
+            return nil, err
+        }
+    }
+    var isAligned = true
+    request := &rpc.TSInsertRecordsOfOneDeviceReq{
+        SessionId:        s.sessionId,
+        PrefixPath:       deviceId,
+        Timestamps:       timestamps,
+        MeasurementsList: measurementsSlice,
+        ValuesList:       valuesList,
+        IsAligned:        &isAligned,
+    }
+
+    r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
+        }
+    }
+
+    return r, err
 }
 
 /*
@@ -714,37 +715,37 @@ func (s *Session) 
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
  *
  */
 func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, 
dataTypes [][]TSDataType, values [][]interface{},
-       timestamps []int64) (r *common.TSStatus, err error) {
-       request, err := s.genInsertRecordsReq(deviceIds, measurements, 
dataTypes, values, timestamps, false)
-       if err != nil {
-               return nil, err
-       } else {
-               r, err = s.client.InsertRecords(context.Background(), request)
-               if err != nil && r == nil {
-                       if s.reconnect() {
-                               request.SessionId = s.sessionId
-                               r, err = 
s.client.InsertRecords(context.Background(), request)
-                       }
-               }
-               return r, err
-       }
+    timestamps []int64) (r *common.TSStatus, err error) {
+    request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, 
values, timestamps, false)
+    if err != nil {
+        return nil, err
+    } else {
+        r, err = s.client.InsertRecords(context.Background(), request)
+        if err != nil && r == nil {
+            if s.reconnect() {
+                request.SessionId = s.sessionId
+                r, err = s.client.InsertRecords(context.Background(), request)
+            }
+        }
+        return r, err
+    }
 }
 
 func (s *Session) InsertAlignedRecords(deviceIds []string, measurements 
[][]string, dataTypes [][]TSDataType, values [][]interface{},
-       timestamps []int64) (r *common.TSStatus, err error) {
-       request, err := s.genInsertRecordsReq(deviceIds, measurements, 
dataTypes, values, timestamps, true)
-       if err != nil {
-               return nil, err
-       } else {
-               r, err = s.client.InsertRecords(context.Background(), request)
-               if err != nil && r == nil {
-                       if s.reconnect() {
-                               request.SessionId = s.sessionId
-                               r, err = 
s.client.InsertRecords(context.Background(), request)
-                       }
-               }
-               return r, err
-       }
+    timestamps []int64) (r *common.TSStatus, err error) {
+    request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, 
values, timestamps, true)
+    if err != nil {
+        return nil, err
+    } else {
+        r, err = s.client.InsertRecords(context.Background(), request)
+        if err != nil && r == nil {
+            if s.reconnect() {
+                request.SessionId = s.sessionId
+                r, err = s.client.InsertRecords(context.Background(), request)
+            }
+        }
+        return r, err
+    }
 }
 
 /*
@@ -753,450 +754,450 @@ func (s *Session) InsertAlignedRecords(deviceIds 
[]string, measurements [][]stri
  *tablets: []*client.Tablet, list of tablets
  */
 func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-       if !sorted {
-               for _, t := range tablets {
-                       if err := t.Sort(); err != nil {
-                               return nil, err
-                       }
-               }
-       }
-       request, err := s.genInsertTabletsReq(tablets, false)
-       if err != nil {
-               return nil, err
-       }
-       r, err = s.client.InsertTablets(context.Background(), request)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertTablets(context.Background(), 
request)
-               }
-       }
-       return r, err
+    if !sorted {
+        for _, t := range tablets {
+            if err := t.Sort(); err != nil {
+                return nil, err
+            }
+        }
+    }
+    request, err := s.genInsertTabletsReq(tablets, false)
+    if err != nil {
+        return nil, err
+    }
+    r, err = s.client.InsertTablets(context.Background(), request)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertTablets(context.Background(), request)
+        }
+    }
+    return r, err
 }
 
 func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-       if !sorted {
-               for _, t := range tablets {
-                       if err := t.Sort(); err != nil {
-                               return nil, err
-                       }
-               }
-       }
-       request, err := s.genInsertTabletsReq(tablets, true)
-       if err != nil {
-               return nil, err
-       }
-       r, err = s.client.InsertTablets(context.Background(), request)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertTablets(context.Background(), 
request)
-               }
-       }
-       return r, err
+    if !sorted {
+        for _, t := range tablets {
+            if err := t.Sort(); err != nil {
+                return nil, err
+            }
+        }
+    }
+    request, err := s.genInsertTabletsReq(tablets, true)
+    if err != nil {
+        return nil, err
+    }
+    r, err = s.client.InsertTablets(context.Background(), request)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertTablets(context.Background(), request)
+        }
+    }
+    return r, err
 }
 
 func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus, 
err error) {
-       request := rpc.TSExecuteBatchStatementReq{
-               SessionId:  s.sessionId,
-               Statements: inserts,
-       }
-       r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = 
s.client.ExecuteBatchStatement(context.Background(), &request)
-               }
-       }
-       return r, err
+    request := rpc.TSExecuteBatchStatementReq{
+        SessionId:  s.sessionId,
+        Statements: inserts,
+    }
+    r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.ExecuteBatchStatement(context.Background(), 
&request)
+        }
+    }
+    return r, err
 }
 
 func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime 
int64) (*SessionDataSet, error) {
-       request := rpc.TSRawDataQueryReq{
-               SessionId:   s.sessionId,
-               Paths:       paths,
-               FetchSize:   &s.config.FetchSize,
-               StartTime:   startTime,
-               EndTime:     endTime,
-               StatementId: s.requestStatementId,
-       }
-       resp, err := s.client.ExecuteRawDataQuery(context.Background(), 
&request)
-
-       if err != nil && resp == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       request.StatementId = s.requestStatementId
-                       resp, err = 
s.client.ExecuteRawDataQuery(context.Background(), &request)
-               }
-       }
-
-       return s.genDataSet("", resp), err
+    request := rpc.TSRawDataQueryReq{
+        SessionId:   s.sessionId,
+        Paths:       paths,
+        FetchSize:   &s.config.FetchSize,
+        StartTime:   startTime,
+        EndTime:     endTime,
+        StatementId: s.requestStatementId,
+    }
+    resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
+
+    if err != nil && resp == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            request.StatementId = s.requestStatementId
+            resp, err = s.client.ExecuteRawDataQuery(context.Background(), 
&request)
+        }
+    }
+
+    return s.genDataSet("", resp), err
 }
 
 func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
-       request := rpc.TSExecuteStatementReq{
-               SessionId:   s.sessionId,
-               Statement:   sql,
-               StatementId: s.requestStatementId,
-               FetchSize:   &s.config.FetchSize,
-       }
-       resp, err := s.client.ExecuteUpdateStatement(context.Background(), 
&request)
-
-       if err != nil && resp == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       request.StatementId = s.requestStatementId
-                       resp, err = 
s.client.ExecuteUpdateStatement(context.Background(), &request)
-               }
-       }
-
-       return s.genDataSet(sql, resp), err
+    request := rpc.TSExecuteStatementReq{
+        SessionId:   s.sessionId,
+        Statement:   sql,
+        StatementId: s.requestStatementId,
+        FetchSize:   &s.config.FetchSize,
+    }
+    resp, err := s.client.ExecuteUpdateStatement(context.Background(), 
&request)
+
+    if err != nil && resp == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            request.StatementId = s.requestStatementId
+            resp, err = s.client.ExecuteUpdateStatement(context.Background(), 
&request)
+        }
+    }
+
+    return s.genDataSet(sql, resp), err
 }
 
 func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) 
*SessionDataSet {
-       var queryId int64
-       if resp.QueryId == nil {
-               queryId = 0
-       } else {
-               queryId = *resp.QueryId
-       }
-       return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap,
-               queryId, s.client, s.sessionId, resp.QueryDataSet, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
+    var queryId int64
+    if resp.QueryId == nil {
+        queryId = 0
+    } else {
+        queryId = *resp.QueryId
+    }
+    return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap,
+        queryId, s.client, s.sessionId, resp.QueryDataSet, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
 }
 
 func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) 
(*rpc.TSInsertTabletsReq, error) {
-       var (
-               length           = len(tablets)
-               deviceIds        = make([]string, length)
-               measurementsList = make([][]string, length)
-               valuesList       = make([][]byte, length)
-               timestampsList   = make([][]byte, length)
-               typesList        = make([][]int32, length)
-               sizeList         = make([]int32, length)
-       )
-       for index, tablet := range tablets {
-               deviceIds[index] = tablet.deviceId
-               measurementsList[index] = tablet.GetMeasurements()
-
-               values, err := tablet.getValuesBytes()
-               if err != nil {
-                       return nil, err
-               }
-
-               valuesList[index] = values
-               timestampsList[index] = tablet.GetTimestampBytes()
-               typesList[index] = tablet.getDataTypes()
-               sizeList[index] = int32(tablet.RowSize)
-       }
-       request := rpc.TSInsertTabletsReq{
-               SessionId:        s.sessionId,
-               PrefixPaths:      deviceIds,
-               TypesList:        typesList,
-               MeasurementsList: measurementsList,
-               ValuesList:       valuesList,
-               TimestampsList:   timestampsList,
-               SizeList:         sizeList,
-               IsAligned:        &isAligned,
-       }
-       return &request, nil
+    var (
+        length           = len(tablets)
+        deviceIds        = make([]string, length)
+        measurementsList = make([][]string, length)
+        valuesList       = make([][]byte, length)
+        timestampsList   = make([][]byte, length)
+        typesList        = make([][]int32, length)
+        sizeList         = make([]int32, length)
+    )
+    for index, tablet := range tablets {
+        deviceIds[index] = tablet.deviceId
+        measurementsList[index] = tablet.GetMeasurements()
+
+        values, err := tablet.getValuesBytes()
+        if err != nil {
+            return nil, err
+        }
+
+        valuesList[index] = values
+        timestampsList[index] = tablet.GetTimestampBytes()
+        typesList[index] = tablet.getDataTypes()
+        sizeList[index] = int32(tablet.RowSize)
+    }
+    request := rpc.TSInsertTabletsReq{
+        SessionId:        s.sessionId,
+        PrefixPaths:      deviceIds,
+        TypesList:        typesList,
+        MeasurementsList: measurementsList,
+        ValuesList:       valuesList,
+        TimestampsList:   timestampsList,
+        SizeList:         sizeList,
+        IsAligned:        &isAligned,
+    }
+    return &request, nil
 }
 
 func (s *Session) genInsertRecordsReq(deviceIds []string, measurements 
[][]string, dataTypes [][]TSDataType, values [][]interface{},
-       timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
-       length := len(deviceIds)
-       if length != len(timestamps) || length != len(measurements) || length 
!= len(values) {
-               return nil, errLength
-       }
-       request := rpc.TSInsertRecordsReq{
-               SessionId:        s.sessionId,
-               PrefixPaths:      deviceIds,
-               MeasurementsList: measurements,
-               Timestamps:       timestamps,
-               IsAligned:        &isAligned,
-       }
-       v := make([][]byte, length)
-       for i := 0; i < len(measurements); i++ {
-               if bys, err := valuesToBytes(dataTypes[i], values[i]); err == 
nil {
-                       v[i] = bys
-               } else {
-                       return nil, err
-               }
-       }
-       request.ValuesList = v
-       return &request, nil
+    timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
+    length := len(deviceIds)
+    if length != len(timestamps) || length != len(measurements) || length != 
len(values) {
+        return nil, errLength
+    }
+    request := rpc.TSInsertRecordsReq{
+        SessionId:        s.sessionId,
+        PrefixPaths:      deviceIds,
+        MeasurementsList: measurements,
+        Timestamps:       timestamps,
+        IsAligned:        &isAligned,
+    }
+    v := make([][]byte, length)
+    for i := 0; i < len(measurements); i++ {
+        if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil {
+            v[i] = bys
+        } else {
+            return nil, err
+        }
+    }
+    request.ValuesList = v
+    return &request, nil
 }
 
 func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, 
error) {
-       buff := &bytes.Buffer{}
-       for i, t := range dataTypes {
-               binary.Write(buff, binary.BigEndian, byte(t))
-               v := values[i]
-               if v == nil {
-                       return nil, fmt.Errorf("values[%d] can't be nil", i)
-               }
-
-               switch t {
-               case BOOLEAN:
-                       switch v.(type) {
-                       case bool:
-                               binary.Write(buff, binary.BigEndian, v)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be bool", i, v, reflect.TypeOf(v))
-                       }
-               case INT32:
-                       switch v.(type) {
-                       case int32:
-                               binary.Write(buff, binary.BigEndian, v)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be int32", i, v, reflect.TypeOf(v))
-                       }
-               case INT64, TIMESTAMP:
-                       switch v.(type) {
-                       case int64:
-                               binary.Write(buff, binary.BigEndian, v)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be int64", i, v, reflect.TypeOf(v))
-                       }
-               case FLOAT:
-                       switch v.(type) {
-                       case float32:
-                               binary.Write(buff, binary.BigEndian, v)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be float32", i, v, reflect.TypeOf(v))
-                       }
-               case DOUBLE:
-                       switch v.(type) {
-                       case float64:
-                               binary.Write(buff, binary.BigEndian, v)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be float64", i, v, reflect.TypeOf(v))
-                       }
-               case TEXT, STRING:
-                       switch s := v.(type) {
-                       case string:
-                               size := len(s)
-                               binary.Write(buff, binary.BigEndian, 
int32(size))
-                               binary.Write(buff, binary.BigEndian, []byte(s))
-                       case []byte:
-                               size := len(s)
-                               binary.Write(buff, binary.BigEndian, 
int32(size))
-                               binary.Write(buff, binary.BigEndian, s)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be string or []byte", i, v, reflect.TypeOf(v))
-                       }
-               case BLOB:
-                       switch s := v.(type) {
-                       case []byte:
-                               size := len(s)
-                               binary.Write(buff, binary.BigEndian, 
int32(size))
-                               binary.Write(buff, binary.BigEndian, s)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be []byte", i, v, reflect.TypeOf(v))
-                       }
-               case DATE:
-                       switch s := v.(type) {
-                       case time.Time:
-                               date, err := dateToInt32(s)
-                               if err != nil {
-                                       return nil, err
-                               }
-                               binary.Write(buff, binary.BigEndian, date)
-                       default:
-                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be time.Time", i, v, reflect.TypeOf(v))
-                       }
-               default:
-                       return nil, fmt.Errorf("types[%d] is incorrect, it must 
in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, 
STRING)", i)
-               }
-       }
-       return buff.Bytes(), nil
+    buff := &bytes.Buffer{}
+    for i, t := range dataTypes {
+        binary.Write(buff, binary.BigEndian, byte(t))
+        v := values[i]
+        if v == nil {
+            return nil, fmt.Errorf("values[%d] can't be nil", i)
+        }
+
+        switch t {
+        case BOOLEAN:
+            switch v.(type) {
+            case bool:
+                binary.Write(buff, binary.BigEndian, v)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, 
reflect.TypeOf(v))
+            }
+        case INT32:
+            switch v.(type) {
+            case int32:
+                binary.Write(buff, binary.BigEndian, v)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, 
v, reflect.TypeOf(v))
+            }
+        case INT64, TIMESTAMP:
+            switch v.(type) {
+            case int64:
+                binary.Write(buff, binary.BigEndian, v)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, 
v, reflect.TypeOf(v))
+            }
+        case FLOAT:
+            switch v.(type) {
+            case float32:
+                binary.Write(buff, binary.BigEndian, v)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, 
v, reflect.TypeOf(v))
+            }
+        case DOUBLE:
+            switch v.(type) {
+            case float64:
+                binary.Write(buff, binary.BigEndian, v)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, 
v, reflect.TypeOf(v))
+            }
+        case TEXT, STRING:
+            switch s := v.(type) {
+            case string:
+                size := len(s)
+                binary.Write(buff, binary.BigEndian, int32(size))
+                binary.Write(buff, binary.BigEndian, []byte(s))
+            case []byte:
+                size := len(s)
+                binary.Write(buff, binary.BigEndian, int32(size))
+                binary.Write(buff, binary.BigEndian, s)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be string or 
[]byte", i, v, reflect.TypeOf(v))
+            }
+        case BLOB:
+            switch s := v.(type) {
+            case []byte:
+                size := len(s)
+                binary.Write(buff, binary.BigEndian, int32(size))
+                binary.Write(buff, binary.BigEndian, s)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, 
v, reflect.TypeOf(v))
+            }
+        case DATE:
+            switch s := v.(type) {
+            case time.Time:
+                date, err := dateToInt32(s)
+                if err != nil {
+                    return nil, err
+                }
+                binary.Write(buff, binary.BigEndian, date)
+            default:
+                return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", 
i, v, reflect.TypeOf(v))
+            }
+        default:
+            return nil, fmt.Errorf("types[%d] is incorrect, it must in 
(BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i)
+        }
+    }
+    return buff.Bytes(), nil
 }
 
 func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-       if !sorted {
-               if err := tablet.Sort(); err != nil {
-                       return nil, err
-               }
-       }
-       request, err := s.genTSInsertTabletReq(tablet, false)
-       if err != nil {
-               return nil, err
-       }
-
-       r, err = s.client.InsertTablet(context.Background(), request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertTablet(context.Background(), 
request)
-               }
-       }
-
-       return r, err
+    if !sorted {
+        if err := tablet.Sort(); err != nil {
+            return nil, err
+        }
+    }
+    request, err := s.genTSInsertTabletReq(tablet, false)
+    if err != nil {
+        return nil, err
+    }
+
+    r, err = s.client.InsertTablet(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertTablet(context.Background(), request)
+        }
+    }
+
+    return r, err
 }
 
 func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-       if !sorted {
-               if err := tablet.Sort(); err != nil {
-                       return nil, err
-               }
-       }
-       request, err := s.genTSInsertTabletReq(tablet, true)
-       if err != nil {
-               return nil, err
-       }
-
-       r, err = s.client.InsertTablet(context.Background(), request)
-
-       if err != nil && r == nil {
-               if s.reconnect() {
-                       request.SessionId = s.sessionId
-                       r, err = s.client.InsertTablet(context.Background(), 
request)
-               }
-       }
-
-       return r, err
+    if !sorted {
+        if err := tablet.Sort(); err != nil {
+            return nil, err
+        }
+    }
+    request, err := s.genTSInsertTabletReq(tablet, true)
+    if err != nil {
+        return nil, err
+    }
+
+    r, err = s.client.InsertTablet(context.Background(), request)
+
+    if err != nil && r == nil {
+        if s.reconnect() {
+            request.SessionId = s.sessionId
+            r, err = s.client.InsertTablet(context.Background(), request)
+        }
+    }
+
+    return r, err
 }
 
 func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) 
(*rpc.TSInsertTabletReq, error) {
-       if values, err := tablet.getValuesBytes(); err == nil {
-               request := &rpc.TSInsertTabletReq{
-                       SessionId:    s.sessionId,
-                       PrefixPath:   tablet.deviceId,
-                       Measurements: tablet.GetMeasurements(),
-                       Values:       values,
-                       Timestamps:   tablet.GetTimestampBytes(),
-                       Types:        tablet.getDataTypes(),
-                       Size:         int32(tablet.RowSize),
-                       IsAligned:    &isAligned,
-               }
-               return request, nil
-       } else {
-               return nil, err
-       }
+    if values, err := tablet.getValuesBytes(); err == nil {
+        request := &rpc.TSInsertTabletReq{
+            SessionId:    s.sessionId,
+            PrefixPath:   tablet.deviceId,
+            Measurements: tablet.GetMeasurements(),
+            Values:       values,
+            Timestamps:   tablet.GetTimestampBytes(),
+            Types:        tablet.getDataTypes(),
+            Size:         int32(tablet.RowSize),
+            IsAligned:    &isAligned,
+        }
+        return request, nil
+    } else {
+        return nil, err
+    }
 }
 
 func (s *Session) GetSessionId() int64 {
-       return s.sessionId
+    return s.sessionId
 }
 
 func NewSession(config *Config) Session {
-       endPoint := endPoint{}
-       endPoint.Host = config.Host
-       endPoint.Port = config.Port
-       endPointList.PushBack(endPoint)
-       return Session{config: config}
+    endPoint := endPoint{}
+    endPoint.Host = config.Host
+    endPoint.Port = config.Port
+    endPointList.PushBack(endPoint)
+    return Session{config: config}
 }
 
 func NewClusterSession(clusterConfig *ClusterConfig) Session {
-       session := Session{}
-       node := endPoint{}
-       for i := 0; i < len(clusterConfig.NodeUrls); i++ {
-               node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
-               node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
-               endPointList.PushBack(node)
-       }
-       var err error
-       for e := endPointList.Front(); e != nil; e = e.Next() {
-               session.trans = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
-                       ConnectTimeout: time.Duration(0), // Use 0 for no 
timeout
-               })
-               // session.trans = thrift.NewTFramedTransport(session.trans)    
// deprecated
-               var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-               session.trans = thrift.NewTFramedTransportConf(session.trans, 
&tmp_conf)
-               if !session.trans.IsOpen() {
-                       err = session.trans.Open()
-                       if err != nil {
-                               log.Println(err)
-                       } else {
-                               session.config = 
getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
-                                       clusterConfig.UserName, 
clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, 
clusterConfig.ConnectRetryMax)
-                               break
-                       }
-               }
-       }
-       if !session.trans.IsOpen() {
-               log.Fatal("No Server Can Connect")
-       }
-       return session
+    session := Session{}
+    node := endPoint{}
+    for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+        node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+        node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
+        endPointList.PushBack(node)
+    }
+    var err error
+    for e := endPointList.Front(); e != nil; e = e.Next() {
+        session.trans = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
+            ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+        })
+        // session.trans = thrift.NewTFramedTransport(session.trans)   // 
deprecated
+        var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+        session.trans = thrift.NewTFramedTransportConf(session.trans, 
&tmp_conf)
+        if !session.trans.IsOpen() {
+            err = session.trans.Open()
+            if err != nil {
+                log.Println(err)
+            } else {
+                session.config = getConfig(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port,
+                    clusterConfig.UserName, clusterConfig.Password, 
clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
+                break
+            }
+        }
+    }
+    if !session.trans.IsOpen() {
+        log.Fatal("No Server Can Connect")
+    }
+    return session
 }
 
 func (s *Session) initClusterConn(node endPoint) error {
-       var err error
-
-       s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), 
&thrift.TConfiguration{
-               ConnectTimeout: time.Duration(0), // Use 0 for no timeout
-       })
-       if err == nil {
-               // s.trans = thrift.NewTFramedTransport(s.trans)        // 
deprecated
-               var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-               s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
-               if !s.trans.IsOpen() {
-                       err = s.trans.Open()
-                       if err != nil {
-                               return err
-                       }
-               }
-       }
-
-       if s.config.FetchSize < 1 {
-               s.config.FetchSize = DefaultFetchSize
-       }
-
-       if s.config.TimeZone == "" {
-               s.config.TimeZone = DefaultTimeZone
-       }
-
-       if s.config.ConnectRetryMax < 1 {
-               s.config.ConnectRetryMax = DefaultConnectRetryMax
-       }
-
-       var protocolFactory thrift.TProtocolFactory
-       protocolFactory = getProtocolFactory(s.config.enableRPCCompression)
-       iprot := protocolFactory.GetProtocol(s.trans)
-       oprot := protocolFactory.GetProtocol(s.trans)
-       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
-       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-               Password: &s.config.Password}
-
-       resp, err := s.client.OpenSession(context.Background(), &req)
-       if err != nil {
-               return err
-       }
-       s.sessionId = resp.GetSessionId()
-       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-       return err
+    var err error
+
+    s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), 
&thrift.TConfiguration{
+        ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+    })
+    if err == nil {
+        // s.trans = thrift.NewTFramedTransport(s.trans)       // deprecated
+        var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+        s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
+        if !s.trans.IsOpen() {
+            err = s.trans.Open()
+            if err != nil {
+                return err
+            }
+        }
+    }
+
+    if s.config.FetchSize < 1 {
+        s.config.FetchSize = DefaultFetchSize
+    }
+
+    if s.config.TimeZone == "" {
+        s.config.TimeZone = DefaultTimeZone
+    }
+
+    if s.config.ConnectRetryMax < 1 {
+        s.config.ConnectRetryMax = DefaultConnectRetryMax
+    }
+
+    var protocolFactory thrift.TProtocolFactory
+    protocolFactory = getProtocolFactory(s.config.enableRPCCompression)
+    iprot := protocolFactory.GetProtocol(s.trans)
+    oprot := protocolFactory.GetProtocol(s.trans)
+    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
+    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+        Password: &s.config.Password}
+
+    resp, err := s.client.OpenSession(context.Background(), &req)
+    if err != nil {
+        return err
+    }
+    s.sessionId = resp.GetSessionId()
+    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+    return err
 
 }
 
 func getConfig(host string, port string, userName string, passWord string, 
fetchSize int32, timeZone string, connectRetryMax int) *Config {
-       return &Config{
-               Host:            host,
-               Port:            port,
-               UserName:        userName,
-               Password:        passWord,
-               FetchSize:       fetchSize,
-               TimeZone:        timeZone,
-               ConnectRetryMax: connectRetryMax,
-       }
+    return &Config{
+        Host:            host,
+        Port:            port,
+        UserName:        userName,
+        Password:        passWord,
+        FetchSize:       fetchSize,
+        TimeZone:        timeZone,
+        ConnectRetryMax: connectRetryMax,
+    }
 }
 
 func (s *Session) reconnect() bool {
-       var err error
-       var connectedSuccess = false
-
-       for i := 0; i < s.config.ConnectRetryMax; i++ {
-               for e := endPointList.Front(); e != nil; e = e.Next() {
-                       err = s.initClusterConn(e.Value.(endPoint))
-                       if err == nil {
-                               connectedSuccess = true
-                               break
-                       } else {
-                               log.Println("Connection refused:", 
e.Value.(endPoint))
-                       }
-               }
-               if connectedSuccess {
-                       break
-               }
-       }
-       return connectedSuccess
+    var err error
+    var connectedSuccess = false
+
+    for i := 0; i < s.config.ConnectRetryMax; i++ {
+        for e := endPointList.Front(); e != nil; e = e.Next() {
+            err = s.initClusterConn(e.Value.(endPoint))
+            if err == nil {
+                connectedSuccess = true
+                break
+            } else {
+                log.Println("Connection refused:", e.Value.(endPoint))
+            }
+        }
+        if connectedSuccess {
+            break
+        }
+    }
+    return connectedSuccess
 }

Reply via email to