This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b323b78 [IOTDB-921] Solve the statement twice call bug (#1784)
b323b78 is described below
commit b323b78a87c8fa4942e8bc7ebf9af18bdf87a89f
Author: Qi Yu <[email protected]>
AuthorDate: Mon Oct 19 20:31:31 2020 +0800
[IOTDB-921] Solve the statement twice call bug (#1784)
---
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 64 +++++++++++++++++++++-
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 49 +++++++++++++++++
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 8 +++
3 files changed, 119 insertions(+), 2 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 60f9021..f4b7502 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -28,12 +28,19 @@ import
org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
+
+import java.nio.ByteBuffer;
import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
public class IoTDBStatement implements Statement {
@@ -221,6 +228,8 @@ public class IoTDBStatement implements Statement {
} catch (StatementExecutionException e) {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
+
+ deepCopyResp(execResp);
if (execResp.isSetColumns()) {
queryId = execResp.getQueryId();
if (execResp.queryDataSet == null) {
@@ -319,12 +328,16 @@ public class IoTDBStatement implements Statement {
} catch (StatementExecutionException e) {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
+
+ //Because different resultSets share the same TTransport and buffer, if the
former's result has not been consumed
+ //in time, the latter will overwrite the former's byte buffer, thus
a problem will occur
+ deepCopyResp(execResp);
+
if (execResp.queryDataSet == null) {
this.resultSet = new IoTDBNonAlignJDBCResultSet(this,
execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap,
execResp.ignoreTimeStamp, client, sql, queryId,
sessionId, execResp.nonAlignQueryDataSet);
- }
- else {
+ } else {
this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap,
execResp.ignoreTimeStamp, client, sql, queryId,
sessionId, execResp.queryDataSet);
@@ -332,6 +345,53 @@ public class IoTDBStatement implements Statement {
return resultSet;
}
+ private void deepCopyResp(TSExecuteStatementResp queryRes) {
+ final TSQueryDataSet tsQueryDataSet = queryRes.getQueryDataSet();
+ final TSQueryNonAlignDataSet nonAlignDataSet =
queryRes.getNonAlignQueryDataSet();
+
+ if (Objects.nonNull(tsQueryDataSet)) {
+ deepCopyTsQueryDataSet(tsQueryDataSet);
+ } else {
+ deepCopyNonAlignQueryDataSet(nonAlignDataSet);
+ }
+ }
+
+ private void deepCopyNonAlignQueryDataSet(TSQueryNonAlignDataSet
nonAlignDataSet) {
+ if (Objects.isNull(nonAlignDataSet)) {
+ return;
+ }
+
+ final List<ByteBuffer> valueList = nonAlignDataSet.valueList
+ .stream()
+ .map(ReadWriteIOUtils::clone)
+ .collect(Collectors.toList());
+
+ final List<ByteBuffer> timeList = nonAlignDataSet.timeList
+ .stream()
+ .map(ReadWriteIOUtils::clone)
+ .collect(Collectors.toList());
+
+ nonAlignDataSet.setTimeList(timeList);
+ nonAlignDataSet.setValueList(valueList);
+ }
+
+ private void deepCopyTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
+ final ByteBuffer time = ReadWriteIOUtils.clone(tsQueryDataSet.time);
+ final List<ByteBuffer> valueList = tsQueryDataSet.valueList
+ .stream()
+ .map(ReadWriteIOUtils::clone)
+ .collect(Collectors.toList());
+
+ final List<ByteBuffer> bitmapList = tsQueryDataSet.bitmapList
+ .stream()
+ .map(ReadWriteIOUtils::clone)
+ .collect(Collectors.toList());
+
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
+ tsQueryDataSet.setTime(time);
+ }
+
@Override
public int executeUpdate(String sql) throws SQLException {
checkConnection("execute update");
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 340ed25..b75197d 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -43,6 +43,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
public class IoTDBSimpleQueryIT {
@Before
@@ -607,4 +613,47 @@ public class IoTDBSimpleQueryIT {
fail();
}
}
+
+ @Test
+ public void testUseSameStatement() throws SQLException {
+ try (Connection connection =
DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.sg1");
+ statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT64,
ENCODING=RLE, COMPRESSOR=SNAPPY");
+ statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT64,
ENCODING=RLE, COMPRESSOR=SNAPPY");
+ statement.execute("CREATE TIMESERIES root.sg1.d1.s0 WITH DATATYPE=INT64,
ENCODING=RLE, COMPRESSOR=SNAPPY");
+ statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64,
ENCODING=RLE, COMPRESSOR=SNAPPY");
+
+ statement.execute("insert into root.sg1.d0(timestamp,s0,s1)
values(1,1,1)");
+ statement.execute("insert into root.sg1.d1(timestamp,s0,s1)
values(1000,1000,1000)");
+ statement.execute("insert into root.sg1.d0(timestamp,s0,s1)
values(10,10,10)");
+
+
+ List<ResultSet> resultSetList = new ArrayList<>();
+
+ ResultSet r1 = statement.executeQuery("select * from root.sg1.d0 where
time <= 1");
+ resultSetList.add(r1);
+
+ ResultSet r2 = statement.executeQuery("select * from root.sg1.d1 where
s0 == 1000");
+ resultSetList.add(r2);
+
+ ResultSet r3 = statement.executeQuery("select * from root.sg1.d0 where
s1 == 10");
+ resultSetList.add(r3);
+
+ r1.next();
+ Assert.assertEquals(r1.getLong(1), 1L);
+ Assert.assertEquals(r1.getLong(2), 1L);
+ Assert.assertEquals(r1.getLong(3), 1L);
+
+ r2.next();
+ Assert.assertEquals(r2.getLong(1), 1000L);
+ Assert.assertEquals(r2.getLong(2), 1000L);
+ Assert.assertEquals(r2.getLong(3), 1000L);
+
+ r3.next();
+ Assert.assertEquals(r3.getLong(1), 10L);
+ Assert.assertEquals(r3.getLong(2), 10L);
+ Assert.assertEquals(r3.getLong(3), 10L);
+ }
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 2a25fef..c4f3a69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -937,5 +937,13 @@ public class ReadWriteIOUtils {
}
}
+ public static ByteBuffer clone(ByteBuffer original) {
+ ByteBuffer clone = ByteBuffer.allocate(original.remaining());
+ while(original.hasRemaining()) {
+ clone.put(original.get());
+ }
+ clone.flip();
+ return clone;
+ }
}