This is an automated email from the ASF dual-hosted git repository. neuyilan pushed a commit to branch fix_session_id in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
commit 1827a622eb50413045c74f685e68a1d367768b4f Author: HouliangQi <[email protected]> AuthorDate: Wed Jun 22 11:44:20 2022 +0800 fix the session id error when reconnect other nodes --- client/session.go | 213 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 117 insertions(+), 96 deletions(-) diff --git a/client/session.go b/client/session.go index c5b4192..5c4b332 100644 --- a/client/session.go +++ b/client/session.go @@ -67,7 +67,7 @@ type endPoint struct { } var endPointList = list.New() -var session Session +var defaultSessionConn Session func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error { if s.config.FetchSize <= 0 { @@ -115,6 +115,8 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err s.SetTimeZone(s.config.TimeZone) s.config.TimeZone, err = s.GetTimeZone() + + defaultSessionConn = *s return err } @@ -168,17 +170,18 @@ func (s *Session) OpenCluster(enableRPCCompression bool) error { s.SetTimeZone(s.config.TimeZone) s.config.TimeZone, err = s.GetTimeZone() + defaultSessionConn = *s return err } func (s *Session) Close() (r *rpc.TSStatus, err error) { req := rpc.NewTSCloseSessionReq() - req.SessionId = s.sessionId - r, err = s.client.CloseSession(context.Background(), req) + req.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.CloseSession(context.Background(), req) if err != nil { return nil, err } - return nil, s.trans.Close() + return nil, defaultSessionConn.trans.Close() } /* @@ -189,10 +192,10 @@ func (s *Session) Close() (r *rpc.TSStatus, err error) { *error: correctness of operation */ func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) { - r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) + r, err = defaultSessionConn.client.SetStorageGroup(context.Background(), defaultSessionConn.sessionId, storageGroupId) if err != nil && r == nil { if reconnect() { - r, err = session.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) + r, err = defaultSessionConn.client.SetStorageGroup(context.Background(), defaultSessionConn.sessionId, storageGroupId) } } return r, err @@ -206,10 +209,10 @@ func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err e *error: correctness of operation */ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) + r, err = defaultSessionConn.client.DeleteStorageGroups(context.Background(), defaultSessionConn.sessionId, []string{storageGroupId}) if err != nil && r == nil { if reconnect() { - r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) + r, err = defaultSessionConn.client.DeleteStorageGroups(context.Background(), defaultSessionConn.sessionId, []string{storageGroupId}) } } return r, err @@ -223,10 +226,10 @@ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, er *error: correctness of operation */ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *rpc.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) + r, err = defaultSessionConn.client.DeleteStorageGroups(context.Background(), defaultSessionConn.sessionId, storageGroupIds) if err != nil && r == nil { if reconnect() { - r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) + r, err = defaultSessionConn.client.DeleteStorageGroups(context.Background(), defaultSessionConn.sessionId, storageGroupIds) } } return r, err @@ -243,12 +246,13 @@ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *rpc.TSStatu *error: correctness of operation */ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *rpc.TSStatus, err error) { - request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding), + request := rpc.TSCreateTimeseriesReq{SessionId: defaultSessionConn.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding), Compressor: int32(compressor), Attributes: attributes, Tags: tags} - status, err := s.client.CreateTimeseries(context.Background(), &request) + status, err := defaultSessionConn.client.CreateTimeseries(context.Background(), &request) if err != nil && status == nil { if reconnect() { - status, err = session.client.CreateTimeseries(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + status, err = defaultSessionConn.client.CreateTimeseries(context.Background(), &request) } } return status, err @@ -280,13 +284,14 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, destCompressions[i] = int32(e) } - request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: paths, DataTypes: destTypes, + request := rpc.TSCreateMultiTimeseriesReq{SessionId: defaultSessionConn.sessionId, Paths: paths, DataTypes: destTypes, Encodings: destEncodings, Compressors: destCompressions} - r, err = s.client.CreateMultiTimeseries(context.Background(), &request) + r, err = defaultSessionConn.client.CreateMultiTimeseries(context.Background(), &request) if err != nil && r == nil { if reconnect() { - r, err = session.client.CreateMultiTimeseries(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.CreateMultiTimeseries(context.Background(), &request) } } @@ -301,10 +306,10 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, *error: correctness of operation */ func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error) { - r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) + r, err = defaultSessionConn.client.DeleteTimeseries(context.Background(), defaultSessionConn.sessionId, paths) if err != nil && r == nil { if reconnect() { - r, err = session.client.DeleteTimeseries(context.Background(), s.sessionId, paths) + r, err = defaultSessionConn.client.DeleteTimeseries(context.Background(), defaultSessionConn.sessionId, paths) } } return r, err @@ -320,11 +325,12 @@ func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error) *error: correctness of operation */ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *rpc.TSStatus, err error) { - request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime} - r, err = s.client.DeleteData(context.Background(), &request) + request := rpc.TSDeleteDataReq{SessionId: defaultSessionConn.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime} + r, err = defaultSessionConn.client.DeleteData(context.Background(), &request) if err != nil && r == nil { if reconnect() { - r, err = session.client.DeleteData(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.DeleteData(context.Background(), &request) } } return r, err @@ -341,19 +347,20 @@ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *error: correctness of operation */ func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *rpc.TSStatus, err error) { - request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, DeviceId: deviceId, Measurements: measurements, + request := rpc.TSInsertStringRecordReq{SessionId: defaultSessionConn.sessionId, DeviceId: deviceId, Measurements: measurements, Values: values, Timestamp: timestamp} - r, err = s.client.InsertStringRecord(context.Background(), &request) + r, err = defaultSessionConn.client.InsertStringRecord(context.Background(), &request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertStringRecord(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.InsertStringRecord(context.Background(), &request) } } return r, err } func (s *Session) GetTimeZone() (string, error) { - resp, err := s.client.GetTimeZone(context.Background(), s.sessionId) + resp, err := defaultSessionConn.client.GetTimeZone(context.Background(), defaultSessionConn.sessionId) if err != nil { return "", err } @@ -361,44 +368,49 @@ func (s *Session) GetTimeZone() (string, error) { } func (s *Session) SetTimeZone(timeZone string) (r *rpc.TSStatus, err error) { - request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone} - r, err = s.client.SetTimeZone(context.Background(), &request) - s.config.TimeZone = timeZone + request := rpc.TSSetTimeZoneReq{SessionId: defaultSessionConn.sessionId, TimeZone: timeZone} + r, err = defaultSessionConn.client.SetTimeZone(context.Background(), &request) + defaultSessionConn.config.TimeZone = timeZone return r, err } func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) { request := rpc.TSExecuteStatementReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, Statement: sql, - StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, + StatementId: defaultSessionConn.requestStatementId, + FetchSize: &defaultSessionConn.config.FetchSize, } - resp, err := s.client.ExecuteStatement(context.Background(), &request) + resp, err := defaultSessionConn.client.ExecuteStatement(context.Background(), &request) if err != nil && resp == nil { if reconnect() { - resp, err = session.client.ExecuteStatement(context.Background(), &request) + resp, err = defaultSessionConn.client.ExecuteStatement(context.Background(), &request) } } - return s.genDataSet(sql, resp), err + return defaultSessionConn.genDataSet(sql, resp), err } func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) { - request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, Timeout: timeoutMs} - if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil { + request := rpc.TSExecuteStatementReq{SessionId: defaultSessionConn.sessionId, Statement: sql, StatementId: defaultSessionConn.requestStatementId, + FetchSize: &defaultSessionConn.config.FetchSize, Timeout: timeoutMs} + if resp, err := defaultSessionConn.client.ExecuteQueryStatement(context.Background(), &request); err == nil { if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, + defaultSessionConn.client, defaultSessionConn.sessionId, resp.QueryDataSet, + resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, defaultSessionConn.config.FetchSize, timeoutMs), err } else { return nil, statusErr } } else { if reconnect() { - resp, err = session.client.ExecuteQueryStatement(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + resp, err = defaultSessionConn.client.ExecuteQueryStatement(context.Background(), &request) if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, + defaultSessionConn.client, defaultSessionConn.sessionId, resp.QueryDataSet, + resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, defaultSessionConn.config.FetchSize, timeoutMs), err } else { return nil, statusErr } @@ -412,7 +424,7 @@ func (s *Session) genTSInsertRecordReq(deviceId string, time int64, types []TSDataType, values []interface{}) (*rpc.TSInsertRecordReq, error) { request := &rpc.TSInsertRecordReq{} - request.SessionId = s.sessionId + request.SessionId = defaultSessionConn.sessionId request.DeviceId = deviceId request.Timestamp = time request.Measurements = measurements @@ -426,15 +438,16 @@ func (s *Session) genTSInsertRecordReq(deviceId string, time int64, } func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) { - request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values) + request, err := defaultSessionConn.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values) if err != nil { return nil, err } - r, err = s.client.InsertRecord(context.Background(), request) + r, err = defaultSessionConn.client.InsertRecord(context.Background(), request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertRecord(context.Background(), request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.InsertRecord(context.Background(), request) } } @@ -490,18 +503,19 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, } request := &rpc.TSInsertRecordsOfOneDeviceReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, DeviceId: deviceId, Timestamps: timestamps, MeasurementsList: measurementsSlice, ValuesList: valuesList, } - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + r, err = defaultSessionConn.client.InsertRecordsOfOneDevice(context.Background(), request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertRecordsOfOneDevice(context.Background(), request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.InsertRecordsOfOneDevice(context.Background(), request) } } @@ -522,14 +536,15 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, */ func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *rpc.TSStatus, err error) { - request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps) + request, err := defaultSessionConn.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps) if err != nil { return nil, err } else { - r, err = s.client.InsertRecords(context.Background(), request) + r, err = defaultSessionConn.client.InsertRecords(context.Background(), request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertRecords(context.Background(), request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.InsertRecords(context.Background(), request) } } return r, err @@ -549,14 +564,15 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus } } } - request, err := s.genInsertTabletsReq(tablets) + request, err := defaultSessionConn.genInsertTabletsReq(tablets) if err != nil { return nil, err } - r, err = s.client.InsertTablets(context.Background(), request) + r, err = defaultSessionConn.client.InsertTablets(context.Background(), request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertTablets(context.Background(), request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.InsertTablets(context.Background(), request) } } return r, err @@ -564,13 +580,14 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err error) { request := rpc.TSExecuteBatchStatementReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, Statements: inserts, } - r, err = s.client.ExecuteBatchStatement(context.Background(), &request) + r, err = defaultSessionConn.client.ExecuteBatchStatement(context.Background(), &request) if err != nil && r == nil { if reconnect() { - r, err = session.client.ExecuteBatchStatement(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + r, err = defaultSessionConn.client.ExecuteBatchStatement(context.Background(), &request) } } return r, err @@ -578,44 +595,48 @@ func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) { request := rpc.TSRawDataQueryReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, Paths: paths, - FetchSize: &s.config.FetchSize, + FetchSize: &defaultSessionConn.config.FetchSize, StartTime: startTime, EndTime: endTime, - StatementId: s.requestStatementId, + StatementId: defaultSessionConn.requestStatementId, } - resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request) + resp, err := defaultSessionConn.client.ExecuteRawDataQuery(context.Background(), &request) if err != nil && resp == nil { if reconnect() { - resp, err = session.client.ExecuteRawDataQuery(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + resp, err = defaultSessionConn.client.ExecuteRawDataQuery(context.Background(), &request) } } - return s.genDataSet("", resp), err + return defaultSessionConn.genDataSet("", resp), err } func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) { request := rpc.TSExecuteStatementReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, Statement: sql, - StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, + StatementId: defaultSessionConn.requestStatementId, + FetchSize: &defaultSessionConn.config.FetchSize, } - resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request) + resp, err := defaultSessionConn.client.ExecuteUpdateStatement(context.Background(), &request) if err != nil && resp == nil { if reconnect() { - resp, err = session.client.ExecuteUpdateStatement(context.Background(), &request) + request.SessionId = defaultSessionConn.sessionId + resp, err = defaultSessionConn.client.ExecuteUpdateStatement(context.Background(), &request) } } - return s.genDataSet(sql, resp), err + return defaultSessionConn.genDataSet(sql, resp), err } func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil) + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, + defaultSessionConn.client, defaultSessionConn.sessionId, resp.QueryDataSet, + resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, defaultSessionConn.config.FetchSize, nil) } func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) { @@ -643,7 +664,7 @@ func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsRe sizeList[index] = int32(tablet.rowCount) } request := rpc.TSInsertTabletsReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, DeviceIds: deviceIds, TypesList: typesList, MeasurementsList: measurementsList, @@ -661,7 +682,7 @@ func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]strin return nil, errLength } request := rpc.TSInsertRecordsReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, DeviceIds: deviceIds, MeasurementsList: measurements, Timestamps: timestamps, @@ -745,16 +766,16 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, er return nil, err } } - request, err := s.genTSInsertTabletReq(tablet) + request, err := defaultSessionConn.genTSInsertTabletReq(tablet) if err != nil { return nil, err } - r, err = s.client.InsertTablet(context.Background(), request) + r, err = defaultSessionConn.client.InsertTablet(context.Background(), request) if err != nil && r == nil { if reconnect() { - r, err = session.client.InsertTablet(context.Background(), request) + r, err = defaultSessionConn.client.InsertTablet(context.Background(), request) } } @@ -764,7 +785,7 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, er func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) { if values, err := tablet.getValuesBytes(); err == nil { request := &rpc.TSInsertTabletReq{ - SessionId: s.sessionId, + SessionId: defaultSessionConn.sessionId, DeviceId: tablet.deviceId, Measurements: tablet.GetMeasurements(), Values: values, @@ -779,7 +800,7 @@ func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, } func (s *Session) GetSessionId() int64 { - return s.sessionId + return defaultSessionConn.sessionId } func NewSession(config *Config) Session { @@ -800,17 +821,17 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session { } var err error for e := endPointList.Front(); e != nil; e = e.Next() { - session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{ + defaultSessionConn.trans, err = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{ ConnectTimeout: time.Duration(0), // Use 0 for no timeout }) if err == nil { - session.trans = thrift.NewTFramedTransport(session.trans) - if !session.trans.IsOpen() { - err = session.trans.Open() + defaultSessionConn.trans = thrift.NewTFramedTransport(defaultSessionConn.trans) + if !defaultSessionConn.trans.IsOpen() { + err = defaultSessionConn.trans.Open() if err != nil { log.Println(err) } else { - session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, + defaultSessionConn.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone) break } @@ -820,19 +841,19 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session { if err != nil { log.Fatal("No Server Can Connect") } - return session + return defaultSessionConn } func initClusterConn(node endPoint) error { var err error - session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{ + defaultSessionConn.trans, err = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{ ConnectTimeout: time.Duration(0), // Use 0 for no timeout }) if err == nil { - session.trans = thrift.NewTFramedTransport(session.trans) - if !session.trans.IsOpen() { - err = session.trans.Open() + defaultSessionConn.trans = thrift.NewTFramedTransport(defaultSessionConn.trans) + if !defaultSessionConn.trans.IsOpen() { + err = defaultSessionConn.trans.Open() if err != nil { return err } @@ -840,24 +861,24 @@ func initClusterConn(node endPoint) error { } var protocolFactory thrift.TProtocolFactory protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() - iprot := protocolFactory.GetProtocol(session.trans) - oprot := protocolFactory.GetProtocol(session.trans) - session.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot)) - req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: session.config.TimeZone, Username: &session.config.UserName, - Password: &session.config.Password} + iprot := protocolFactory.GetProtocol(defaultSessionConn.trans) + oprot := protocolFactory.GetProtocol(defaultSessionConn.trans) + defaultSessionConn.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot)) + req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: defaultSessionConn.config.TimeZone, Username: &defaultSessionConn.config.UserName, + Password: &defaultSessionConn.config.Password} fmt.Println(req) - resp, err := session.client.OpenSession(context.Background(), &req) + resp, err := defaultSessionConn.client.OpenSession(context.Background(), &req) if err != nil { return err } - session.sessionId = resp.GetSessionId() - session.requestStatementId, err = session.client.RequestStatementId(context.Background(), session.sessionId) + defaultSessionConn.sessionId = resp.GetSessionId() + defaultSessionConn.requestStatementId, err = defaultSessionConn.client.RequestStatementId(context.Background(), defaultSessionConn.sessionId) if err != nil { return err } - session.SetTimeZone(session.config.TimeZone) - session.config.TimeZone, err = session.GetTimeZone() + defaultSessionConn.SetTimeZone(defaultSessionConn.config.TimeZone) + defaultSessionConn.config.TimeZone, err = defaultSessionConn.GetTimeZone() return err }
