This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch fix_session_id
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git

commit 1827a622eb50413045c74f685e68a1d367768b4f
Author: HouliangQi <[email protected]>
AuthorDate: Wed Jun 22 11:44:20 2022 +0800

    fix the session id error when reconnect other nodes
---
 client/session.go | 213 ++++++++++++++++++++++++++++++------------------------
 1 file changed, 117 insertions(+), 96 deletions(-)

diff --git a/client/session.go b/client/session.go
index c5b4192..5c4b332 100644
--- a/client/session.go
+++ b/client/session.go
@@ -67,7 +67,7 @@ type endPoint struct {
 }
 
 var endPointList = list.New()
-var session Session
+var defaultSessionConn Session
 
 func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) 
error {
        if s.config.FetchSize <= 0 {
@@ -115,6 +115,8 @@ func (s *Session) Open(enableRPCCompression bool, 
connectionTimeoutInMs int) err
 
        s.SetTimeZone(s.config.TimeZone)
        s.config.TimeZone, err = s.GetTimeZone()
+
+       defaultSessionConn = *s
        return err
 }
 
@@ -168,17 +170,18 @@ func (s *Session) OpenCluster(enableRPCCompression bool) 
error {
 
        s.SetTimeZone(s.config.TimeZone)
        s.config.TimeZone, err = s.GetTimeZone()
+       defaultSessionConn = *s
        return err
 }
 
 func (s *Session) Close() (r *rpc.TSStatus, err error) {
        req := rpc.NewTSCloseSessionReq()
-       req.SessionId = s.sessionId
-       r, err = s.client.CloseSession(context.Background(), req)
+       req.SessionId = defaultSessionConn.sessionId
+       r, err = defaultSessionConn.client.CloseSession(context.Background(), 
req)
        if err != nil {
                return nil, err
        }
-       return nil, s.trans.Close()
+       return nil, defaultSessionConn.trans.Close()
 }
 
 /*
@@ -189,10 +192,10 @@ func (s *Session) Close() (r *rpc.TSStatus, err error) {
  *error: correctness of operation
  */
 func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err 
error) {
-       r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
+       r, err = 
defaultSessionConn.client.SetStorageGroup(context.Background(), 
defaultSessionConn.sessionId, storageGroupId)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.SetStorageGroup(context.Background(), s.sessionId, 
storageGroupId)
+                       r, err = 
defaultSessionConn.client.SetStorageGroup(context.Background(), 
defaultSessionConn.sessionId, storageGroupId)
                }
        }
        return r, err
@@ -206,10 +209,10 @@ func (s *Session) SetStorageGroup(storageGroupId string) 
(r *rpc.TSStatus, err e
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, 
err error) {
-       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, []string{storageGroupId})
+       r, err = 
defaultSessionConn.client.DeleteStorageGroups(context.Background(), 
defaultSessionConn.sessionId, []string{storageGroupId})
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.DeleteStorageGroups(context.Background(), s.sessionId, 
[]string{storageGroupId})
+                       r, err = 
defaultSessionConn.client.DeleteStorageGroups(context.Background(), 
defaultSessionConn.sessionId, []string{storageGroupId})
                }
        }
        return r, err
@@ -223,10 +226,10 @@ func (s *Session) DeleteStorageGroup(storageGroupId 
string) (r *rpc.TSStatus, er
  *error: correctness of operation
  */
 func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r 
*rpc.TSStatus, err error) {
-       r, err = s.client.DeleteStorageGroups(context.Background(), 
s.sessionId, storageGroupIds)
+       r, err = 
defaultSessionConn.client.DeleteStorageGroups(context.Background(), 
defaultSessionConn.sessionId, storageGroupIds)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.DeleteStorageGroups(context.Background(), s.sessionId, 
storageGroupIds)
+                       r, err = 
defaultSessionConn.client.DeleteStorageGroups(context.Background(), 
defaultSessionConn.sessionId, storageGroupIds)
                }
        }
        return r, err
@@ -243,12 +246,13 @@ func (s *Session) DeleteStorageGroups(storageGroupIds 
...string) (r *rpc.TSStatu
  *error: correctness of operation
  */
 func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding 
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags 
map[string]string) (r *rpc.TSStatus, err error) {
-       request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: 
path, DataType: int32(dataType), Encoding: int32(encoding),
+       request := rpc.TSCreateTimeseriesReq{SessionId: 
defaultSessionConn.sessionId, Path: path, DataType: int32(dataType), Encoding: 
int32(encoding),
                Compressor: int32(compressor), Attributes: attributes, Tags: 
tags}
-       status, err := s.client.CreateTimeseries(context.Background(), &request)
+       status, err := 
defaultSessionConn.client.CreateTimeseries(context.Background(), &request)
        if err != nil && status == nil {
                if reconnect() {
-                       status, err = 
session.client.CreateTimeseries(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       status, err = 
defaultSessionConn.client.CreateTimeseries(context.Background(), &request)
                }
        }
        return status, err
@@ -280,13 +284,14 @@ func (s *Session) CreateMultiTimeseries(paths []string, 
dataTypes []TSDataType,
                destCompressions[i] = int32(e)
        }
 
-       request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, 
Paths: paths, DataTypes: destTypes,
+       request := rpc.TSCreateMultiTimeseriesReq{SessionId: 
defaultSessionConn.sessionId, Paths: paths, DataTypes: destTypes,
                Encodings: destEncodings, Compressors: destCompressions}
-       r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+       r, err = 
defaultSessionConn.client.CreateMultiTimeseries(context.Background(), &request)
 
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.CreateMultiTimeseries(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.CreateMultiTimeseries(context.Background(), &request)
                }
        }
 
@@ -301,10 +306,10 @@ func (s *Session) CreateMultiTimeseries(paths []string, 
dataTypes []TSDataType,
  *error: correctness of operation
  */
 func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err 
error) {
-       r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, 
paths)
+       r, err = 
defaultSessionConn.client.DeleteTimeseries(context.Background(), 
defaultSessionConn.sessionId, paths)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+                       r, err = 
defaultSessionConn.client.DeleteTimeseries(context.Background(), 
defaultSessionConn.sessionId, paths)
                }
        }
        return r, err
@@ -320,11 +325,12 @@ func (s *Session) DeleteTimeseries(paths []string) (r 
*rpc.TSStatus, err error)
  *error: correctness of operation
  */
 func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) 
(r *rpc.TSStatus, err error) {
-       request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, 
StartTime: startTime, EndTime: endTime}
-       r, err = s.client.DeleteData(context.Background(), &request)
+       request := rpc.TSDeleteDataReq{SessionId: defaultSessionConn.sessionId, 
Paths: paths, StartTime: startTime, EndTime: endTime}
+       r, err = defaultSessionConn.client.DeleteData(context.Background(), 
&request)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.DeleteData(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.DeleteData(context.Background(), &request)
                }
        }
        return r, err
@@ -341,19 +347,20 @@ func (s *Session) DeleteData(paths []string, startTime 
int64, endTime int64) (r
  *error: correctness of operation
  */
 func (s *Session) InsertStringRecord(deviceId string, measurements []string, 
values []string, timestamp int64) (r *rpc.TSStatus, err error) {
-       request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, 
DeviceId: deviceId, Measurements: measurements,
+       request := rpc.TSInsertStringRecordReq{SessionId: 
defaultSessionConn.sessionId, DeviceId: deviceId, Measurements: measurements,
                Values: values, Timestamp: timestamp}
-       r, err = s.client.InsertStringRecord(context.Background(), &request)
+       r, err = 
defaultSessionConn.client.InsertStringRecord(context.Background(), &request)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.InsertStringRecord(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.InsertStringRecord(context.Background(), &request)
                }
        }
        return r, err
 }
 
 func (s *Session) GetTimeZone() (string, error) {
-       resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
+       resp, err := 
defaultSessionConn.client.GetTimeZone(context.Background(), 
defaultSessionConn.sessionId)
        if err != nil {
                return "", err
        }
@@ -361,44 +368,49 @@ func (s *Session) GetTimeZone() (string, error) {
 }
 
 func (s *Session) SetTimeZone(timeZone string) (r *rpc.TSStatus, err error) {
-       request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: 
timeZone}
-       r, err = s.client.SetTimeZone(context.Background(), &request)
-       s.config.TimeZone = timeZone
+       request := rpc.TSSetTimeZoneReq{SessionId: 
defaultSessionConn.sessionId, TimeZone: timeZone}
+       r, err = defaultSessionConn.client.SetTimeZone(context.Background(), 
&request)
+       defaultSessionConn.config.TimeZone = timeZone
        return r, err
 }
 
 func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
        request := rpc.TSExecuteStatementReq{
-               SessionId:   s.sessionId,
+               SessionId:   defaultSessionConn.sessionId,
                Statement:   sql,
-               StatementId: s.requestStatementId,
-               FetchSize:   &s.config.FetchSize,
+               StatementId: defaultSessionConn.requestStatementId,
+               FetchSize:   &defaultSessionConn.config.FetchSize,
        }
-       resp, err := s.client.ExecuteStatement(context.Background(), &request)
+       resp, err := 
defaultSessionConn.client.ExecuteStatement(context.Background(), &request)
 
        if err != nil && resp == nil {
                if reconnect() {
-                       resp, err = 
session.client.ExecuteStatement(context.Background(), &request)
+                       resp, err = 
defaultSessionConn.client.ExecuteStatement(context.Background(), &request)
                }
        }
 
-       return s.genDataSet(sql, resp), err
+       return defaultSessionConn.genDataSet(sql, resp), err
 }
 
 func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) 
(*SessionDataSet, error) {
-       request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: 
sql, StatementId: s.requestStatementId,
-               FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
-       if resp, err := s.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
+       request := rpc.TSExecuteStatementReq{SessionId: 
defaultSessionConn.sessionId, Statement: sql, StatementId: 
defaultSessionConn.requestStatementId,
+               FetchSize: &defaultSessionConn.config.FetchSize, Timeout: 
timeoutMs}
+       if resp, err := 
defaultSessionConn.client.ExecuteQueryStatement(context.Background(), 
&request); err == nil {
                if statusErr := VerifySuccess(resp.Status); statusErr == nil {
-                       return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+                       return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
+                               defaultSessionConn.client, 
defaultSessionConn.sessionId, resp.QueryDataSet,
+                               resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, defaultSessionConn.config.FetchSize, timeoutMs), err
                } else {
                        return nil, statusErr
                }
        } else {
                if reconnect() {
-                       resp, err = 
session.client.ExecuteQueryStatement(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       resp, err = 
defaultSessionConn.client.ExecuteQueryStatement(context.Background(), &request)
                        if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, 
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+                               return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
+                                       defaultSessionConn.client, 
defaultSessionConn.sessionId, resp.QueryDataSet,
+                                       resp.IgnoreTimeStamp != nil && 
*resp.IgnoreTimeStamp, defaultSessionConn.config.FetchSize, timeoutMs), err
                        } else {
                                return nil, statusErr
                        }
@@ -412,7 +424,7 @@ func (s *Session) genTSInsertRecordReq(deviceId string, 
time int64,
        types []TSDataType,
        values []interface{}) (*rpc.TSInsertRecordReq, error) {
        request := &rpc.TSInsertRecordReq{}
-       request.SessionId = s.sessionId
+       request.SessionId = defaultSessionConn.sessionId
        request.DeviceId = deviceId
        request.Timestamp = time
        request.Measurements = measurements
@@ -426,15 +438,16 @@ func (s *Session) genTSInsertRecordReq(deviceId string, 
time int64,
 }
 
 func (s *Session) InsertRecord(deviceId string, measurements []string, 
dataTypes []TSDataType, values []interface{}, timestamp int64) (r 
*rpc.TSStatus, err error) {
-       request, err := s.genTSInsertRecordReq(deviceId, timestamp, 
measurements, dataTypes, values)
+       request, err := defaultSessionConn.genTSInsertRecordReq(deviceId, 
timestamp, measurements, dataTypes, values)
        if err != nil {
                return nil, err
        }
-       r, err = s.client.InsertRecord(context.Background(), request)
+       r, err = defaultSessionConn.client.InsertRecord(context.Background(), 
request)
 
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.InsertRecord(context.Background(), request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.InsertRecord(context.Background(), request)
                }
        }
 
@@ -490,18 +503,19 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId 
string, timestamps []int64,
        }
 
        request := &rpc.TSInsertRecordsOfOneDeviceReq{
-               SessionId:        s.sessionId,
+               SessionId:        defaultSessionConn.sessionId,
                DeviceId:         deviceId,
                Timestamps:       timestamps,
                MeasurementsList: measurementsSlice,
                ValuesList:       valuesList,
        }
 
-       r, err = s.client.InsertRecordsOfOneDevice(context.Background(), 
request)
+       r, err = 
defaultSessionConn.client.InsertRecordsOfOneDevice(context.Background(), 
request)
 
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.InsertRecordsOfOneDevice(context.Background(), request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.InsertRecordsOfOneDevice(context.Background(), 
request)
                }
        }
 
@@ -522,14 +536,15 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId 
string, timestamps []int64,
  */
 func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, 
dataTypes [][]TSDataType, values [][]interface{},
        timestamps []int64) (r *rpc.TSStatus, err error) {
-       request, err := s.genInsertRecordsReq(deviceIds, measurements, 
dataTypes, values, timestamps)
+       request, err := defaultSessionConn.genInsertRecordsReq(deviceIds, 
measurements, dataTypes, values, timestamps)
        if err != nil {
                return nil, err
        } else {
-               r, err = s.client.InsertRecords(context.Background(), request)
+               r, err = 
defaultSessionConn.client.InsertRecords(context.Background(), request)
                if err != nil && r == nil {
                        if reconnect() {
-                               r, err = 
session.client.InsertRecords(context.Background(), request)
+                               request.SessionId = defaultSessionConn.sessionId
+                               r, err = 
defaultSessionConn.client.InsertRecords(context.Background(), request)
                        }
                }
                return r, err
@@ -549,14 +564,15 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted 
bool) (r *rpc.TSStatus
                        }
                }
        }
-       request, err := s.genInsertTabletsReq(tablets)
+       request, err := defaultSessionConn.genInsertTabletsReq(tablets)
        if err != nil {
                return nil, err
        }
-       r, err = s.client.InsertTablets(context.Background(), request)
+       r, err = defaultSessionConn.client.InsertTablets(context.Background(), 
request)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.InsertTablets(context.Background(), request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.InsertTablets(context.Background(), request)
                }
        }
        return r, err
@@ -564,13 +580,14 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted 
bool) (r *rpc.TSStatus
 
 func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, 
err error) {
        request := rpc.TSExecuteBatchStatementReq{
-               SessionId:  s.sessionId,
+               SessionId:  defaultSessionConn.sessionId,
                Statements: inserts,
        }
-       r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+       r, err = 
defaultSessionConn.client.ExecuteBatchStatement(context.Background(), &request)
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.ExecuteBatchStatement(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       r, err = 
defaultSessionConn.client.ExecuteBatchStatement(context.Background(), &request)
                }
        }
        return r, err
@@ -578,44 +595,48 @@ func (s *Session) ExecuteBatchStatement(inserts []string) 
(r *rpc.TSStatus, err
 
 func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime 
int64) (*SessionDataSet, error) {
        request := rpc.TSRawDataQueryReq{
-               SessionId:   s.sessionId,
+               SessionId:   defaultSessionConn.sessionId,
                Paths:       paths,
-               FetchSize:   &s.config.FetchSize,
+               FetchSize:   &defaultSessionConn.config.FetchSize,
                StartTime:   startTime,
                EndTime:     endTime,
-               StatementId: s.requestStatementId,
+               StatementId: defaultSessionConn.requestStatementId,
        }
-       resp, err := s.client.ExecuteRawDataQuery(context.Background(), 
&request)
+       resp, err := 
defaultSessionConn.client.ExecuteRawDataQuery(context.Background(), &request)
 
        if err != nil && resp == nil {
                if reconnect() {
-                       resp, err = 
session.client.ExecuteRawDataQuery(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       resp, err = 
defaultSessionConn.client.ExecuteRawDataQuery(context.Background(), &request)
                }
        }
 
-       return s.genDataSet("", resp), err
+       return defaultSessionConn.genDataSet("", resp), err
 }
 
 func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
        request := rpc.TSExecuteStatementReq{
-               SessionId:   s.sessionId,
+               SessionId:   defaultSessionConn.sessionId,
                Statement:   sql,
-               StatementId: s.requestStatementId,
-               FetchSize:   &s.config.FetchSize,
+               StatementId: defaultSessionConn.requestStatementId,
+               FetchSize:   &defaultSessionConn.config.FetchSize,
        }
-       resp, err := s.client.ExecuteUpdateStatement(context.Background(), 
&request)
+       resp, err := 
defaultSessionConn.client.ExecuteUpdateStatement(context.Background(), &request)
 
        if err != nil && resp == nil {
                if reconnect() {
-                       resp, err = 
session.client.ExecuteUpdateStatement(context.Background(), &request)
+                       request.SessionId = defaultSessionConn.sessionId
+                       resp, err = 
defaultSessionConn.client.ExecuteUpdateStatement(context.Background(), &request)
                }
        }
 
-       return s.genDataSet(sql, resp), err
+       return defaultSessionConn.genDataSet(sql, resp), err
 }
 
 func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) 
*SessionDataSet {
-       return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, 
resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
s.config.FetchSize, nil)
+       return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, 
resp.ColumnNameIndexMap, *resp.QueryId,
+               defaultSessionConn.client, defaultSessionConn.sessionId, 
resp.QueryDataSet,
+               resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, 
defaultSessionConn.config.FetchSize, nil)
 }
 
 func (s *Session) genInsertTabletsReq(tablets []*Tablet) 
(*rpc.TSInsertTabletsReq, error) {
@@ -643,7 +664,7 @@ func (s *Session) genInsertTabletsReq(tablets []*Tablet) 
(*rpc.TSInsertTabletsRe
                sizeList[index] = int32(tablet.rowCount)
        }
        request := rpc.TSInsertTabletsReq{
-               SessionId:        s.sessionId,
+               SessionId:        defaultSessionConn.sessionId,
                DeviceIds:        deviceIds,
                TypesList:        typesList,
                MeasurementsList: measurementsList,
@@ -661,7 +682,7 @@ func (s *Session) genInsertRecordsReq(deviceIds []string, 
measurements [][]strin
                return nil, errLength
        }
        request := rpc.TSInsertRecordsReq{
-               SessionId:        s.sessionId,
+               SessionId:        defaultSessionConn.sessionId,
                DeviceIds:        deviceIds,
                MeasurementsList: measurements,
                Timestamps:       timestamps,
@@ -745,16 +766,16 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted 
bool) (r *rpc.TSStatus, er
                        return nil, err
                }
        }
-       request, err := s.genTSInsertTabletReq(tablet)
+       request, err := defaultSessionConn.genTSInsertTabletReq(tablet)
        if err != nil {
                return nil, err
        }
 
-       r, err = s.client.InsertTablet(context.Background(), request)
+       r, err = defaultSessionConn.client.InsertTablet(context.Background(), 
request)
 
        if err != nil && r == nil {
                if reconnect() {
-                       r, err = 
session.client.InsertTablet(context.Background(), request)
+                       r, err = 
defaultSessionConn.client.InsertTablet(context.Background(), request)
                }
        }
 
@@ -764,7 +785,7 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) 
(r *rpc.TSStatus, er
 func (s *Session) genTSInsertTabletReq(tablet *Tablet) 
(*rpc.TSInsertTabletReq, error) {
        if values, err := tablet.getValuesBytes(); err == nil {
                request := &rpc.TSInsertTabletReq{
-                       SessionId:    s.sessionId,
+                       SessionId:    defaultSessionConn.sessionId,
                        DeviceId:     tablet.deviceId,
                        Measurements: tablet.GetMeasurements(),
                        Values:       values,
@@ -779,7 +800,7 @@ func (s *Session) genTSInsertTabletReq(tablet *Tablet) 
(*rpc.TSInsertTabletReq,
 }
 
 func (s *Session) GetSessionId() int64 {
-       return s.sessionId
+       return defaultSessionConn.sessionId
 }
 
 func NewSession(config *Config) Session {
@@ -800,17 +821,17 @@ func NewClusterSession(ClusterConfig *ClusterConfig) 
Session {
        }
        var err error
        for e := endPointList.Front(); e != nil; e = e.Next() {
-               session.trans, err = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
+               defaultSessionConn.trans, err = 
thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, 
e.Value.(endPoint).Port), &thrift.TConfiguration{
                        ConnectTimeout: time.Duration(0), // Use 0 for no 
timeout
                })
                if err == nil {
-                       session.trans = 
thrift.NewTFramedTransport(session.trans)
-                       if !session.trans.IsOpen() {
-                               err = session.trans.Open()
+                       defaultSessionConn.trans = 
thrift.NewTFramedTransport(defaultSessionConn.trans)
+                       if !defaultSessionConn.trans.IsOpen() {
+                               err = defaultSessionConn.trans.Open()
                                if err != nil {
                                        log.Println(err)
                                } else {
-                                       session.config = 
getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
+                                       defaultSessionConn.config = 
getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
                                                ClusterConfig.UserName, 
ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
                                        break
                                }
@@ -820,19 +841,19 @@ func NewClusterSession(ClusterConfig *ClusterConfig) 
Session {
        if err != nil {
                log.Fatal("No Server Can Connect")
        }
-       return session
+       return defaultSessionConn
 }
 
 func initClusterConn(node endPoint) error {
        var err error
 
-       session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(node.Host, 
node.Port), &thrift.TConfiguration{
+       defaultSessionConn.trans, err = 
thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), 
&thrift.TConfiguration{
                ConnectTimeout: time.Duration(0), // Use 0 for no timeout
        })
        if err == nil {
-               session.trans = thrift.NewTFramedTransport(session.trans)
-               if !session.trans.IsOpen() {
-                       err = session.trans.Open()
+               defaultSessionConn.trans = 
thrift.NewTFramedTransport(defaultSessionConn.trans)
+               if !defaultSessionConn.trans.IsOpen() {
+                       err = defaultSessionConn.trans.Open()
                        if err != nil {
                                return err
                        }
@@ -840,24 +861,24 @@ func initClusterConn(node endPoint) error {
        }
        var protocolFactory thrift.TProtocolFactory
        protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
-       iprot := protocolFactory.GetProtocol(session.trans)
-       oprot := protocolFactory.GetProtocol(session.trans)
-       session.client = 
rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
-       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: 
session.config.TimeZone, Username: &session.config.UserName,
-               Password: &session.config.Password}
+       iprot := protocolFactory.GetProtocol(defaultSessionConn.trans)
+       oprot := protocolFactory.GetProtocol(defaultSessionConn.trans)
+       defaultSessionConn.client = 
rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+       req := rpc.TSOpenSessionReq{ClientProtocol: 
rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: 
defaultSessionConn.config.TimeZone, Username: 
&defaultSessionConn.config.UserName,
+               Password: &defaultSessionConn.config.Password}
        fmt.Println(req)
-       resp, err := session.client.OpenSession(context.Background(), &req)
+       resp, err := 
defaultSessionConn.client.OpenSession(context.Background(), &req)
        if err != nil {
                return err
        }
-       session.sessionId = resp.GetSessionId()
-       session.requestStatementId, err = 
session.client.RequestStatementId(context.Background(), session.sessionId)
+       defaultSessionConn.sessionId = resp.GetSessionId()
+       defaultSessionConn.requestStatementId, err = 
defaultSessionConn.client.RequestStatementId(context.Background(), 
defaultSessionConn.sessionId)
        if err != nil {
                return err
        }
 
-       session.SetTimeZone(session.config.TimeZone)
-       session.config.TimeZone, err = session.GetTimeZone()
+       defaultSessionConn.SetTimeZone(defaultSessionConn.config.TimeZone)
+       defaultSessionConn.config.TimeZone, err = 
defaultSessionConn.GetTimeZone()
        return err
 
 }

Reply via email to