This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ea2655  fix: fix EOF error when decoding columns with empty string or 
zero po… (#155)
2ea2655 is described below

commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24
Author: Zane <[email protected]>
AuthorDate: Thu Mar 12 19:00:51 2026 +0800

    fix: fix EOF error when decoding columns with empty string or zero po… 
(#155)
    
    * fix: fix EOF error when decoding columns with empty string or zero 
positionCount
    
    * Update column_decoder_test.go
    
    增加license header
    
    * fix: return error when resp is nil after reconnect
    
    * fix: GetCurrentRowTime returns time.Time to avoid precision ambiguity
---
 client/column_decoder.go      |  49 +++++++++--
 client/column_decoder_test.go | 200 ++++++++++++++++++++++++++++++++++++++++++
 client/session.go             |  52 +++++++----
 client/sessiondataset.go      |   4 +
 4 files changed, 284 insertions(+), 21 deletions(-)

diff --git a/client/column_decoder.go b/client/column_decoder.go
index e943541..0898c4f 100644
--- a/client/column_decoder.go
+++ b/client/column_decoder.go
@@ -95,6 +95,18 @@ func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader 
*bytes.Reader, dataTyp
        //    +---------------+-----------------+-------------+
        //    | byte          | list[byte]      | list[int32] |
        //    +---------------+-----------------+-------------+
+
+       if positionCount == 0 {
+               switch dataType {
+               case INT32, DATE:
+                       return NewIntColumn(0, 0, nil, []int32{})
+               case FLOAT:
+                       return NewFloatColumn(0, 0, nil, []float32{})
+               default:
+                       return nil, fmt.Errorf("invalid data type: %v", 
dataType)
+               }
+       }
+
        nullIndicators, err := deserializeNullIndicators(reader, positionCount)
        if err != nil {
                return nil, err
@@ -139,6 +151,18 @@ func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader 
*bytes.Reader, dataTyp
        //    +---------------+-----------------+-------------+
        //    | byte          | list[byte]      | list[int64] |
        //    +---------------+-----------------+-------------+
+
+       if positionCount == 0 {
+               switch dataType {
+               case INT64, TIMESTAMP:
+                       return NewLongColumn(0, 0, nil, []int64{})
+               case DOUBLE:
+                       return NewDoubleColumn(0, 0, nil, []float64{})
+               default:
+                       return nil, fmt.Errorf("invalid data type: %v", 
dataType)
+               }
+       }
+
        nullIndicators, err := deserializeNullIndicators(reader, positionCount)
        if err != nil {
                return nil, err
@@ -185,6 +209,11 @@ func (decoder *ByteArrayColumnDecoder) ReadColumn(reader 
*bytes.Reader, dataType
        if dataType != BOOLEAN {
                return nil, fmt.Errorf("invalid data type: %v", dataType)
        }
+
+       if positionCount == 0 {
+               return NewBooleanColumn(0, 0, nil, []bool{})
+       }
+
        nullIndicators, err := deserializeNullIndicators(reader, positionCount)
        if err != nil {
                return nil, err
@@ -218,6 +247,11 @@ func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader 
*bytes.Reader, dataTy
        if TEXT != dataType {
                return nil, fmt.Errorf("invalid data type: %v", dataType)
        }
+
+       if positionCount == 0 {
+               return NewBinaryColumn(0, 0, nil, []*Binary{})
+       }
+
        nullIndicators, err := deserializeNullIndicators(reader, positionCount)
        if err != nil {
                return nil, err
@@ -232,12 +266,17 @@ func (decoder *BinaryArrayColumnDecoder) 
ReadColumn(reader *bytes.Reader, dataTy
                if err != nil {
                        return nil, err
                }
-               value := make([]byte, length)
-               _, err = reader.Read(value)
-               if err != nil {
-                       return nil, err
+
+               if length == 0 {
+                       values[i] = NewBinary([]byte{})
+               } else {
+                       value := make([]byte, length)
+                       _, err = reader.Read(value)
+                       if err != nil {
+                               return nil, err
+                       }
+                       values[i] = NewBinary(value)
                }
-               values[i] = NewBinary(value)
        }
        return NewBinaryColumn(0, positionCount, nullIndicators, values)
 }
diff --git a/client/column_decoder_test.go b/client/column_decoder_test.go
new file mode 100644
index 0000000..dc5d433
--- /dev/null
+++ b/client/column_decoder_test.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+       "bytes"
+       "encoding/binary"
+       "testing"
+)
+
+func buildNullIndicatorBytes(nulls []bool) []byte {
+       var buf bytes.Buffer
+       hasNull := false
+       for _, n := range nulls {
+               if n {
+                       hasNull = true
+                       break
+               }
+       }
+       if !hasNull {
+               buf.WriteByte(0)
+               return buf.Bytes()
+       }
+       buf.WriteByte(1)
+       packed := make([]byte, (len(nulls)+7)/8)
+       for i, n := range nulls {
+               if n {
+                       packed[i/8] |= 0b10000000 >> (uint(i) % 8)
+               }
+       }
+       buf.Write(packed)
+       return buf.Bytes()
+}
+
+func TestBinaryArrayColumnDecoder_EmptyString(t *testing.T) {
+       var buf bytes.Buffer
+       buf.Write(buildNullIndicatorBytes([]bool{false}))
+       _ = binary.Write(&buf, binary.BigEndian, int32(0))
+
+       col, err := 
(&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 1)
+       if err != nil {
+               t.Fatalf("ReadColumn failed: %v", err)
+       }
+       if col.GetPositionCount() != 1 {
+               t.Fatalf("expected positionCount=1, got %d", 
col.GetPositionCount())
+       }
+       if col.IsNull(0) {
+               t.Fatal("row 0 should not be null")
+       }
+       val, err := col.GetBinary(0)
+       if err != nil {
+               t.Fatalf("GetBinary(0) failed: %v", err)
+       }
+       if len(val.values) != 0 {
+               t.Fatalf("expected empty string, got %q", string(val.values))
+       }
+}
+
+func TestBinaryArrayColumnDecoder_NullThenEmptyString(t *testing.T) {
+       var buf bytes.Buffer
+       buf.Write(buildNullIndicatorBytes([]bool{true, false}))
+       _ = binary.Write(&buf, binary.BigEndian, int32(0))
+
+       col, err := 
(&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 2)
+       if err != nil {
+               t.Fatalf("ReadColumn failed: %v", err)
+       }
+       if !col.IsNull(0) {
+               t.Error("row 0 should be null")
+       }
+       if col.IsNull(1) {
+               t.Error("row 1 should not be null")
+       }
+       val, err := col.GetBinary(1)
+       if err != nil {
+               t.Fatalf("GetBinary(1) failed: %v", err)
+       }
+       if len(val.values) != 0 {
+               t.Fatalf("expected empty string, got %q", string(val.values))
+       }
+}
+
+func TestBinaryArrayColumnDecoder_WithNull(t *testing.T) {
+       var buf bytes.Buffer
+       buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
+       writeText := func(s string) {
+               _ = binary.Write(&buf, binary.BigEndian, int32(len(s)))
+               buf.WriteString(s)
+       }
+       writeText("hello")
+       writeText("world")
+
+       col, err := 
(&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 3)
+       if err != nil {
+               t.Fatalf("ReadColumn failed: %v", err)
+       }
+       if col.IsNull(0) {
+               t.Error("row 0 should not be null")
+       }
+       if v, _ := col.GetBinary(0); string(v.values) != "hello" {
+               t.Errorf("row 0: expected \"hello\", got %q", string(v.values))
+       }
+       if !col.IsNull(1) {
+               t.Error("row 1 should be null")
+       }
+       if col.IsNull(2) {
+               t.Error("row 2 should not be null")
+       }
+       if v, _ := col.GetBinary(2); string(v.values) != "world" {
+               t.Errorf("row 2: expected \"world\", got %q", string(v.values))
+       }
+}
+
+func TestInt64ArrayColumnDecoder_WithNull(t *testing.T) {
+       var buf bytes.Buffer
+       buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
+       _ = binary.Write(&buf, binary.BigEndian, int64(100))
+       _ = binary.Write(&buf, binary.BigEndian, int64(200))
+
+       col, err := 
(&Int64ArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), INT64, 3)
+       if err != nil {
+               t.Fatalf("ReadColumn failed: %v", err)
+       }
+       if col.IsNull(0) {
+               t.Error("row 0 should not be null")
+       }
+       if v, _ := col.GetLong(0); v != 100 {
+               t.Errorf("row 0: expected 100, got %d", v)
+       }
+       if !col.IsNull(1) {
+               t.Error("row 1 should be null")
+       }
+       if col.IsNull(2) {
+               t.Error("row 2 should not be null")
+       }
+       if v, _ := col.GetLong(2); v != 200 {
+               t.Errorf("row 2: expected 200, got %d", v)
+       }
+}
+
+func TestColumnDecoder_ZeroPositionCount(t *testing.T) {
+       empty := func() *bytes.Reader { return bytes.NewReader([]byte{}) }
+
+       t.Run("Int32ArrayColumnDecoder", func(t *testing.T) {
+               col, err := (&Int32ArrayColumnDecoder{}).ReadColumn(empty(), 
INT32, 0)
+               if err != nil {
+                       t.Fatalf("ReadColumn failed: %v", err)
+               }
+               if col.GetPositionCount() != 0 {
+                       t.Errorf("expected positionCount=0, got %d", 
col.GetPositionCount())
+               }
+       })
+
+       t.Run("Int64ArrayColumnDecoder", func(t *testing.T) {
+               col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(empty(), 
INT64, 0)
+               if err != nil {
+                       t.Fatalf("ReadColumn failed: %v", err)
+               }
+               if col.GetPositionCount() != 0 {
+                       t.Errorf("expected positionCount=0, got %d", 
col.GetPositionCount())
+               }
+       })
+
+       t.Run("ByteArrayColumnDecoder", func(t *testing.T) {
+               col, err := (&ByteArrayColumnDecoder{}).ReadColumn(empty(), 
BOOLEAN, 0)
+               if err != nil {
+                       t.Fatalf("ReadColumn failed: %v", err)
+               }
+               if col.GetPositionCount() != 0 {
+                       t.Errorf("expected positionCount=0, got %d", 
col.GetPositionCount())
+               }
+       })
+
+       t.Run("BinaryArrayColumnDecoder", func(t *testing.T) {
+               col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(empty(), 
TEXT, 0)
+               if err != nil {
+                       t.Fatalf("ReadColumn failed: %v", err)
+               }
+               if col.GetPositionCount() != 0 {
+                       t.Errorf("expected positionCount=0, got %d", 
col.GetPositionCount())
+               }
+       })
+}
diff --git a/client/session.go b/client/session.go
index 2cd1e82..fbdacd4 100644
--- a/client/session.go
+++ b/client/session.go
@@ -569,10 +569,15 @@ func (s *Session) ExecuteQueryStatement(sql string, 
timeoutMs *int64) (*SessionD
                        request.SessionId = s.sessionId
                        request.StatementId = s.requestStatementId
                        resp, err = 
s.client.ExecuteQueryStatementV2(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet(sql, resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
-                       } else {
-                               return nil, statusErr
+                       if err == nil {
+                               if resp == nil {
+                                       return nil, fmt.Errorf("received nil 
response after reconnect")
+                               }
+                               if statusErr := VerifySuccess(resp.Status); 
statusErr == nil {
+                                       return NewSessionDataSet(sql, 
resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
+                               } else {
+                                       return nil, statusErr
+                               }
                        }
                }
                return nil, err
@@ -597,10 +602,15 @@ func (s *Session) ExecuteAggregationQuery(paths []string, 
aggregations []common.
                if s.reconnect() {
                        request.SessionId = s.sessionId
                        resp, err = 
s.client.ExecuteAggregationQueryV2(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
-                       } else {
-                               return nil, statusErr
+                       if err == nil {
+                               if resp == nil {
+                                       return nil, fmt.Errorf("received nil 
response after reconnect")
+                               }
+                               if statusErr := VerifySuccess(resp.Status); 
statusErr == nil {
+                                       return NewSessionDataSet("", 
resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
+                               } else {
+                                       return nil, statusErr
+                               }
                        }
                }
                return nil, err
@@ -626,10 +636,15 @@ func (s *Session) 
ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat
                if s.reconnect() {
                        request.SessionId = s.sessionId
                        resp, err = 
s.client.ExecuteAggregationQueryV2(context.Background(), &request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
-                       } else {
-                               return nil, statusErr
+                       if err == nil {
+                               if resp == nil {
+                                       return nil, fmt.Errorf("received nil 
response after reconnect")
+                               }
+                               if statusErr := VerifySuccess(resp.Status); 
statusErr == nil {
+                                       return NewSessionDataSet("", 
resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
+                               } else {
+                                       return nil, statusErr
+                               }
                        }
                }
                return nil, err
@@ -653,10 +668,15 @@ func (s *Session) 
ExecuteFastLastDataQueryForOnePrefixPath(prefixes []string, ti
                if s.reconnect() {
                        request.SessionId = s.sessionId
                        resp, err = 
s.client.ExecuteFastLastDataQueryForOnePrefixPath(context.Background(), 
&request)
-                       if statusErr := VerifySuccess(resp.Status); statusErr 
== nil {
-                               return NewSessionDataSet("", resp.Columns, 
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
-                       } else {
-                               return nil, statusErr
+                       if err == nil {
+                               if resp == nil {
+                                       return nil, fmt.Errorf("received nil 
response after reconnect")
+                               }
+                               if statusErr := VerifySuccess(resp.Status); 
statusErr == nil {
+                                       return NewSessionDataSet("", 
resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, 
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, 
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, 
*resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, 
resp.GetColumnIndex2TsBlockColumnIndexList())
+                               } else {
+                                       return nil, statusErr
+                               }
                        }
                }
                return nil, err
diff --git a/client/sessiondataset.go b/client/sessiondataset.go
index ef3faba..66ffab1 100644
--- a/client/sessiondataset.go
+++ b/client/sessiondataset.go
@@ -125,3 +125,7 @@ func (s *SessionDataSet) GetColumnNames() []string {
 func (s *SessionDataSet) GetColumnTypes() []string {
        return s.ioTDBRpcDataSet.columnTypeList
 }
+
+func (s *SessionDataSet) GetCurrentRowTime() time.Time {
+       return convertToTimestamp(s.ioTDBRpcDataSet.GetCurrentRowTime(), 
s.ioTDBRpcDataSet.timeFactor)
+}

Reply via email to