This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch opt_go_client in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
commit e6a37915a50abdfedc1c8bb24b1f25908e61f6cb Author: HTHou <[email protected]> AuthorDate: Mon Nov 18 21:09:13 2024 +0800 format --- client/session.go | 1819 +++++++++++++++++++++++++++-------------------------- 1 file changed, 910 insertions(+), 909 deletions(-) diff --git a/client/session.go b/client/session.go index 26ebff0..36d0792 100644 --- a/client/session.go +++ b/client/session.go @@ -20,166 +20,167 @@ package client import ( - "bytes" - "container/list" - "context" - "encoding/binary" - "errors" - "fmt" - "github.com/apache/iotdb-client-go/common" - "log" - "net" - "reflect" - "sort" - "strings" - "time" - - "github.com/apache/iotdb-client-go/rpc" - "github.com/apache/thrift/lib/go/thrift" + "bytes" + "container/list" + "context" + "encoding/binary" + "errors" + "fmt" + "github.com/apache/iotdb-client-go/common" + "log" + "net" + "reflect" + "sort" + "strings" + "time" + + "github.com/apache/iotdb-client-go/rpc" + "github.com/apache/thrift/lib/go/thrift" ) const ( - DefaultTimeZone = "Asia/Shanghai" - DefaultFetchSize = 1024 - DefaultConnectRetryMax = 3 + DefaultTimeZone = "Asia/Shanghai" + DefaultFetchSize = 1024 + DefaultConnectRetryMax = 3 ) var errLength = errors.New("deviceIds, times, measurementsList and valuesList's size should be equal") type Config struct { - Host string - Port string - UserName string - Password string - FetchSize int32 - TimeZone string - ConnectRetryMax int - enableRPCCompression bool + Host string + Port string + UserName string + Password string + FetchSize int32 + TimeZone string + ConnectRetryMax int + enableRPCCompression bool } type Session struct { - config *Config - client *rpc.IClientRPCServiceClient - sessionId int64 - trans thrift.TTransport - requestStatementId int64 + config *Config + client *rpc.IClientRPCServiceClient + sessionId int64 + trans thrift.TTransport + requestStatementId int64 } type endPoint struct { - Host string - Port string + Host string + Port string } var endPointList = list.New() func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error { - if s.config.FetchSize <= 0 { - s.config.FetchSize = DefaultFetchSize - } - if s.config.TimeZone == "" { - s.config.TimeZone = DefaultTimeZone - } - - if s.config.ConnectRetryMax <= 0 { - s.config.ConnectRetryMax = DefaultConnectRetryMax - } - - var protocolFactory thrift.TProtocolFactory - var err error - - // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it returns one. - s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{ - ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout - }) - // s.trans = thrift.NewTFramedTransport(s.trans) // deprecated - var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} - s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf) - if !s.trans.IsOpen() { - err = s.trans.Open() - if err != nil { - return err - } - } - s.config.enableRPCCompression = enableRPCCompression - protocolFactory = getProtocolFactory(enableRPCCompression) - iprot := protocolFactory.GetProtocol(s.trans) - oprot := protocolFactory.GetProtocol(s.trans) - s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) - req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, - Password: &s.config.Password} - resp, err := s.client.OpenSession(context.Background(), &req) - if err != nil { - return err - } - s.sessionId = resp.GetSessionId() - s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) - return err + if s.config.FetchSize <= 0 { + s.config.FetchSize = DefaultFetchSize + } + if s.config.TimeZone == "" { + s.config.TimeZone = DefaultTimeZone + } + + if s.config.ConnectRetryMax <= 0 { + s.config.ConnectRetryMax = DefaultConnectRetryMax + } + + var protocolFactory thrift.TProtocolFactory + var err error + + // in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it returns one. + s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{ + ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout + }) + // s.trans = thrift.NewTFramedTransport(s.trans) // deprecated + var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} + s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf) + if !s.trans.IsOpen() { + err = s.trans.Open() + if err != nil { + return err + } + } + s.config.enableRPCCompression = enableRPCCompression + protocolFactory = getProtocolFactory(enableRPCCompression) + iprot := protocolFactory.GetProtocol(s.trans) + oprot := protocolFactory.GetProtocol(s.trans) + s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) + req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, + Password: &s.config.Password} + resp, err := s.client.OpenSession(context.Background(), &req) + if err != nil { + return err + } + s.sessionId = resp.GetSessionId() + s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) + return err } type ClusterConfig struct { - NodeUrls []string //ip:port - UserName string - Password string - FetchSize int32 - TimeZone string - ConnectRetryMax int + NodeUrls []string //ip:port + UserName string + Password string + FetchSize int32 + TimeZone string + ConnectRetryMax int } type ClusterSession struct { - config *ClusterConfig - client *rpc.IClientRPCServiceClient - sessionId int64 - trans thrift.TTransport - requestStatementId int64 + config *ClusterConfig + client *rpc.IClientRPCServiceClient + sessionId int64 + trans thrift.TTransport + requestStatementId int64 } func (s *Session) OpenCluster(enableRPCCompression bool) error { - if s.config.FetchSize <= 0 { - s.config.FetchSize = DefaultFetchSize - } - if s.config.TimeZone == "" { - s.config.TimeZone = DefaultTimeZone - } - - if s.config.ConnectRetryMax <= 0 { - s.config.ConnectRetryMax = DefaultConnectRetryMax - } - - var protocolFactory thrift.TProtocolFactory - var err error - - s.config.enableRPCCompression = enableRPCCompression - protocolFactory = getProtocolFactory(enableRPCCompression) - iprot := protocolFactory.GetProtocol(s.trans) - oprot := protocolFactory.GetProtocol(s.trans) - s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) - req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, - Password: &s.config.Password} - - resp, err := s.client.OpenSession(context.Background(), &req) - if err != nil { - return err - } - s.sessionId = resp.GetSessionId() - s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) - return err + if s.config.FetchSize <= 0 { + s.config.FetchSize = DefaultFetchSize + } + if s.config.TimeZone == "" { + s.config.TimeZone = DefaultTimeZone + } + + if s.config.ConnectRetryMax <= 0 { + s.config.ConnectRetryMax = DefaultConnectRetryMax + } + + var protocolFactory thrift.TProtocolFactory + var err error + + s.config.enableRPCCompression = enableRPCCompression + protocolFactory = getProtocolFactory(enableRPCCompression) + iprot := protocolFactory.GetProtocol(s.trans) + oprot := protocolFactory.GetProtocol(s.trans) + s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) + req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, + Password: &s.config.Password} + + resp, err := s.client.OpenSession(context.Background(), &req) + if err != nil { + return err + } + s.sessionId = resp.GetSessionId() + s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) + return err } func getProtocolFactory(enableRPCCompression bool) thrift.TProtocolFactory { - if enableRPCCompression { - return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}) - } - return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{}) + if enableRPCCompression { + return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}) + } else { + return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{}) + } } func (s *Session) Close() (r *common.TSStatus, err error) { - req := rpc.NewTSCloseSessionReq() - req.SessionId = s.sessionId - _, err = s.client.CloseSession(context.Background(), req) - if err != nil { - return nil, err - } - return nil, s.trans.Close() + req := rpc.NewTSCloseSessionReq() + req.SessionId = s.sessionId + _, err = s.client.CloseSession(context.Background(), req) + if err != nil { + return nil, err + } + return nil, s.trans.Close() } /* @@ -190,13 +191,13 @@ func (s *Session) Close() (r *common.TSStatus, err error) { *error: correctness of operation */ func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error) { - r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) - if err != nil && r == nil { - if s.reconnect() { - r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) - } - } - return r, err + r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) + if err != nil && r == nil { + if s.reconnect() { + r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId) + } + } + return r, err } /* @@ -207,13 +208,13 @@ func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, er *error: correctness of operation */ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) - if err != nil && r == nil { - if s.reconnect() { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) - } - } - return r, err + r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) + if err != nil && r == nil { + if s.reconnect() { + r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId}) + } + } + return r, err } /* @@ -224,13 +225,13 @@ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, *error: correctness of operation */ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) - if err != nil && r == nil { - if s.reconnect() { - r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) - } - } - return r, err + r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) + if err != nil && r == nil { + if s.reconnect() { + r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds) + } + } + return r, err } /* @@ -244,16 +245,16 @@ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSSt *error: correctness of operation */ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error) { - request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding), - Compressor: int32(compressor), Attributes: attributes, Tags: tags} - status, err := s.client.CreateTimeseries(context.Background(), &request) - if err != nil && status == nil { - if s.reconnect() { - request.SessionId = s.sessionId - status, err = s.client.CreateTimeseries(context.Background(), &request) - } - } - return status, err + request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding), + Compressor: int32(compressor), Attributes: attributes, Tags: tags} + status, err := s.client.CreateTimeseries(context.Background(), &request) + if err != nil && status == nil { + if s.reconnect() { + request.SessionId = s.sessionId + status, err = s.client.CreateTimeseries(context.Background(), &request) + } + } + return status, err } /* @@ -269,38 +270,38 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS *error: correctness of operation */ func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) { - destTypes := make([]int32, len(dataTypes)) - for i, t := range dataTypes { - destTypes[i] = int32(t) - } - - destEncodings := make([]int32, len(encodings)) - for i, e := range encodings { - destEncodings[i] = int32(e) - } - - destCompressions := make([]int32, len(compressors)) - for i, e := range compressors { - destCompressions[i] = int32(e) - } - - request := rpc.TSCreateAlignedTimeseriesReq{ - SessionId: s.sessionId, - PrefixPath: prefixPath, - Measurements: measurements, - DataTypes: destTypes, - Encodings: destEncodings, - Compressors: destCompressions, - MeasurementAlias: measurementAlias, - } - status, err := s.client.CreateAlignedTimeseries(context.Background(), &request) - if err != nil && status == nil { - if s.reconnect() { - request.SessionId = s.sessionId - status, err = s.client.CreateAlignedTimeseries(context.Background(), &request) - } - } - return status, err + destTypes := make([]int32, len(dataTypes)) + for i, t := range dataTypes { + destTypes[i] = int32(t) + } + + destEncodings := make([]int32, len(encodings)) + for i, e := range encodings { + destEncodings[i] = int32(e) + } + + destCompressions := make([]int32, len(compressors)) + for i, e := range compressors { + destCompressions[i] = int32(e) + } + + request := rpc.TSCreateAlignedTimeseriesReq{ + SessionId: s.sessionId, + PrefixPath: prefixPath, + Measurements: measurements, + DataTypes: destTypes, + Encodings: destEncodings, + Compressors: destCompressions, + MeasurementAlias: measurementAlias, + } + status, err := s.client.CreateAlignedTimeseries(context.Background(), &request) + if err != nil && status == nil { + if s.reconnect() { + request.SessionId = s.sessionId + status, err = s.client.CreateAlignedTimeseries(context.Background(), &request) + } + } + return status, err } /* @@ -314,33 +315,33 @@ func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []stri *error: correctness of operation */ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r *common.TSStatus, err error) { - destTypes := make([]int32, len(dataTypes)) - for i, t := range dataTypes { - destTypes[i] = int32(t) - } - - destEncodings := make([]int32, len(encodings)) - for i, e := range encodings { - destEncodings[i] = int32(e) - } - - destCompressions := make([]int32, len(compressors)) - for i, e := range compressors { - destCompressions[i] = int32(e) - } - - request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: paths, DataTypes: destTypes, - Encodings: destEncodings, Compressors: destCompressions} - r, err = s.client.CreateMultiTimeseries(context.Background(), &request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.CreateMultiTimeseries(context.Background(), &request) - } - } - - return r, err + destTypes := make([]int32, len(dataTypes)) + for i, t := range dataTypes { + destTypes[i] = int32(t) + } + + destEncodings := make([]int32, len(encodings)) + for i, e := range encodings { + destEncodings[i] = int32(e) + } + + destCompressions := make([]int32, len(compressors)) + for i, e := range compressors { + destCompressions[i] = int32(e) + } + + request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: paths, DataTypes: destTypes, + Encodings: destEncodings, Compressors: destCompressions} + r, err = s.client.CreateMultiTimeseries(context.Background(), &request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.CreateMultiTimeseries(context.Background(), &request) + } + } + + return r, err } /* @@ -351,13 +352,13 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, *error: correctness of operation */ func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err error) { - r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) - if err != nil && r == nil { - if s.reconnect() { - r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) - } - } - return r, err + r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) + if err != nil && r == nil { + if s.reconnect() { + r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths) + } + } + return r, err } /* @@ -370,15 +371,15 @@ func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err erro *error: correctness of operation */ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error) { - request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime} - r, err = s.client.DeleteData(context.Background(), &request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.DeleteData(context.Background(), &request) - } - } - return r, err + request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime} + r, err = s.client.DeleteData(context.Background(), &request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.DeleteData(context.Background(), &request) + } + } + return r, err } /* @@ -392,224 +393,224 @@ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *error: correctness of operation */ func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error) { - request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements, - Values: values, Timestamp: timestamp} - r, err = s.client.InsertStringRecord(context.Background(), &request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertStringRecord(context.Background(), &request) - } - } - return r, err + request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements, + Values: values, Timestamp: timestamp} + r, err = s.client.InsertStringRecord(context.Background(), &request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertStringRecord(context.Background(), &request) + } + } + return r, err } func (s *Session) GetTimeZone() (string, error) { - resp, err := s.client.GetTimeZone(context.Background(), s.sessionId) - if err != nil { - return DefaultTimeZone, err - } - return resp.TimeZone, nil + resp, err := s.client.GetTimeZone(context.Background(), s.sessionId) + if err != nil { + return DefaultTimeZone, err + } + return resp.TimeZone, nil } func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) { - request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone} - r, err = s.client.SetTimeZone(context.Background(), &request) - s.config.TimeZone = timeZone - return r, err + request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone} + r, err = s.client.SetTimeZone(context.Background(), &request) + s.config.TimeZone = timeZone + return r, err } func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) { - request := rpc.TSExecuteStatementReq{ - SessionId: s.sessionId, - Statement: sql, - StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, - } - resp, err := s.client.ExecuteStatement(context.Background(), &request) - - if err != nil && resp == nil { - if s.reconnect() { - request.SessionId = s.sessionId - request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteStatement(context.Background(), &request) - } - } - - return s.genDataSet(sql, resp), err + request := rpc.TSExecuteStatementReq{ + SessionId: s.sessionId, + Statement: sql, + StatementId: s.requestStatementId, + FetchSize: &s.config.FetchSize, + } + resp, err := s.client.ExecuteStatement(context.Background(), &request) + + if err != nil && resp == nil { + if s.reconnect() { + request.SessionId = s.sessionId + request.StatementId = s.requestStatementId + resp, err = s.client.ExecuteStatement(context.Background(), &request) + } + } + + return s.genDataSet(sql, resp), err } func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) { - request := rpc.TSExecuteStatementReq{ - SessionId: s.sessionId, - Statement: sql, - StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, - } - resp, err := s.client.ExecuteStatement(context.Background(), &request) - - if err != nil && resp == nil { - if s.reconnect() { - request.SessionId = s.sessionId - request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteStatement(context.Background(), &request) - } - } - - return resp.Status, err + request := rpc.TSExecuteStatementReq{ + SessionId: s.sessionId, + Statement: sql, + StatementId: s.requestStatementId, + FetchSize: &s.config.FetchSize, + } + resp, err := s.client.ExecuteStatement(context.Background(), &request) + + if err != nil && resp == nil { + if s.reconnect() { + request.SessionId = s.sessionId + request.StatementId = s.requestStatementId + resp, err = s.client.ExecuteStatement(context.Background(), &request) + } + } + + return resp.Status, err } func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) { - request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, Timeout: timeoutMs} - if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil { - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } else { - if s.reconnect() { - request.SessionId = s.sessionId - request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteQueryStatement(context.Background(), &request) - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } - return nil, err - } + request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId, + FetchSize: &s.config.FetchSize, Timeout: timeoutMs} + if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil { + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } else { + if s.reconnect() { + request.SessionId = s.sessionId + request.StatementId = s.requestStatementId + resp, err = s.client.ExecuteQueryStatement(context.Background(), &request) + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } + return nil, err + } } func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, - startTime *int64, endTime *int64, interval *int64, - timeoutMs *int64) (*SessionDataSet, error) { - - request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, - Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs} - if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } else { - if s.reconnect() { - request.SessionId = s.sessionId - resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } - return nil, err - } + startTime *int64, endTime *int64, interval *int64, + timeoutMs *int64) (*SessionDataSet, error) { + + request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, + Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs} + if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } else { + if s.reconnect() { + request.SessionId = s.sessionId + resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } + return nil, err + } } func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregations []common.TAggregationType, - startTime *int64, endTime *int64, interval *int64, - timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) { - - request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, - Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, - Timeout: timeoutMs, LegalPathNodes: legalNodes} - if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } else { - if s.reconnect() { - request.SessionId = s.sessionId - resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) - if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err - } else { - return nil, statusErr - } - } - return nil, err - } + startTime *int64, endTime *int64, interval *int64, + timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) { + + request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, + Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, + Timeout: timeoutMs, LegalPathNodes: legalNodes} + if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } else { + if s.reconnect() { + request.SessionId = s.sessionId + resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) + if statusErr := VerifySuccess(resp.Status); statusErr == nil { + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + } else { + return nil, statusErr + } + } + return nil, err + } } func (s *Session) genTSInsertRecordReq(deviceId string, time int64, - measurements []string, - types []TSDataType, - values []interface{}, - isAligned bool) (*rpc.TSInsertRecordReq, error) { - request := &rpc.TSInsertRecordReq{} - request.SessionId = s.sessionId - request.PrefixPath = deviceId - request.Timestamp = time - request.Measurements = measurements - request.IsAligned = &isAligned - if bys, err := valuesToBytes(types, values); err == nil { - request.Values = bys - } else { - return nil, err - } - return request, nil + measurements []string, + types []TSDataType, + values []interface{}, + isAligned bool) (*rpc.TSInsertRecordReq, error) { + request := &rpc.TSInsertRecordReq{} + request.SessionId = s.sessionId + request.PrefixPath = deviceId + request.Timestamp = time + request.Measurements = measurements + request.IsAligned = &isAligned + if bys, err := valuesToBytes(types, values); err == nil { + request.Values = bys + } else { + return nil, err + } + return request, nil } func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) { - request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false) - if err != nil { - return nil, err - } - r, err = s.client.InsertRecord(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecord(context.Background(), request) - } - } - - return r, err + request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false) + if err != nil { + return nil, err + } + r, err = s.client.InsertRecord(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecord(context.Background(), request) + } + } + + return r, err } func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) { - request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true) - if err != nil { - return nil, err - } - r, err = s.client.InsertRecord(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecord(context.Background(), request) - } - } - - return r, err + request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true) + if err != nil { + return nil, err + } + r, err = s.client.InsertRecord(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecord(context.Background(), request) + } + } + + return r, err } type deviceData struct { - timestamps []int64 - measurementsSlice [][]string - dataTypesSlice [][]TSDataType - valuesSlice [][]interface{} - isAligned bool + timestamps []int64 + measurementsSlice [][]string + dataTypesSlice [][]TSDataType + valuesSlice [][]interface{} + isAligned bool } func (d *deviceData) Len() int { - return len(d.timestamps) + return len(d.timestamps) } func (d *deviceData) Less(i, j int) bool { - return d.timestamps[i] < d.timestamps[j] + return d.timestamps[i] < d.timestamps[j] } func (d *deviceData) Swap(i, j int) { - d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i] - d.measurementsSlice[i], d.measurementsSlice[j] = d.measurementsSlice[j], d.measurementsSlice[i] - d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], d.dataTypesSlice[i] - d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i] + d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i] + d.measurementsSlice[i], d.measurementsSlice[j] = d.measurementsSlice[j], d.measurementsSlice[i] + d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], d.dataTypesSlice[i] + d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i] } // InsertRecordsOfOneDevice Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc @@ -617,88 +618,88 @@ func (d *deviceData) Swap(i, j int) { // your performance, please see insertTablet method // Each row is independent, which could have different deviceId, time, number of measurements func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) { - length := len(timestamps) - if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { - return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") - } - - if !sorted { - sort.Sort(&deviceData{ - timestamps: timestamps, - measurementsSlice: measurementsSlice, - dataTypesSlice: dataTypesSlice, - valuesSlice: valuesSlice, - }) - } - - valuesList := make([][]byte, length) - for i := 0; i < length; i++ { - if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil { - return nil, err - } - } - - request := &rpc.TSInsertRecordsOfOneDeviceReq{ - SessionId: s.sessionId, - PrefixPath: deviceId, - Timestamps: timestamps, - MeasurementsList: measurementsSlice, - ValuesList: valuesList, - } - - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) - } - } - - return r, err + length := len(timestamps) + if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { + return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") + } + + if !sorted { + sort.Sort(&deviceData{ + timestamps: timestamps, + measurementsSlice: measurementsSlice, + dataTypesSlice: dataTypesSlice, + valuesSlice: valuesSlice, + }) + } + + valuesList := make([][]byte, length) + for i := 0; i < length; i++ { + if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil { + return nil, err + } + } + + request := &rpc.TSInsertRecordsOfOneDeviceReq{ + SessionId: s.sessionId, + PrefixPath: deviceId, + Timestamps: timestamps, + MeasurementsList: measurementsSlice, + ValuesList: valuesList, + } + + r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + } + } + + return r, err } func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) { - length := len(timestamps) - if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { - return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") - } - - if !sorted { - sort.Sort(&deviceData{ - timestamps: timestamps, - measurementsSlice: measurementsSlice, - dataTypesSlice: dataTypesSlice, - valuesSlice: valuesSlice, - }) - } - - valuesList := make([][]byte, length) - for i := 0; i < length; i++ { - if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil { - return nil, err - } - } - var isAligned = true - request := &rpc.TSInsertRecordsOfOneDeviceReq{ - SessionId: s.sessionId, - PrefixPath: deviceId, - Timestamps: timestamps, - MeasurementsList: measurementsSlice, - ValuesList: valuesList, - IsAligned: &isAligned, - } - - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) - } - } - - return r, err + length := len(timestamps) + if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length { + return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal") + } + + if !sorted { + sort.Sort(&deviceData{ + timestamps: timestamps, + measurementsSlice: measurementsSlice, + dataTypesSlice: dataTypesSlice, + valuesSlice: valuesSlice, + }) + } + + valuesList := make([][]byte, length) + for i := 0; i < length; i++ { + if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil { + return nil, err + } + } + var isAligned = true + request := &rpc.TSInsertRecordsOfOneDeviceReq{ + SessionId: s.sessionId, + PrefixPath: deviceId, + Timestamps: timestamps, + MeasurementsList: measurementsSlice, + ValuesList: valuesList, + IsAligned: &isAligned, + } + + r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request) + } + } + + return r, err } /* @@ -714,37 +715,37 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps [] * */ func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, - timestamps []int64) (r *common.TSStatus, err error) { - request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false) - if err != nil { - return nil, err - } else { - r, err = s.client.InsertRecords(context.Background(), request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecords(context.Background(), request) - } - } - return r, err - } + timestamps []int64) (r *common.TSStatus, err error) { + request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false) + if err != nil { + return nil, err + } else { + r, err = s.client.InsertRecords(context.Background(), request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecords(context.Background(), request) + } + } + return r, err + } } func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, - timestamps []int64) (r *common.TSStatus, err error) { - request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true) - if err != nil { - return nil, err - } else { - r, err = s.client.InsertRecords(context.Background(), request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertRecords(context.Background(), request) - } - } - return r, err - } + timestamps []int64) (r *common.TSStatus, err error) { + request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true) + if err != nil { + return nil, err + } else { + r, err = s.client.InsertRecords(context.Background(), request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertRecords(context.Background(), request) + } + } + return r, err + } } /* @@ -753,450 +754,450 @@ func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]stri *tablets: []*client.Tablet, list of tablets */ func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) { - if !sorted { - for _, t := range tablets { - if err := t.Sort(); err != nil { - return nil, err - } - } - } - request, err := s.genInsertTabletsReq(tablets, false) - if err != nil { - return nil, err - } - r, err = s.client.InsertTablets(context.Background(), request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertTablets(context.Background(), request) - } - } - return r, err + if !sorted { + for _, t := range tablets { + if err := t.Sort(); err != nil { + return nil, err + } + } + } + request, err := s.genInsertTabletsReq(tablets, false) + if err != nil { + return nil, err + } + r, err = s.client.InsertTablets(context.Background(), request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertTablets(context.Background(), request) + } + } + return r, err } func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) { - if !sorted { - for _, t := range tablets { - if err := t.Sort(); err != nil { - return nil, err - } - } - } - request, err := s.genInsertTabletsReq(tablets, true) - if err != nil { - return nil, err - } - r, err = s.client.InsertTablets(context.Background(), request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertTablets(context.Background(), request) - } - } - return r, err + if !sorted { + for _, t := range tablets { + if err := t.Sort(); err != nil { + return nil, err + } + } + } + request, err := s.genInsertTabletsReq(tablets, true) + if err != nil { + return nil, err + } + r, err = s.client.InsertTablets(context.Background(), request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertTablets(context.Background(), request) + } + } + return r, err } func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus, err error) { - request := rpc.TSExecuteBatchStatementReq{ - SessionId: s.sessionId, - Statements: inserts, - } - r, err = s.client.ExecuteBatchStatement(context.Background(), &request) - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.ExecuteBatchStatement(context.Background(), &request) - } - } - return r, err + request := rpc.TSExecuteBatchStatementReq{ + SessionId: s.sessionId, + Statements: inserts, + } + r, err = s.client.ExecuteBatchStatement(context.Background(), &request) + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.ExecuteBatchStatement(context.Background(), &request) + } + } + return r, err } func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) { - request := rpc.TSRawDataQueryReq{ - SessionId: s.sessionId, - Paths: paths, - FetchSize: &s.config.FetchSize, - StartTime: startTime, - EndTime: endTime, - StatementId: s.requestStatementId, - } - resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request) - - if err != nil && resp == nil { - if s.reconnect() { - request.SessionId = s.sessionId - request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteRawDataQuery(context.Background(), &request) - } - } - - return s.genDataSet("", resp), err + request := rpc.TSRawDataQueryReq{ + SessionId: s.sessionId, + Paths: paths, + FetchSize: &s.config.FetchSize, + StartTime: startTime, + EndTime: endTime, + StatementId: s.requestStatementId, + } + resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request) + + if err != nil && resp == nil { + if s.reconnect() { + request.SessionId = s.sessionId + request.StatementId = s.requestStatementId + resp, err = s.client.ExecuteRawDataQuery(context.Background(), &request) + } + } + + return s.genDataSet("", resp), err } func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) { - request := rpc.TSExecuteStatementReq{ - SessionId: s.sessionId, - Statement: sql, - StatementId: s.requestStatementId, - FetchSize: &s.config.FetchSize, - } - resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request) - - if err != nil && resp == nil { - if s.reconnect() { - request.SessionId = s.sessionId - request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteUpdateStatement(context.Background(), &request) - } - } - - return s.genDataSet(sql, resp), err + request := rpc.TSExecuteStatementReq{ + SessionId: s.sessionId, + Statement: sql, + StatementId: s.requestStatementId, + FetchSize: &s.config.FetchSize, + } + resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request) + + if err != nil && resp == nil { + if s.reconnect() { + request.SessionId = s.sessionId + request.StatementId = s.requestStatementId + resp, err = s.client.ExecuteUpdateStatement(context.Background(), &request) + } + } + + return s.genDataSet(sql, resp), err } func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet { - var queryId int64 - if resp.QueryId == nil { - queryId = 0 - } else { - queryId = *resp.QueryId - } - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, - queryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil) + var queryId int64 + if resp.QueryId == nil { + queryId = 0 + } else { + queryId = *resp.QueryId + } + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, + queryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil) } func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) { - var ( - length = len(tablets) - deviceIds = make([]string, length) - measurementsList = make([][]string, length) - valuesList = make([][]byte, length) - timestampsList = make([][]byte, length) - typesList = make([][]int32, length) - sizeList = make([]int32, length) - ) - for index, tablet := range tablets { - deviceIds[index] = tablet.deviceId - measurementsList[index] = tablet.GetMeasurements() - - values, err := tablet.getValuesBytes() - if err != nil { - return nil, err - } - - valuesList[index] = values - timestampsList[index] = tablet.GetTimestampBytes() - typesList[index] = tablet.getDataTypes() - sizeList[index] = int32(tablet.RowSize) - } - request := rpc.TSInsertTabletsReq{ - SessionId: s.sessionId, - PrefixPaths: deviceIds, - TypesList: typesList, - MeasurementsList: measurementsList, - ValuesList: valuesList, - TimestampsList: timestampsList, - SizeList: sizeList, - IsAligned: &isAligned, - } - return &request, nil + var ( + length = len(tablets) + deviceIds = make([]string, length) + measurementsList = make([][]string, length) + valuesList = make([][]byte, length) + timestampsList = make([][]byte, length) + typesList = make([][]int32, length) + sizeList = make([]int32, length) + ) + for index, tablet := range tablets { + deviceIds[index] = tablet.deviceId + measurementsList[index] = tablet.GetMeasurements() + + values, err := tablet.getValuesBytes() + if err != nil { + return nil, err + } + + valuesList[index] = values + timestampsList[index] = tablet.GetTimestampBytes() + typesList[index] = tablet.getDataTypes() + sizeList[index] = int32(tablet.RowSize) + } + request := rpc.TSInsertTabletsReq{ + SessionId: s.sessionId, + PrefixPaths: deviceIds, + TypesList: typesList, + MeasurementsList: measurementsList, + ValuesList: valuesList, + TimestampsList: timestampsList, + SizeList: sizeList, + IsAligned: &isAligned, + } + return &request, nil } func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, - timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) { - length := len(deviceIds) - if length != len(timestamps) || length != len(measurements) || length != len(values) { - return nil, errLength - } - request := rpc.TSInsertRecordsReq{ - SessionId: s.sessionId, - PrefixPaths: deviceIds, - MeasurementsList: measurements, - Timestamps: timestamps, - IsAligned: &isAligned, - } - v := make([][]byte, length) - for i := 0; i < len(measurements); i++ { - if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil { - v[i] = bys - } else { - return nil, err - } - } - request.ValuesList = v - return &request, nil + timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) { + length := len(deviceIds) + if length != len(timestamps) || length != len(measurements) || length != len(values) { + return nil, errLength + } + request := rpc.TSInsertRecordsReq{ + SessionId: s.sessionId, + PrefixPaths: deviceIds, + MeasurementsList: measurements, + Timestamps: timestamps, + IsAligned: &isAligned, + } + v := make([][]byte, length) + for i := 0; i < len(measurements); i++ { + if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil { + v[i] = bys + } else { + return nil, err + } + } + request.ValuesList = v + return &request, nil } func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) { - buff := &bytes.Buffer{} - for i, t := range dataTypes { - binary.Write(buff, binary.BigEndian, byte(t)) - v := values[i] - if v == nil { - return nil, fmt.Errorf("values[%d] can't be nil", i) - } - - switch t { - case BOOLEAN: - switch v.(type) { - case bool: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v)) - } - case INT32: - switch v.(type) { - case int32: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v)) - } - case INT64, TIMESTAMP: - switch v.(type) { - case int64: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v)) - } - case FLOAT: - switch v.(type) { - case float32: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v)) - } - case DOUBLE: - switch v.(type) { - case float64: - binary.Write(buff, binary.BigEndian, v) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v)) - } - case TEXT, STRING: - switch s := v.(type) { - case string: - size := len(s) - binary.Write(buff, binary.BigEndian, int32(size)) - binary.Write(buff, binary.BigEndian, []byte(s)) - case []byte: - size := len(s) - binary.Write(buff, binary.BigEndian, int32(size)) - binary.Write(buff, binary.BigEndian, s) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be string or []byte", i, v, reflect.TypeOf(v)) - } - case BLOB: - switch s := v.(type) { - case []byte: - size := len(s) - binary.Write(buff, binary.BigEndian, int32(size)) - binary.Write(buff, binary.BigEndian, s) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, v, reflect.TypeOf(v)) - } - case DATE: - switch s := v.(type) { - case time.Time: - date, err := dateToInt32(s) - if err != nil { - return nil, err - } - binary.Write(buff, binary.BigEndian, date) - default: - return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", i, v, reflect.TypeOf(v)) - } - default: - return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i) - } - } - return buff.Bytes(), nil + buff := &bytes.Buffer{} + for i, t := range dataTypes { + binary.Write(buff, binary.BigEndian, byte(t)) + v := values[i] + if v == nil { + return nil, fmt.Errorf("values[%d] can't be nil", i) + } + + switch t { + case BOOLEAN: + switch v.(type) { + case bool: + binary.Write(buff, binary.BigEndian, v) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v)) + } + case INT32: + switch v.(type) { + case int32: + binary.Write(buff, binary.BigEndian, v) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v)) + } + case INT64, TIMESTAMP: + switch v.(type) { + case int64: + binary.Write(buff, binary.BigEndian, v) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v)) + } + case FLOAT: + switch v.(type) { + case float32: + binary.Write(buff, binary.BigEndian, v) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v)) + } + case DOUBLE: + switch v.(type) { + case float64: + binary.Write(buff, binary.BigEndian, v) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v)) + } + case TEXT, STRING: + switch s := v.(type) { + case string: + size := len(s) + binary.Write(buff, binary.BigEndian, int32(size)) + binary.Write(buff, binary.BigEndian, []byte(s)) + case []byte: + size := len(s) + binary.Write(buff, binary.BigEndian, int32(size)) + binary.Write(buff, binary.BigEndian, s) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be string or []byte", i, v, reflect.TypeOf(v)) + } + case BLOB: + switch s := v.(type) { + case []byte: + size := len(s) + binary.Write(buff, binary.BigEndian, int32(size)) + binary.Write(buff, binary.BigEndian, s) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, v, reflect.TypeOf(v)) + } + case DATE: + switch s := v.(type) { + case time.Time: + date, err := dateToInt32(s) + if err != nil { + return nil, err + } + binary.Write(buff, binary.BigEndian, date) + default: + return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", i, v, reflect.TypeOf(v)) + } + default: + return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i) + } + } + return buff.Bytes(), nil } func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) { - if !sorted { - if err := tablet.Sort(); err != nil { - return nil, err - } - } - request, err := s.genTSInsertTabletReq(tablet, false) - if err != nil { - return nil, err - } - - r, err = s.client.InsertTablet(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertTablet(context.Background(), request) - } - } - - return r, err + if !sorted { + if err := tablet.Sort(); err != nil { + return nil, err + } + } + request, err := s.genTSInsertTabletReq(tablet, false) + if err != nil { + return nil, err + } + + r, err = s.client.InsertTablet(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertTablet(context.Background(), request) + } + } + + return r, err } func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) { - if !sorted { - if err := tablet.Sort(); err != nil { - return nil, err - } - } - request, err := s.genTSInsertTabletReq(tablet, true) - if err != nil { - return nil, err - } - - r, err = s.client.InsertTablet(context.Background(), request) - - if err != nil && r == nil { - if s.reconnect() { - request.SessionId = s.sessionId - r, err = s.client.InsertTablet(context.Background(), request) - } - } - - return r, err + if !sorted { + if err := tablet.Sort(); err != nil { + return nil, err + } + } + request, err := s.genTSInsertTabletReq(tablet, true) + if err != nil { + return nil, err + } + + r, err = s.client.InsertTablet(context.Background(), request) + + if err != nil && r == nil { + if s.reconnect() { + request.SessionId = s.sessionId + r, err = s.client.InsertTablet(context.Background(), request) + } + } + + return r, err } func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) (*rpc.TSInsertTabletReq, error) { - if values, err := tablet.getValuesBytes(); err == nil { - request := &rpc.TSInsertTabletReq{ - SessionId: s.sessionId, - PrefixPath: tablet.deviceId, - Measurements: tablet.GetMeasurements(), - Values: values, - Timestamps: tablet.GetTimestampBytes(), - Types: tablet.getDataTypes(), - Size: int32(tablet.RowSize), - IsAligned: &isAligned, - } - return request, nil - } else { - return nil, err - } + if values, err := tablet.getValuesBytes(); err == nil { + request := &rpc.TSInsertTabletReq{ + SessionId: s.sessionId, + PrefixPath: tablet.deviceId, + Measurements: tablet.GetMeasurements(), + Values: values, + Timestamps: tablet.GetTimestampBytes(), + Types: tablet.getDataTypes(), + Size: int32(tablet.RowSize), + IsAligned: &isAligned, + } + return request, nil + } else { + return nil, err + } } func (s *Session) GetSessionId() int64 { - return s.sessionId + return s.sessionId } func NewSession(config *Config) Session { - endPoint := endPoint{} - endPoint.Host = config.Host - endPoint.Port = config.Port - endPointList.PushBack(endPoint) - return Session{config: config} + endPoint := endPoint{} + endPoint.Host = config.Host + endPoint.Port = config.Port + endPointList.PushBack(endPoint) + return Session{config: config} } func NewClusterSession(clusterConfig *ClusterConfig) Session { - session := Session{} - node := endPoint{} - for i := 0; i < len(clusterConfig.NodeUrls); i++ { - node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0] - node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1] - endPointList.PushBack(node) - } - var err error - for e := endPointList.Front(); e != nil; e = e.Next() { - session.trans = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{ - ConnectTimeout: time.Duration(0), // Use 0 for no timeout - }) - // session.trans = thrift.NewTFramedTransport(session.trans) // deprecated - var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} - session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf) - if !session.trans.IsOpen() { - err = session.trans.Open() - if err != nil { - log.Println(err) - } else { - session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, - clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax) - break - } - } - } - if !session.trans.IsOpen() { - log.Fatal("No Server Can Connect") - } - return session + session := Session{} + node := endPoint{} + for i := 0; i < len(clusterConfig.NodeUrls); i++ { + node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0] + node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1] + endPointList.PushBack(node) + } + var err error + for e := endPointList.Front(); e != nil; e = e.Next() { + session.trans = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{ + ConnectTimeout: time.Duration(0), // Use 0 for no timeout + }) + // session.trans = thrift.NewTFramedTransport(session.trans) // deprecated + var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} + session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf) + if !session.trans.IsOpen() { + err = session.trans.Open() + if err != nil { + log.Println(err) + } else { + session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port, + clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax) + break + } + } + } + if !session.trans.IsOpen() { + log.Fatal("No Server Can Connect") + } + return session } func (s *Session) initClusterConn(node endPoint) error { - var err error - - s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{ - ConnectTimeout: time.Duration(0), // Use 0 for no timeout - }) - if err == nil { - // s.trans = thrift.NewTFramedTransport(s.trans) // deprecated - var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} - s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf) - if !s.trans.IsOpen() { - err = s.trans.Open() - if err != nil { - return err - } - } - } - - if s.config.FetchSize < 1 { - s.config.FetchSize = DefaultFetchSize - } - - if s.config.TimeZone == "" { - s.config.TimeZone = DefaultTimeZone - } - - if s.config.ConnectRetryMax < 1 { - s.config.ConnectRetryMax = DefaultConnectRetryMax - } - - var protocolFactory thrift.TProtocolFactory - protocolFactory = getProtocolFactory(s.config.enableRPCCompression) - iprot := protocolFactory.GetProtocol(s.trans) - oprot := protocolFactory.GetProtocol(s.trans) - s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) - req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, - Password: &s.config.Password} - - resp, err := s.client.OpenSession(context.Background(), &req) - if err != nil { - return err - } - s.sessionId = resp.GetSessionId() - s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) - return err + var err error + + s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{ + ConnectTimeout: time.Duration(0), // Use 0 for no timeout + }) + if err == nil { + // s.trans = thrift.NewTFramedTransport(s.trans) // deprecated + var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE} + s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf) + if !s.trans.IsOpen() { + err = s.trans.Open() + if err != nil { + return err + } + } + } + + if s.config.FetchSize < 1 { + s.config.FetchSize = DefaultFetchSize + } + + if s.config.TimeZone == "" { + s.config.TimeZone = DefaultTimeZone + } + + if s.config.ConnectRetryMax < 1 { + s.config.ConnectRetryMax = DefaultConnectRetryMax + } + + var protocolFactory thrift.TProtocolFactory + protocolFactory = getProtocolFactory(s.config.enableRPCCompression) + iprot := protocolFactory.GetProtocol(s.trans) + oprot := protocolFactory.GetProtocol(s.trans) + s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot)) + req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName, + Password: &s.config.Password} + + resp, err := s.client.OpenSession(context.Background(), &req) + if err != nil { + return err + } + s.sessionId = resp.GetSessionId() + s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId) + return err } func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config { - return &Config{ - Host: host, - Port: port, - UserName: userName, - Password: passWord, - FetchSize: fetchSize, - TimeZone: timeZone, - ConnectRetryMax: connectRetryMax, - } + return &Config{ + Host: host, + Port: port, + UserName: userName, + Password: passWord, + FetchSize: fetchSize, + TimeZone: timeZone, + ConnectRetryMax: connectRetryMax, + } } func (s *Session) reconnect() bool { - var err error - var connectedSuccess = false - - for i := 0; i < s.config.ConnectRetryMax; i++ { - for e := endPointList.Front(); e != nil; e = e.Next() { - err = s.initClusterConn(e.Value.(endPoint)) - if err == nil { - connectedSuccess = true - break - } else { - log.Println("Connection refused:", e.Value.(endPoint)) - } - } - if connectedSuccess { - break - } - } - return connectedSuccess + var err error + var connectedSuccess = false + + for i := 0; i < s.config.ConnectRetryMax; i++ { + for e := endPointList.Front(); e != nil; e = e.Next() { + err = s.initClusterConn(e.Value.(endPoint)) + if err == nil { + connectedSuccess = true + break + } else { + log.Println("Connection refused:", e.Value.(endPoint)) + } + } + if connectedSuccess { + break + } + } + return connectedSuccess }
