This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch opt_go_client
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git


The following commit(s) were added to refs/heads/opt_go_client by this push:
     new d275506  format
d275506 is described below

commit d27550632615375bfb33f9dcc7e4fd71d51dd94b
Author: HTHou <[email protected]>
AuthorDate: Mon Nov 18 21:05:52 2024 +0800

    format
---
 client/session.go | 1818 ++++++++++++++++++++++++++---------------------------
 1 file changed, 909 insertions(+), 909 deletions(-)

diff --git a/client/session.go b/client/session.go
index efe0d19..26ebff0 100644
--- a/client/session.go
+++ b/client/session.go
@@ -20,166 +20,166 @@
 package client
 
 import (
-    "bytes"
-    "container/list"
-    "context"
-    "encoding/binary"
-    "errors"
-    "fmt"
-    "github.com/apache/iotdb-client-go/common"
-    "log"
-    "net"
-    "reflect"
-    "sort"
-    "strings"
-    "time"
-
-    "github.com/apache/iotdb-client-go/rpc"
-    "github.com/apache/thrift/lib/go/thrift"
+       "bytes"
+       "container/list"
+       "context"
+       "encoding/binary"
+       "errors"
+       "fmt"
+       "github.com/apache/iotdb-client-go/common"
+       "log"
+       "net"
+       "reflect"
+       "sort"
+       "strings"
+       "time"
+
+       "github.com/apache/iotdb-client-go/rpc"
+       "github.com/apache/thrift/lib/go/thrift"
 )
 
 const (
-    DefaultTimeZone        = "Asia/Shanghai"
-    DefaultFetchSize       = 1024
-    DefaultConnectRetryMax = 3
+       DefaultTimeZone        = "Asia/Shanghai"
+       DefaultFetchSize       = 1024
+       DefaultConnectRetryMax = 3
 )
 
 var errLength = errors.New("deviceIds, times, measurementsList and 
valuesList's size should be equal")
 
 type Config struct {
-    Host                 string
-    Port                 string
-    UserName             string
-    Password             string
-    FetchSize            int32
-    TimeZone             string
-    ConnectRetryMax      int
-    enableRPCCompression bool
+       Host                 string
+       Port                 string
+       UserName             string
+       Password             string
+       FetchSize            int32
+       TimeZone             string
+       ConnectRetryMax      int
+       enableRPCCompression bool
 }
 
 type Session struct {
-    config             *Config
-    client             *rpc.IClientRPCServiceClient
-    sessionId          int64
-    trans              thrift.TTransport
-    requestStatementId int64
+       config             *Config
+       client             *rpc.IClientRPCServiceClient
+       sessionId          int64
+       trans              thrift.TTransport
+       requestStatementId int64
 }
 
 type endPoint struct {
-    Host string
-    Port string
+       Host string
+       Port string
 }
 
 var endPointList = list.New()
 
 func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) 
error {
-    if s.config.FetchSize <= 0 {
-        s.config.FetchSize = DefaultFetchSize
-    }
-    if s.config.TimeZone == "" {
-        s.config.TimeZone = DefaultTimeZone
-    }
-
-    if s.config.ConnectRetryMax <= 0 {
-        s.config.ConnectRetryMax = DefaultConnectRetryMax
-    }
-
-    var protocolFactory thrift.TProtocolFactory
-    var err error
-
-    // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it 
returns one.
-    s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, 
s.config.Port), &thrift.TConfiguration{
-        ConnectTimeout: time.Duration(connectionTimeoutInMs) * 
time.Millisecond, // Use 0 for no timeout
-    })
-    // s.trans = thrift.NewTFramedTransport(s.trans)   // deprecated
-    var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-    s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
-    if !s.trans.IsOpen() {
-        err = s.trans.Open()
-        if err != nil {
-            return err
-        }
-    }
-    s.config.enableRPCCompression = enableRPCCompression
-    protocolFactory = getProtocolFactory(enableRPCCompression)
-    iprot := protocolFactory.GetProtocol(s.trans)
-    oprot := protocolFactory.GetProtocol(s.trans)
-    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
-    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-        Password: &s.config.Password}
-    resp, err := s.client.OpenSession(context.Background(), &req)
-    if err != nil {
-        return err
-    }
-    s.sessionId = resp.GetSessionId()
-    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-    return err
+       if s.config.FetchSize <= 0 {
+               s.config.FetchSize = DefaultFetchSize
+       }
+       if s.config.TimeZone == "" {
+               s.config.TimeZone = DefaultTimeZone
+       }
+
+       if s.config.ConnectRetryMax <= 0 {
+               s.config.ConnectRetryMax = DefaultConnectRetryMax
+       }
+
+       var protocolFactory thrift.TProtocolFactory
+       var err error
+
+       // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it 
returns one.
+       s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, 
s.config.Port), &thrift.TConfiguration{
+               ConnectTimeout: time.Duration(connectionTimeoutInMs) * 
time.Millisecond, // Use 0 for no timeout
+       })
+       // s.trans = thrift.NewTFramedTransport(s.trans)        // deprecated
+       var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+       s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
+       if !s.trans.IsOpen() {
+               err = s.trans.Open()
+               if err != nil {
+                       return err
+               }
+       }
+       s.config.enableRPCCompression = enableRPCCompression
+       protocolFactory = getProtocolFactory(enableRPCCompression)
+       iprot := protocolFactory.GetProtocol(s.trans)
+       oprot := protocolFactory.GetProtocol(s.trans)
+       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
+       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+               Password: &s.config.Password}
+       resp, err := s.client.OpenSession(context.Background(), &req)
+       if err != nil {
+               return err
+       }
+       s.sessionId = resp.GetSessionId()
+       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+       return err
 }
 
 type ClusterConfig struct {
-    NodeUrls        []string //ip:port
-    UserName        string
-    Password        string
-    FetchSize       int32
-    TimeZone        string
-    ConnectRetryMax int
+       NodeUrls        []string //ip:port
+       UserName        string
+       Password        string
+       FetchSize       int32
+       TimeZone        string
+       ConnectRetryMax int
 }
 
 type ClusterSession struct {
-    config             *ClusterConfig
-    client             *rpc.IClientRPCServiceClient
-    sessionId          int64
-    trans              thrift.TTransport
-    requestStatementId int64
+       config             *ClusterConfig
+       client             *rpc.IClientRPCServiceClient
+       sessionId          int64
+       trans              thrift.TTransport
+       requestStatementId int64
 }
 
 func (s *Session) OpenCluster(enableRPCCompression bool) error {
-    if s.config.FetchSize <= 0 {
-        s.config.FetchSize = DefaultFetchSize
-    }
-    if s.config.TimeZone == "" {
-        s.config.TimeZone = DefaultTimeZone
-    }
-
-    if s.config.ConnectRetryMax <= 0 {
-        s.config.ConnectRetryMax = DefaultConnectRetryMax
-    }
-
-    var protocolFactory thrift.TProtocolFactory
-    var err error
-
-    s.config.enableRPCCompression = enableRPCCompression
-    protocolFactory = getProtocolFactory(enableRPCCompression)
-    iprot := protocolFactory.GetProtocol(s.trans)
-    oprot := protocolFactory.GetProtocol(s.trans)
-    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
-    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-        Password: &s.config.Password}
-
-    resp, err := s.client.OpenSession(context.Background(), &req)
-    if err != nil {
-        return err
-    }
-    s.sessionId = resp.GetSessionId()
-    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-    return err
+       if s.config.FetchSize <= 0 {
+               s.config.FetchSize = DefaultFetchSize
+       }
+       if s.config.TimeZone == "" {
+               s.config.TimeZone = DefaultTimeZone
+       }
+
+       if s.config.ConnectRetryMax <= 0 {
+               s.config.ConnectRetryMax = DefaultConnectRetryMax
+       }
+
+       var protocolFactory thrift.TProtocolFactory
+       var err error
+
+       s.config.enableRPCCompression = enableRPCCompression
+       protocolFactory = getProtocolFactory(enableRPCCompression)
+       iprot := protocolFactory.GetProtocol(s.trans)
+       oprot := protocolFactory.GetProtocol(s.trans)
+       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
+       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+               Password: &s.config.Password}
+
+       resp, err := s.client.OpenSession(context.Background(), &req)
+       if err != nil {
+               return err
+       }
+       s.sessionId = resp.GetSessionId()
+       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+       return err
 }
 
 func getProtocolFactory(enableRPCCompression bool) thrift.TProtocolFactory {
-    if enableRPCCompression {
-        return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
-    }
-    return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
+       if enableRPCCompression {
+               return 
thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
+       }
+       return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
 }
 
 func (s *Session) Close() (r *common.TSStatus, err error) {
-    req := rpc.NewTSCloseSessionReq()
-    req.SessionId = s.sessionId
-    _, err = s.client.CloseSession(context.Background(), req)
-    if err != nil {
-        return nil, err
-    }
-    return nil, s.trans.Close()
+       req := rpc.NewTSCloseSessionReq()
+       req.SessionId = s.sessionId
+       _, err = s.client.CloseSession(context.Background(), req)
+       if err != nil {
+               return nil, err
+       }
+       return nil, s.trans.Close()
 }
 
 /*
@@ -190,13 +190,13 @@ func (s *Session) Close() (r *common.TSStatus, err error) 
{
  *error: correctness of operation
  */
 func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, 
err error) {
-    r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            r, err = s.client.SetStorageGroup(context.Background(), 
s.sessionId, storageGroupId)
-        }
-    }
-    return r, err
+       r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       r, err = s.client.SetStorageGroup(context.Background(), 
s.sessionId, storageGroupId)
+               }
+       }
+       return r, err
 }
 
 /*
@@ -207,13 +207,13 @@ func (s *Session) SetStorageGroup(storageGroupId string) 
(r *common.TSStatus, er
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroup(storageGroupId string) (r 
*common.TSStatus, err error) {
-    r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
[]string{storageGroupId})
-    if err != nil && r == nil {
-        if s.reconnect() {
-            r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, []string{storageGroupId})
-        }
-    }
-    return r, err
+       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, []string{storageGroupId})
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       r, err = 
s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
[]string{storageGroupId})
+               }
+       }
+       return r, err
 }
 
 /*
@@ -224,13 +224,13 @@ func (s *Session) DeleteStorageGroup(storageGroupId 
string) (r *common.TSStatus,
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r 
*common.TSStatus, err error) {
-    r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, 
storageGroupIds)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, storageGroupIds)
-        }
-    }
-    return r, err
+       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, storageGroupIds)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       r, err = 
s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
+               }
+       }
+       return r, err
 }
 
 /*
@@ -244,16 +244,16 @@ func (s *Session) DeleteStorageGroups(storageGroupIds 
...string) (r *common.TSSt
  *error: correctness of operation
  */
 func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding 
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags 
map[string]string) (r *common.TSStatus, err error) {
-    request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, 
DataType: int32(dataType), Encoding: int32(encoding),
-        Compressor: int32(compressor), Attributes: attributes, Tags: tags}
-    status, err := s.client.CreateTimeseries(context.Background(), &request)
-    if err != nil && status == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            status, err = s.client.CreateTimeseries(context.Background(), 
&request)
-        }
-    }
-    return status, err
+       request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: 
path, DataType: int32(dataType), Encoding: int32(encoding),
+               Compressor: int32(compressor), Attributes: attributes, Tags: 
tags}
+       status, err := s.client.CreateTimeseries(context.Background(), &request)
+       if err != nil && status == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       status, err = 
s.client.CreateTimeseries(context.Background(), &request)
+               }
+       }
+       return status, err
 }
 
 /*
@@ -269,38 +269,38 @@ func (s *Session) CreateTimeseries(path string, dataType 
TSDataType, encoding TS
  *error: correctness of operation
  */
 func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements 
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors 
[]TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) 
{
-    destTypes := make([]int32, len(dataTypes))
-    for i, t := range dataTypes {
-        destTypes[i] = int32(t)
-    }
-
-    destEncodings := make([]int32, len(encodings))
-    for i, e := range encodings {
-        destEncodings[i] = int32(e)
-    }
-
-    destCompressions := make([]int32, len(compressors))
-    for i, e := range compressors {
-        destCompressions[i] = int32(e)
-    }
-
-    request := rpc.TSCreateAlignedTimeseriesReq{
-        SessionId:        s.sessionId,
-        PrefixPath:       prefixPath,
-        Measurements:     measurements,
-        DataTypes:        destTypes,
-        Encodings:        destEncodings,
-        Compressors:      destCompressions,
-        MeasurementAlias: measurementAlias,
-    }
-    status, err := s.client.CreateAlignedTimeseries(context.Background(), 
&request)
-    if err != nil && status == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            status, err = 
s.client.CreateAlignedTimeseries(context.Background(), &request)
-        }
-    }
-    return status, err
+       destTypes := make([]int32, len(dataTypes))
+       for i, t := range dataTypes {
+               destTypes[i] = int32(t)
+       }
+
+       destEncodings := make([]int32, len(encodings))
+       for i, e := range encodings {
+               destEncodings[i] = int32(e)
+       }
+
+       destCompressions := make([]int32, len(compressors))
+       for i, e := range compressors {
+               destCompressions[i] = int32(e)
+       }
+
+       request := rpc.TSCreateAlignedTimeseriesReq{
+               SessionId:        s.sessionId,
+               PrefixPath:       prefixPath,
+               Measurements:     measurements,
+               DataTypes:        destTypes,
+               Encodings:        destEncodings,
+               Compressors:      destCompressions,
+               MeasurementAlias: measurementAlias,
+       }
+       status, err := s.client.CreateAlignedTimeseries(context.Background(), 
&request)
+       if err != nil && status == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       status, err = 
s.client.CreateAlignedTimeseries(context.Background(), &request)
+               }
+       }
+       return status, err
 }
 
 /*
@@ -314,33 +314,33 @@ func (s *Session) CreateAlignedTimeseries(prefixPath 
string, measurements []stri
  *error: correctness of operation
  */
 func (s *Session) CreateMultiTimeseries(paths []string, dataTypes 
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r 
*common.TSStatus, err error) {
-    destTypes := make([]int32, len(dataTypes))
-    for i, t := range dataTypes {
-        destTypes[i] = int32(t)
-    }
-
-    destEncodings := make([]int32, len(encodings))
-    for i, e := range encodings {
-        destEncodings[i] = int32(e)
-    }
-
-    destCompressions := make([]int32, len(compressors))
-    for i, e := range compressors {
-        destCompressions[i] = int32(e)
-    }
-
-    request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: 
paths, DataTypes: destTypes,
-        Encodings: destEncodings, Compressors: destCompressions}
-    r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.CreateMultiTimeseries(context.Background(), 
&request)
-        }
-    }
-
-    return r, err
+       destTypes := make([]int32, len(dataTypes))
+       for i, t := range dataTypes {
+               destTypes[i] = int32(t)
+       }
+
+       destEncodings := make([]int32, len(encodings))
+       for i, e := range encodings {
+               destEncodings[i] = int32(e)
+       }
+
+       destCompressions := make([]int32, len(compressors))
+       for i, e := range compressors {
+               destCompressions[i] = int32(e)
+       }
+
+       request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, 
Paths: paths, DataTypes: destTypes,
+               Encodings: destEncodings, Compressors: destCompressions}
+       r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = 
s.client.CreateMultiTimeseries(context.Background(), &request)
+               }
+       }
+
+       return r, err
 }
 
 /*
@@ -351,13 +351,13 @@ func (s *Session) CreateMultiTimeseries(paths []string, 
dataTypes []TSDataType,
  *error: correctness of operation
  */
 func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err 
error) {
-    r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, 
paths)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            r, err = s.client.DeleteTimeseries(context.Background(), 
s.sessionId, paths)
-        }
-    }
-    return r, err
+       r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, 
paths)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       r, err = 
s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+               }
+       }
+       return r, err
 }
 
 /*
@@ -370,15 +370,15 @@ func (s *Session) DeleteTimeseries(paths []string) (r 
*common.TSStatus, err erro
  *error: correctness of operation
  */
 func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) 
(r *common.TSStatus, err error) {
-    request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, 
StartTime: startTime, EndTime: endTime}
-    r, err = s.client.DeleteData(context.Background(), &request)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.DeleteData(context.Background(), &request)
-        }
-    }
-    return r, err
+       request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, 
StartTime: startTime, EndTime: endTime}
+       r, err = s.client.DeleteData(context.Background(), &request)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.DeleteData(context.Background(), 
&request)
+               }
+       }
+       return r, err
 }
 
 /*
@@ -392,224 +392,224 @@ func (s *Session) DeleteData(paths []string, startTime 
int64, endTime int64) (r
  *error: correctness of operation
  */
 func (s *Session) InsertStringRecord(deviceId string, measurements []string, 
values []string, timestamp int64) (r *common.TSStatus, err error) {
-    request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, PrefixPath: 
deviceId, Measurements: measurements,
-        Values: values, Timestamp: timestamp}
-    r, err = s.client.InsertStringRecord(context.Background(), &request)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertStringRecord(context.Background(), 
&request)
-        }
-    }
-    return r, err
+       request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, 
PrefixPath: deviceId, Measurements: measurements,
+               Values: values, Timestamp: timestamp}
+       r, err = s.client.InsertStringRecord(context.Background(), &request)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = 
s.client.InsertStringRecord(context.Background(), &request)
+               }
+       }
+       return r, err
 }
 
 func (s *Session) GetTimeZone() (string, error) {
-    resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
-    if err != nil {
-        return DefaultTimeZone, err
-    }
-    return resp.TimeZone, nil
+       resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
+       if err != nil {
+               return DefaultTimeZone, err
+       }
+       return resp.TimeZone, nil
 }
 
 func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) 
{
-    request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone}
-    r, err = s.client.SetTimeZone(context.Background(), &request)
-    s.config.TimeZone = timeZone
-    return r, err
+       request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: 
timeZone}
+       r, err = s.client.SetTimeZone(context.Background(), &request)
+       s.config.TimeZone = timeZone
+       return r, err
 }
 
 func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
-    request := rpc.TSExecuteStatementReq{
-        SessionId:   s.sessionId,
-        Statement:   sql,
-        StatementId: s.requestStatementId,
-        FetchSize:   &s.config.FetchSize,
-    }
-    resp, err := s.client.ExecuteStatement(context.Background(), &request)
-
-    if err != nil && resp == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            request.StatementId = s.requestStatementId
-            resp, err = s.client.ExecuteStatement(context.Background(), 
&request)
-        }
-    }
-
-    return s.genDataSet(sql, resp), err
+       request := rpc.TSExecuteStatementReq{
+               SessionId:   s.sessionId,
+               Statement:   sql,
+               StatementId: s.requestStatementId,
+               FetchSize:   &s.config.FetchSize,
+       }
+       resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+       if err != nil && resp == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       request.StatementId = s.requestStatementId
+                       resp, err = 
s.client.ExecuteStatement(context.Background(), &request)
+               }
+       }
+
+       return s.genDataSet(sql, resp), err
 }
 
 func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, 
err error) {
-    request := rpc.TSExecuteStatementReq{
-        SessionId:   s.sessionId,
-        Statement:   sql,
-        StatementId: s.requestStatementId,
-        FetchSize:   &s.config.FetchSize,
-    }
-    resp, err := s.client.ExecuteStatement(context.Background(), &request)
-
-    if err != nil && resp == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            request.StatementId = s.requestStatementId
-            resp, err = s.client.ExecuteStatement(context.Background(), 
&request)
-        }
-    }
-
-    return resp.Status, err
+       request := rpc.TSExecuteStatementReq{
+               SessionId:   s.sessionId,
+               Statement:   sql,
+               StatementId: s.requestStatementId,
+               FetchSize:   &s.config.FetchSize,
+       }
+       resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+       if err != nil && resp == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       request.StatementId = s.requestStatementId
+                       resp, err = 
s.client.ExecuteStatement(context.Background(), &request)
+               }
+       }
+
+       return resp.Status, err
 }
 
 func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) 
(*SessionDataSet, error) {
-    request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: 
sql, StatementId: s.requestStatementId,
-        FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
-    if resp, err := s.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
-        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-            return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-        } else {
-            return nil, statusErr
-        }
-    } else {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            request.StatementId = s.requestStatementId
-            resp, err = s.client.ExecuteQueryStatement(context.Background(), 
&request)
-            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-            } else {
-                return nil, statusErr
-            }
-        }
-        return nil, err
-    }
+       request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: 
sql, StatementId: s.requestStatementId,
+               FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
+       if resp, err := s.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
+               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                       return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+               } else {
+                       return nil, statusErr
+               }
+       } else {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       request.StatementId = s.requestStatementId
+                       resp, err = 
s.client.ExecuteQueryStatement(context.Background(), &request)
+                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
+                               return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+                       } else {
+                               return nil, statusErr
+                       }
+               }
+               return nil, err
+       }
 }
 
 func (s *Session) ExecuteAggregationQuery(paths []string, aggregations 
[]common.TAggregationType,
-    startTime *int64, endTime *int64, interval *int64,
-    timeoutMs *int64) (*SessionDataSet, error) {
-
-    request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: 
s.requestStatementId, Paths: paths,
-        Aggregations: aggregations, StartTime: startTime, EndTime: endTime, 
Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
-    if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
-        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-            return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-        } else {
-            return nil, statusErr
-        }
-    } else {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            resp, err = s.client.ExecuteAggregationQuery(context.Background(), 
&request)
-            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-            } else {
-                return nil, statusErr
-            }
-        }
-        return nil, err
-    }
+       startTime *int64, endTime *int64, interval *int64,
+       timeoutMs *int64) (*SessionDataSet, error) {
+
+       request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, 
StatementId: s.requestStatementId, Paths: paths,
+               Aggregations: aggregations, StartTime: startTime, EndTime: 
endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
+       if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
+               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                       return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+               } else {
+                       return nil, statusErr
+               }
+       } else {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       resp, err = 
s.client.ExecuteAggregationQuery(context.Background(), &request)
+                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
+                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+                       } else {
+                               return nil, statusErr
+                       }
+               }
+               return nil, err
+       }
 }
 
 func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, 
aggregations []common.TAggregationType,
-    startTime *int64, endTime *int64, interval *int64,
-    timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) {
-
-    request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: 
s.requestStatementId, Paths: paths,
-        Aggregations: aggregations, StartTime: startTime, EndTime: endTime, 
Interval: interval, FetchSize: &s.config.FetchSize,
-        Timeout: timeoutMs, LegalPathNodes: legalNodes}
-    if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
-        if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-            return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-        } else {
-            return nil, statusErr
-        }
-    } else {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            resp, err = s.client.ExecuteAggregationQuery(context.Background(), 
&request)
-            if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                return NewSessionDataSet("", resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, timeoutMs), err
-            } else {
-                return nil, statusErr
-            }
-        }
-        return nil, err
-    }
+       startTime *int64, endTime *int64, interval *int64,
+       timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) {
+
+       request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, 
StatementId: s.requestStatementId, Paths: paths,
+               Aggregations: aggregations, StartTime: startTime, EndTime: 
endTime, Interval: interval, FetchSize: &s.config.FetchSize,
+               Timeout: timeoutMs, LegalPathNodes: legalNodes}
+       if resp, err := s.client.ExecuteAggregationQuery(context.Background(), 
&request); err == nil {
+               if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+                       return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+               } else {
+                       return nil, statusErr
+               }
+       } else {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       resp, err = 
s.client.ExecuteAggregationQuery(context.Background(), &request)
+                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
+                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+                       } else {
+                               return nil, statusErr
+                       }
+               }
+               return nil, err
+       }
 }
 
 func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
-    measurements []string,
-    types []TSDataType,
-    values []interface{},
-    isAligned bool) (*rpc.TSInsertRecordReq, error) {
-    request := &rpc.TSInsertRecordReq{}
-    request.SessionId = s.sessionId
-    request.PrefixPath = deviceId
-    request.Timestamp = time
-    request.Measurements = measurements
-    request.IsAligned = &isAligned
-    if bys, err := valuesToBytes(types, values); err == nil {
-        request.Values = bys
-    } else {
-        return nil, err
-    }
-    return request, nil
+       measurements []string,
+       types []TSDataType,
+       values []interface{},
+       isAligned bool) (*rpc.TSInsertRecordReq, error) {
+       request := &rpc.TSInsertRecordReq{}
+       request.SessionId = s.sessionId
+       request.PrefixPath = deviceId
+       request.Timestamp = time
+       request.Measurements = measurements
+       request.IsAligned = &isAligned
+       if bys, err := valuesToBytes(types, values); err == nil {
+               request.Values = bys
+       } else {
+               return nil, err
+       }
+       return request, nil
 }
 
 func (s *Session) InsertRecord(deviceId string, measurements []string, 
dataTypes []TSDataType, values []interface{}, timestamp int64) (r 
*common.TSStatus, err error) {
-    request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, 
dataTypes, values, false)
-    if err != nil {
-        return nil, err
-    }
-    r, err = s.client.InsertRecord(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertRecord(context.Background(), request)
-        }
-    }
-
-    return r, err
+       request, err := s.genTSInsertRecordReq(deviceId, timestamp, 
measurements, dataTypes, values, false)
+       if err != nil {
+               return nil, err
+       }
+       r, err = s.client.InsertRecord(context.Background(), request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertRecord(context.Background(), 
request)
+               }
+       }
+
+       return r, err
 }
 
 func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, 
dataTypes []TSDataType, values []interface{}, timestamp int64) (r 
*common.TSStatus, err error) {
-    request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, 
dataTypes, values, true)
-    if err != nil {
-        return nil, err
-    }
-    r, err = s.client.InsertRecord(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertRecord(context.Background(), request)
-        }
-    }
-
-    return r, err
+       request, err := s.genTSInsertRecordReq(deviceId, timestamp, 
measurements, dataTypes, values, true)
+       if err != nil {
+               return nil, err
+       }
+       r, err = s.client.InsertRecord(context.Background(), request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertRecord(context.Background(), 
request)
+               }
+       }
+
+       return r, err
 }
 
 type deviceData struct {
-    timestamps        []int64
-    measurementsSlice [][]string
-    dataTypesSlice    [][]TSDataType
-    valuesSlice       [][]interface{}
-    isAligned         bool
+       timestamps        []int64
+       measurementsSlice [][]string
+       dataTypesSlice    [][]TSDataType
+       valuesSlice       [][]interface{}
+       isAligned         bool
 }
 
 func (d *deviceData) Len() int {
-    return len(d.timestamps)
+       return len(d.timestamps)
 }
 
 func (d *deviceData) Less(i, j int) bool {
-    return d.timestamps[i] < d.timestamps[j]
+       return d.timestamps[i] < d.timestamps[j]
 }
 
 func (d *deviceData) Swap(i, j int) {
-    d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
-    d.measurementsSlice[i], d.measurementsSlice[j] = d.measurementsSlice[j], 
d.measurementsSlice[i]
-    d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], 
d.dataTypesSlice[i]
-    d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i]
+       d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
+       d.measurementsSlice[i], d.measurementsSlice[j] = 
d.measurementsSlice[j], d.measurementsSlice[i]
+       d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], 
d.dataTypesSlice[i]
+       d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i]
 }
 
 // InsertRecordsOfOneDevice Insert multiple rows, which can reduce the 
overhead of network. This method is just like jdbc
@@ -617,88 +617,88 @@ func (d *deviceData) Swap(i, j int) {
 // your performance, please see insertTablet method
 // Each row is independent, which could have different deviceId, time, number 
of measurements
 func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps 
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, 
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
-    length := len(timestamps)
-    if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
-        return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
-    }
-
-    if !sorted {
-        sort.Sort(&deviceData{
-            timestamps:        timestamps,
-            measurementsSlice: measurementsSlice,
-            dataTypesSlice:    dataTypesSlice,
-            valuesSlice:       valuesSlice,
-        })
-    }
-
-    valuesList := make([][]byte, length)
-    for i := 0; i < length; i++ {
-        if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
-            return nil, err
-        }
-    }
-
-    request := &rpc.TSInsertRecordsOfOneDeviceReq{
-        SessionId:        s.sessionId,
-        PrefixPath:       deviceId,
-        Timestamps:       timestamps,
-        MeasurementsList: measurementsSlice,
-        ValuesList:       valuesList,
-    }
-
-    r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
-        }
-    }
-
-    return r, err
+       length := len(timestamps)
+       if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
+               return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
+       }
+
+       if !sorted {
+               sort.Sort(&deviceData{
+                       timestamps:        timestamps,
+                       measurementsSlice: measurementsSlice,
+                       dataTypesSlice:    dataTypesSlice,
+                       valuesSlice:       valuesSlice,
+               })
+       }
+
+       valuesList := make([][]byte, length)
+       for i := 0; i < length; i++ {
+               if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
+                       return nil, err
+               }
+       }
+
+       request := &rpc.TSInsertRecordsOfOneDeviceReq{
+               SessionId:        s.sessionId,
+               PrefixPath:       deviceId,
+               Timestamps:       timestamps,
+               MeasurementsList: measurementsSlice,
+               ValuesList:       valuesList,
+       }
+
+       r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = 
s.client.InsertRecordsOfOneDevice(context.Background(), request)
+               }
+       }
+
+       return r, err
 }
 
 func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps 
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, 
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
-    length := len(timestamps)
-    if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
-        return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
-    }
-
-    if !sorted {
-        sort.Sort(&deviceData{
-            timestamps:        timestamps,
-            measurementsSlice: measurementsSlice,
-            dataTypesSlice:    dataTypesSlice,
-            valuesSlice:       valuesSlice,
-        })
-    }
-
-    valuesList := make([][]byte, length)
-    for i := 0; i < length; i++ {
-        if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
-            return nil, err
-        }
-    }
-    var isAligned = true
-    request := &rpc.TSInsertRecordsOfOneDeviceReq{
-        SessionId:        s.sessionId,
-        PrefixPath:       deviceId,
-        Timestamps:       timestamps,
-        MeasurementsList: measurementsSlice,
-        ValuesList:       valuesList,
-        IsAligned:        &isAligned,
-    }
-
-    r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
-        }
-    }
-
-    return r, err
+       length := len(timestamps)
+       if len(measurementsSlice) != length || len(dataTypesSlice) != length || 
len(valuesSlice) != length {
+               return nil, errors.New("timestamps, measurementsSlice and 
valuesSlice's size should be equal")
+       }
+
+       if !sorted {
+               sort.Sort(&deviceData{
+                       timestamps:        timestamps,
+                       measurementsSlice: measurementsSlice,
+                       dataTypesSlice:    dataTypesSlice,
+                       valuesSlice:       valuesSlice,
+               })
+       }
+
+       valuesList := make([][]byte, length)
+       for i := 0; i < length; i++ {
+               if valuesList[i], err = valuesToBytes(dataTypesSlice[i], 
valuesSlice[i]); err != nil {
+                       return nil, err
+               }
+       }
+       var isAligned = true
+       request := &rpc.TSInsertRecordsOfOneDeviceReq{
+               SessionId:        s.sessionId,
+               PrefixPath:       deviceId,
+               Timestamps:       timestamps,
+               MeasurementsList: measurementsSlice,
+               ValuesList:       valuesList,
+               IsAligned:        &isAligned,
+       }
+
+       r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = 
s.client.InsertRecordsOfOneDevice(context.Background(), request)
+               }
+       }
+
+       return r, err
 }
 
 /*
@@ -714,37 +714,37 @@ func (s *Session) 
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
  *
  */
 func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, 
dataTypes [][]TSDataType, values [][]interface{},
-    timestamps []int64) (r *common.TSStatus, err error) {
-    request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, 
values, timestamps, false)
-    if err != nil {
-        return nil, err
-    } else {
-        r, err = s.client.InsertRecords(context.Background(), request)
-        if err != nil && r == nil {
-            if s.reconnect() {
-                request.SessionId = s.sessionId
-                r, err = s.client.InsertRecords(context.Background(), request)
-            }
-        }
-        return r, err
-    }
+       timestamps []int64) (r *common.TSStatus, err error) {
+       request, err := s.genInsertRecordsReq(deviceIds, measurements, 
dataTypes, values, timestamps, false)
+       if err != nil {
+               return nil, err
+       } else {
+               r, err = s.client.InsertRecords(context.Background(), request)
+               if err != nil && r == nil {
+                       if s.reconnect() {
+                               request.SessionId = s.sessionId
+                               r, err = 
s.client.InsertRecords(context.Background(), request)
+                       }
+               }
+               return r, err
+       }
 }
 
 func (s *Session) InsertAlignedRecords(deviceIds []string, measurements 
[][]string, dataTypes [][]TSDataType, values [][]interface{},
-    timestamps []int64) (r *common.TSStatus, err error) {
-    request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, 
values, timestamps, true)
-    if err != nil {
-        return nil, err
-    } else {
-        r, err = s.client.InsertRecords(context.Background(), request)
-        if err != nil && r == nil {
-            if s.reconnect() {
-                request.SessionId = s.sessionId
-                r, err = s.client.InsertRecords(context.Background(), request)
-            }
-        }
-        return r, err
-    }
+       timestamps []int64) (r *common.TSStatus, err error) {
+       request, err := s.genInsertRecordsReq(deviceIds, measurements, 
dataTypes, values, timestamps, true)
+       if err != nil {
+               return nil, err
+       } else {
+               r, err = s.client.InsertRecords(context.Background(), request)
+               if err != nil && r == nil {
+                       if s.reconnect() {
+                               request.SessionId = s.sessionId
+                               r, err = 
s.client.InsertRecords(context.Background(), request)
+                       }
+               }
+               return r, err
+       }
 }
 
 /*
@@ -753,450 +753,450 @@ func (s *Session) InsertAlignedRecords(deviceIds 
[]string, measurements [][]stri
  *tablets: []*client.Tablet, list of tablets
  */
 func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-    if !sorted {
-        for _, t := range tablets {
-            if err := t.Sort(); err != nil {
-                return nil, err
-            }
-        }
-    }
-    request, err := s.genInsertTabletsReq(tablets, false)
-    if err != nil {
-        return nil, err
-    }
-    r, err = s.client.InsertTablets(context.Background(), request)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertTablets(context.Background(), request)
-        }
-    }
-    return r, err
+       if !sorted {
+               for _, t := range tablets {
+                       if err := t.Sort(); err != nil {
+                               return nil, err
+                       }
+               }
+       }
+       request, err := s.genInsertTabletsReq(tablets, false)
+       if err != nil {
+               return nil, err
+       }
+       r, err = s.client.InsertTablets(context.Background(), request)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertTablets(context.Background(), 
request)
+               }
+       }
+       return r, err
 }
 
 func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-    if !sorted {
-        for _, t := range tablets {
-            if err := t.Sort(); err != nil {
-                return nil, err
-            }
-        }
-    }
-    request, err := s.genInsertTabletsReq(tablets, true)
-    if err != nil {
-        return nil, err
-    }
-    r, err = s.client.InsertTablets(context.Background(), request)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertTablets(context.Background(), request)
-        }
-    }
-    return r, err
+       if !sorted {
+               for _, t := range tablets {
+                       if err := t.Sort(); err != nil {
+                               return nil, err
+                       }
+               }
+       }
+       request, err := s.genInsertTabletsReq(tablets, true)
+       if err != nil {
+               return nil, err
+       }
+       r, err = s.client.InsertTablets(context.Background(), request)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertTablets(context.Background(), 
request)
+               }
+       }
+       return r, err
 }
 
 func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus, 
err error) {
-    request := rpc.TSExecuteBatchStatementReq{
-        SessionId:  s.sessionId,
-        Statements: inserts,
-    }
-    r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.ExecuteBatchStatement(context.Background(), 
&request)
-        }
-    }
-    return r, err
+       request := rpc.TSExecuteBatchStatementReq{
+               SessionId:  s.sessionId,
+               Statements: inserts,
+       }
+       r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = 
s.client.ExecuteBatchStatement(context.Background(), &request)
+               }
+       }
+       return r, err
 }
 
 func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime 
int64) (*SessionDataSet, error) {
-    request := rpc.TSRawDataQueryReq{
-        SessionId:   s.sessionId,
-        Paths:       paths,
-        FetchSize:   &s.config.FetchSize,
-        StartTime:   startTime,
-        EndTime:     endTime,
-        StatementId: s.requestStatementId,
-    }
-    resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
-
-    if err != nil && resp == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            request.StatementId = s.requestStatementId
-            resp, err = s.client.ExecuteRawDataQuery(context.Background(), 
&request)
-        }
-    }
-
-    return s.genDataSet("", resp), err
+       request := rpc.TSRawDataQueryReq{
+               SessionId:   s.sessionId,
+               Paths:       paths,
+               FetchSize:   &s.config.FetchSize,
+               StartTime:   startTime,
+               EndTime:     endTime,
+               StatementId: s.requestStatementId,
+       }
+       resp, err := s.client.ExecuteRawDataQuery(context.Background(), 
&request)
+
+       if err != nil && resp == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       request.StatementId = s.requestStatementId
+                       resp, err = 
s.client.ExecuteRawDataQuery(context.Background(), &request)
+               }
+       }
+
+       return s.genDataSet("", resp), err
 }
 
 func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
-    request := rpc.TSExecuteStatementReq{
-        SessionId:   s.sessionId,
-        Statement:   sql,
-        StatementId: s.requestStatementId,
-        FetchSize:   &s.config.FetchSize,
-    }
-    resp, err := s.client.ExecuteUpdateStatement(context.Background(), 
&request)
-
-    if err != nil && resp == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            request.StatementId = s.requestStatementId
-            resp, err = s.client.ExecuteUpdateStatement(context.Background(), 
&request)
-        }
-    }
-
-    return s.genDataSet(sql, resp), err
+       request := rpc.TSExecuteStatementReq{
+               SessionId:   s.sessionId,
+               Statement:   sql,
+               StatementId: s.requestStatementId,
+               FetchSize:   &s.config.FetchSize,
+       }
+       resp, err := s.client.ExecuteUpdateStatement(context.Background(), 
&request)
+
+       if err != nil && resp == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       request.StatementId = s.requestStatementId
+                       resp, err = 
s.client.ExecuteUpdateStatement(context.Background(), &request)
+               }
+       }
+
+       return s.genDataSet(sql, resp), err
 }
 
 func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) 
*SessionDataSet {
-    var queryId int64
-    if resp.QueryId == nil {
-        queryId = 0
-    } else {
-        queryId = *resp.QueryId
-    }
-    return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap,
-        queryId, s.client, s.sessionId, resp.QueryDataSet, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
+       var queryId int64
+       if resp.QueryId == nil {
+               queryId = 0
+       } else {
+               queryId = *resp.QueryId
+       }
+       return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap,
+               queryId, s.client, s.sessionId, resp.QueryDataSet, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
 }
 
 func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) 
(*rpc.TSInsertTabletsReq, error) {
-    var (
-        length           = len(tablets)
-        deviceIds        = make([]string, length)
-        measurementsList = make([][]string, length)
-        valuesList       = make([][]byte, length)
-        timestampsList   = make([][]byte, length)
-        typesList        = make([][]int32, length)
-        sizeList         = make([]int32, length)
-    )
-    for index, tablet := range tablets {
-        deviceIds[index] = tablet.deviceId
-        measurementsList[index] = tablet.GetMeasurements()
-
-        values, err := tablet.getValuesBytes()
-        if err != nil {
-            return nil, err
-        }
-
-        valuesList[index] = values
-        timestampsList[index] = tablet.GetTimestampBytes()
-        typesList[index] = tablet.getDataTypes()
-        sizeList[index] = int32(tablet.RowSize)
-    }
-    request := rpc.TSInsertTabletsReq{
-        SessionId:        s.sessionId,
-        PrefixPaths:      deviceIds,
-        TypesList:        typesList,
-        MeasurementsList: measurementsList,
-        ValuesList:       valuesList,
-        TimestampsList:   timestampsList,
-        SizeList:         sizeList,
-        IsAligned:        &isAligned,
-    }
-    return &request, nil
+       var (
+               length           = len(tablets)
+               deviceIds        = make([]string, length)
+               measurementsList = make([][]string, length)
+               valuesList       = make([][]byte, length)
+               timestampsList   = make([][]byte, length)
+               typesList        = make([][]int32, length)
+               sizeList         = make([]int32, length)
+       )
+       for index, tablet := range tablets {
+               deviceIds[index] = tablet.deviceId
+               measurementsList[index] = tablet.GetMeasurements()
+
+               values, err := tablet.getValuesBytes()
+               if err != nil {
+                       return nil, err
+               }
+
+               valuesList[index] = values
+               timestampsList[index] = tablet.GetTimestampBytes()
+               typesList[index] = tablet.getDataTypes()
+               sizeList[index] = int32(tablet.RowSize)
+       }
+       request := rpc.TSInsertTabletsReq{
+               SessionId:        s.sessionId,
+               PrefixPaths:      deviceIds,
+               TypesList:        typesList,
+               MeasurementsList: measurementsList,
+               ValuesList:       valuesList,
+               TimestampsList:   timestampsList,
+               SizeList:         sizeList,
+               IsAligned:        &isAligned,
+       }
+       return &request, nil
 }
 
 func (s *Session) genInsertRecordsReq(deviceIds []string, measurements 
[][]string, dataTypes [][]TSDataType, values [][]interface{},
-    timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
-    length := len(deviceIds)
-    if length != len(timestamps) || length != len(measurements) || length != 
len(values) {
-        return nil, errLength
-    }
-    request := rpc.TSInsertRecordsReq{
-        SessionId:        s.sessionId,
-        PrefixPaths:      deviceIds,
-        MeasurementsList: measurements,
-        Timestamps:       timestamps,
-        IsAligned:        &isAligned,
-    }
-    v := make([][]byte, length)
-    for i := 0; i < len(measurements); i++ {
-        if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil {
-            v[i] = bys
-        } else {
-            return nil, err
-        }
-    }
-    request.ValuesList = v
-    return &request, nil
+       timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
+       length := len(deviceIds)
+       if length != len(timestamps) || length != len(measurements) || length 
!= len(values) {
+               return nil, errLength
+       }
+       request := rpc.TSInsertRecordsReq{
+               SessionId:        s.sessionId,
+               PrefixPaths:      deviceIds,
+               MeasurementsList: measurements,
+               Timestamps:       timestamps,
+               IsAligned:        &isAligned,
+       }
+       v := make([][]byte, length)
+       for i := 0; i < len(measurements); i++ {
+               if bys, err := valuesToBytes(dataTypes[i], values[i]); err == 
nil {
+                       v[i] = bys
+               } else {
+                       return nil, err
+               }
+       }
+       request.ValuesList = v
+       return &request, nil
 }
 
 func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, 
error) {
-    buff := &bytes.Buffer{}
-    for i, t := range dataTypes {
-        binary.Write(buff, binary.BigEndian, byte(t))
-        v := values[i]
-        if v == nil {
-            return nil, fmt.Errorf("values[%d] can't be nil", i)
-        }
-
-        switch t {
-        case BOOLEAN:
-            switch v.(type) {
-            case bool:
-                binary.Write(buff, binary.BigEndian, v)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, 
reflect.TypeOf(v))
-            }
-        case INT32:
-            switch v.(type) {
-            case int32:
-                binary.Write(buff, binary.BigEndian, v)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, 
v, reflect.TypeOf(v))
-            }
-        case INT64, TIMESTAMP:
-            switch v.(type) {
-            case int64:
-                binary.Write(buff, binary.BigEndian, v)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, 
v, reflect.TypeOf(v))
-            }
-        case FLOAT:
-            switch v.(type) {
-            case float32:
-                binary.Write(buff, binary.BigEndian, v)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, 
v, reflect.TypeOf(v))
-            }
-        case DOUBLE:
-            switch v.(type) {
-            case float64:
-                binary.Write(buff, binary.BigEndian, v)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, 
v, reflect.TypeOf(v))
-            }
-        case TEXT, STRING:
-            switch s := v.(type) {
-            case string:
-                size := len(s)
-                binary.Write(buff, binary.BigEndian, int32(size))
-                binary.Write(buff, binary.BigEndian, []byte(s))
-            case []byte:
-                size := len(s)
-                binary.Write(buff, binary.BigEndian, int32(size))
-                binary.Write(buff, binary.BigEndian, s)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be string or 
[]byte", i, v, reflect.TypeOf(v))
-            }
-        case BLOB:
-            switch s := v.(type) {
-            case []byte:
-                size := len(s)
-                binary.Write(buff, binary.BigEndian, int32(size))
-                binary.Write(buff, binary.BigEndian, s)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, 
v, reflect.TypeOf(v))
-            }
-        case DATE:
-            switch s := v.(type) {
-            case time.Time:
-                date, err := dateToInt32(s)
-                if err != nil {
-                    return nil, err
-                }
-                binary.Write(buff, binary.BigEndian, date)
-            default:
-                return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", 
i, v, reflect.TypeOf(v))
-            }
-        default:
-            return nil, fmt.Errorf("types[%d] is incorrect, it must in 
(BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i)
-        }
-    }
-    return buff.Bytes(), nil
+       buff := &bytes.Buffer{}
+       for i, t := range dataTypes {
+               binary.Write(buff, binary.BigEndian, byte(t))
+               v := values[i]
+               if v == nil {
+                       return nil, fmt.Errorf("values[%d] can't be nil", i)
+               }
+
+               switch t {
+               case BOOLEAN:
+                       switch v.(type) {
+                       case bool:
+                               binary.Write(buff, binary.BigEndian, v)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be bool", i, v, reflect.TypeOf(v))
+                       }
+               case INT32:
+                       switch v.(type) {
+                       case int32:
+                               binary.Write(buff, binary.BigEndian, v)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be int32", i, v, reflect.TypeOf(v))
+                       }
+               case INT64, TIMESTAMP:
+                       switch v.(type) {
+                       case int64:
+                               binary.Write(buff, binary.BigEndian, v)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be int64", i, v, reflect.TypeOf(v))
+                       }
+               case FLOAT:
+                       switch v.(type) {
+                       case float32:
+                               binary.Write(buff, binary.BigEndian, v)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be float32", i, v, reflect.TypeOf(v))
+                       }
+               case DOUBLE:
+                       switch v.(type) {
+                       case float64:
+                               binary.Write(buff, binary.BigEndian, v)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be float64", i, v, reflect.TypeOf(v))
+                       }
+               case TEXT, STRING:
+                       switch s := v.(type) {
+                       case string:
+                               size := len(s)
+                               binary.Write(buff, binary.BigEndian, 
int32(size))
+                               binary.Write(buff, binary.BigEndian, []byte(s))
+                       case []byte:
+                               size := len(s)
+                               binary.Write(buff, binary.BigEndian, 
int32(size))
+                               binary.Write(buff, binary.BigEndian, s)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be string or []byte", i, v, reflect.TypeOf(v))
+                       }
+               case BLOB:
+                       switch s := v.(type) {
+                       case []byte:
+                               size := len(s)
+                               binary.Write(buff, binary.BigEndian, 
int32(size))
+                               binary.Write(buff, binary.BigEndian, s)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be []byte", i, v, reflect.TypeOf(v))
+                       }
+               case DATE:
+                       switch s := v.(type) {
+                       case time.Time:
+                               date, err := dateToInt32(s)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               binary.Write(buff, binary.BigEndian, date)
+                       default:
+                               return nil, fmt.Errorf("values[%d] %v(%v) must 
be time.Time", i, v, reflect.TypeOf(v))
+                       }
+               default:
+                       return nil, fmt.Errorf("types[%d] is incorrect, it must 
in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, 
STRING)", i)
+               }
+       }
+       return buff.Bytes(), nil
 }
 
 func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-    if !sorted {
-        if err := tablet.Sort(); err != nil {
-            return nil, err
-        }
-    }
-    request, err := s.genTSInsertTabletReq(tablet, false)
-    if err != nil {
-        return nil, err
-    }
-
-    r, err = s.client.InsertTablet(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertTablet(context.Background(), request)
-        }
-    }
-
-    return r, err
+       if !sorted {
+               if err := tablet.Sort(); err != nil {
+                       return nil, err
+               }
+       }
+       request, err := s.genTSInsertTabletReq(tablet, false)
+       if err != nil {
+               return nil, err
+       }
+
+       r, err = s.client.InsertTablet(context.Background(), request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertTablet(context.Background(), 
request)
+               }
+       }
+
+       return r, err
 }
 
 func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r 
*common.TSStatus, err error) {
-    if !sorted {
-        if err := tablet.Sort(); err != nil {
-            return nil, err
-        }
-    }
-    request, err := s.genTSInsertTabletReq(tablet, true)
-    if err != nil {
-        return nil, err
-    }
-
-    r, err = s.client.InsertTablet(context.Background(), request)
-
-    if err != nil && r == nil {
-        if s.reconnect() {
-            request.SessionId = s.sessionId
-            r, err = s.client.InsertTablet(context.Background(), request)
-        }
-    }
-
-    return r, err
+       if !sorted {
+               if err := tablet.Sort(); err != nil {
+                       return nil, err
+               }
+       }
+       request, err := s.genTSInsertTabletReq(tablet, true)
+       if err != nil {
+               return nil, err
+       }
+
+       r, err = s.client.InsertTablet(context.Background(), request)
+
+       if err != nil && r == nil {
+               if s.reconnect() {
+                       request.SessionId = s.sessionId
+                       r, err = s.client.InsertTablet(context.Background(), 
request)
+               }
+       }
+
+       return r, err
 }
 
 func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) 
(*rpc.TSInsertTabletReq, error) {
-    if values, err := tablet.getValuesBytes(); err == nil {
-        request := &rpc.TSInsertTabletReq{
-            SessionId:    s.sessionId,
-            PrefixPath:   tablet.deviceId,
-            Measurements: tablet.GetMeasurements(),
-            Values:       values,
-            Timestamps:   tablet.GetTimestampBytes(),
-            Types:        tablet.getDataTypes(),
-            Size:         int32(tablet.RowSize),
-            IsAligned:    &isAligned,
-        }
-        return request, nil
-    } else {
-        return nil, err
-    }
+       if values, err := tablet.getValuesBytes(); err == nil {
+               request := &rpc.TSInsertTabletReq{
+                       SessionId:    s.sessionId,
+                       PrefixPath:   tablet.deviceId,
+                       Measurements: tablet.GetMeasurements(),
+                       Values:       values,
+                       Timestamps:   tablet.GetTimestampBytes(),
+                       Types:        tablet.getDataTypes(),
+                       Size:         int32(tablet.RowSize),
+                       IsAligned:    &isAligned,
+               }
+               return request, nil
+       } else {
+               return nil, err
+       }
 }
 
 func (s *Session) GetSessionId() int64 {
-    return s.sessionId
+       return s.sessionId
 }
 
 func NewSession(config *Config) Session {
-    endPoint := endPoint{}
-    endPoint.Host = config.Host
-    endPoint.Port = config.Port
-    endPointList.PushBack(endPoint)
-    return Session{config: config}
+       endPoint := endPoint{}
+       endPoint.Host = config.Host
+       endPoint.Port = config.Port
+       endPointList.PushBack(endPoint)
+       return Session{config: config}
 }
 
 func NewClusterSession(clusterConfig *ClusterConfig) Session {
-    session := Session{}
-    node := endPoint{}
-    for i := 0; i < len(clusterConfig.NodeUrls); i++ {
-        node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
-        node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
-        endPointList.PushBack(node)
-    }
-    var err error
-    for e := endPointList.Front(); e != nil; e = e.Next() {
-        session.trans = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
-            ConnectTimeout: time.Duration(0), // Use 0 for no timeout
-        })
-        // session.trans = thrift.NewTFramedTransport(session.trans)   // 
deprecated
-        var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-        session.trans = thrift.NewTFramedTransportConf(session.trans, 
&tmp_conf)
-        if !session.trans.IsOpen() {
-            err = session.trans.Open()
-            if err != nil {
-                log.Println(err)
-            } else {
-                session.config = getConfig(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port,
-                    clusterConfig.UserName, clusterConfig.Password, 
clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
-                break
-            }
-        }
-    }
-    if !session.trans.IsOpen() {
-        log.Fatal("No Server Can Connect")
-    }
-    return session
+       session := Session{}
+       node := endPoint{}
+       for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+               node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+               node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
+               endPointList.PushBack(node)
+       }
+       var err error
+       for e := endPointList.Front(); e != nil; e = e.Next() {
+               session.trans = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
+                       ConnectTimeout: time.Duration(0), // Use 0 for no 
timeout
+               })
+               // session.trans = thrift.NewTFramedTransport(session.trans)    
// deprecated
+               var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+               session.trans = thrift.NewTFramedTransportConf(session.trans, 
&tmp_conf)
+               if !session.trans.IsOpen() {
+                       err = session.trans.Open()
+                       if err != nil {
+                               log.Println(err)
+                       } else {
+                               session.config = 
getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
+                                       clusterConfig.UserName, 
clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, 
clusterConfig.ConnectRetryMax)
+                               break
+                       }
+               }
+       }
+       if !session.trans.IsOpen() {
+               log.Fatal("No Server Can Connect")
+       }
+       return session
 }
 
 func (s *Session) initClusterConn(node endPoint) error {
-    var err error
-
-    s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), 
&thrift.TConfiguration{
-        ConnectTimeout: time.Duration(0), // Use 0 for no timeout
-    })
-    if err == nil {
-        // s.trans = thrift.NewTFramedTransport(s.trans)       // deprecated
-        var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
-        s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
-        if !s.trans.IsOpen() {
-            err = s.trans.Open()
-            if err != nil {
-                return err
-            }
-        }
-    }
-
-    if s.config.FetchSize < 1 {
-        s.config.FetchSize = DefaultFetchSize
-    }
-
-    if s.config.TimeZone == "" {
-        s.config.TimeZone = DefaultTimeZone
-    }
-
-    if s.config.ConnectRetryMax < 1 {
-        s.config.ConnectRetryMax = DefaultConnectRetryMax
-    }
-
-    var protocolFactory thrift.TProtocolFactory
-    protocolFactory = getProtocolFactory(s.config.enableRPCCompression)
-    iprot := protocolFactory.GetProtocol(s.trans)
-    oprot := protocolFactory.GetProtocol(s.trans)
-    s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, 
oprot))
-    req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
-        Password: &s.config.Password}
-
-    resp, err := s.client.OpenSession(context.Background(), &req)
-    if err != nil {
-        return err
-    }
-    s.sessionId = resp.GetSessionId()
-    s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
-    return err
+       var err error
+
+       s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), 
&thrift.TConfiguration{
+               ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+       })
+       if err == nil {
+               // s.trans = thrift.NewTFramedTransport(s.trans)        // 
deprecated
+               var tmp_conf = thrift.TConfiguration{MaxFrameSize: 
thrift.DEFAULT_MAX_FRAME_SIZE}
+               s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
+               if !s.trans.IsOpen() {
+                       err = s.trans.Open()
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       if s.config.FetchSize < 1 {
+               s.config.FetchSize = DefaultFetchSize
+       }
+
+       if s.config.TimeZone == "" {
+               s.config.TimeZone = DefaultTimeZone
+       }
+
+       if s.config.ConnectRetryMax < 1 {
+               s.config.ConnectRetryMax = DefaultConnectRetryMax
+       }
+
+       var protocolFactory thrift.TProtocolFactory
+       protocolFactory = getProtocolFactory(s.config.enableRPCCompression)
+       iprot := protocolFactory.GetProtocol(s.trans)
+       oprot := protocolFactory.GetProtocol(s.trans)
+       s.client = 
rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
+       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, 
Username: s.config.UserName,
+               Password: &s.config.Password}
+
+       resp, err := s.client.OpenSession(context.Background(), &req)
+       if err != nil {
+               return err
+       }
+       s.sessionId = resp.GetSessionId()
+       s.requestStatementId, err = 
s.client.RequestStatementId(context.Background(), s.sessionId)
+       return err
 
 }
 
 func getConfig(host string, port string, userName string, passWord string, 
fetchSize int32, timeZone string, connectRetryMax int) *Config {
-    return &Config{
-        Host:            host,
-        Port:            port,
-        UserName:        userName,
-        Password:        passWord,
-        FetchSize:       fetchSize,
-        TimeZone:        timeZone,
-        ConnectRetryMax: connectRetryMax,
-    }
+       return &Config{
+               Host:            host,
+               Port:            port,
+               UserName:        userName,
+               Password:        passWord,
+               FetchSize:       fetchSize,
+               TimeZone:        timeZone,
+               ConnectRetryMax: connectRetryMax,
+       }
 }
 
 func (s *Session) reconnect() bool {
-    var err error
-    var connectedSuccess = false
-
-    for i := 0; i < s.config.ConnectRetryMax; i++ {
-        for e := endPointList.Front(); e != nil; e = e.Next() {
-            err = s.initClusterConn(e.Value.(endPoint))
-            if err == nil {
-                connectedSuccess = true
-                break
-            } else {
-                log.Println("Connection refused:", e.Value.(endPoint))
-            }
-        }
-        if connectedSuccess {
-            break
-        }
-    }
-    return connectedSuccess
+       var err error
+       var connectedSuccess = false
+
+       for i := 0; i < s.config.ConnectRetryMax; i++ {
+               for e := endPointList.Front(); e != nil; e = e.Next() {
+                       err = s.initClusterConn(e.Value.(endPoint))
+                       if err == nil {
+                               connectedSuccess = true
+                               break
+                       } else {
+                               log.Println("Connection refused:", 
e.Value.(endPoint))
+                       }
+               }
+               if connectedSuccess {
+                       break
+               }
+       }
+       return connectedSuccess
 }

Reply via email to