This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new fa61d20 [CARBONDATA-3561] Fix incorrect results after executing delete/update operations when there are null values
fa61d20 is described below
commit fa61d209c60938e6f0c0e711bf6b98fa7061e2e5
Author: Zhang Zhichao <[email protected]>
AuthorDate: Wed Nov 13 11:18:07 2019 +0800
[CARBONDATA-3561] Fix incorrect results after executing delete/update operations when there are null values
Problem:
If there are null values in the table, query results are incorrect in vector read mode after a delete/update SQL statement is executed.
Root cause:
In the 'fillVector' method of 'LocalDictDimensionDataChunkStore', when the type of 'vector' is 'ColumnarVectorWrapperDirectWithDeleteDelta', the wrapper's internal 'counter' is not incremented when the data is not null, so subsequent null values are written to the wrong index.
Solution:
Increment 'counter' of 'ColumnarVectorWrapperDirectWithDeleteDelta' when the data is not null.
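To illustrate the mechanism, here is a minimal, runnable Java sketch; the class and field names ('Wrapper', 'isNull', 'deletedRows') are illustrative stand-ins, not the actual CarbonData API. In the local-dictionary path the non-null values themselves are written through the dictionary vector, so the delete-delta wrapper only sees 'putNull' calls; unless 'putNotNull' also advances 'counter', a later null lands in a physical slot that still belongs to an earlier surviving row.

import java.util.BitSet;

// Minimal sketch (illustrative names, not the real CarbonData classes) of why
// the delete-delta wrapper's 'counter' must advance for non-null rows too.
public class DeleteDeltaCounterSketch {

  static class Wrapper {
    final BitSet deletedRows;  // logical row ids removed by the delete delta
    final boolean[] isNull;    // null flags of the wrapped output vector
    int counter = 0;           // next physical slot among surviving rows

    Wrapper(BitSet deletedRows, int survivingRows) {
      this.deletedRows = deletedRows;
      this.isNull = new boolean[survivingRows];
    }

    void putNull(int rowId) {
      if (!deletedRows.get(rowId)) {
        isNull[counter++] = true;
      }
    }

    // The fix: surviving non-null rows must advance the slot as well.
    void putNotNull(int rowId) {
      if (!deletedRows.get(rowId)) {
        counter++;
      }
    }
  }

  public static void main(String[] args) {
    BitSet deleted = new BitSet(4);
    deleted.set(1);                  // page of 4 rows; row 1 is deleted
    Wrapper vector = new Wrapper(deleted, 3);
    for (int i = 0; i < 4; i++) {
      if (i == 2) {
        vector.putNull(i);           // the only null value in this page
      } else {
        vector.putNotNull(i);        // before the fix this was a no-op, so
      }                              // 'counter' stayed at 0 for the putNull
    }
    // prints [false, true, false]; with putNotNull as a no-op it would print
    // [true, false, false], i.e. the null flag on the wrong physical row
    System.out.println(java.util.Arrays.toString(vector.isNull));
  }
}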
---
.../impl/LocalDictDimensionDataChunkStore.java | 3 ++
...ColumnarVectorWrapperDirectWithDeleteDelta.java | 7 +++++
...lumnarVectorWrapperDirectWithInvertedIndex.java | 5 ++++
.../examples/MinMaxIndexDataMapFactory.java | 8 ++---
.../testsuite/iud/DeleteCarbonTableTestCase.scala | 34 ++++++++++++++++++++++
5 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index fa67d6a..c57cc8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -81,6 +81,9 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
vector.putNull(i);
dictionaryVector.putNull(i);
} else {
+ // if vector is 'ColumnarVectorWrapperDirectWithDeleteDelta', it needs to call 'putNotNull'
+ // to increase 'counter', otherwise it will set the null value to the wrong index.
+ vector.putNotNull(i);
dictionaryVector.putInt(i, surrogate);
}
}
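For context, a hedged sketch of the loop this hunk sits in; 'pageSize', 'isNullValue' and 'getSurrogate' are hypothetical stand-ins for the real local-dictionary access, not the actual method body:

// Simplified view of the fillVector loop (names are illustrative).
for (int i = 0; i < pageSize; i++) {
  if (isNullValue(i)) {
    vector.putNull(i);             // null flag on the outer vector...
    dictionaryVector.putNull(i);   // ...and on the dictionary vector
  } else {
    vector.putNotNull(i);          // added by this fix: keeps the delete-delta
                                   // wrapper's counter aligned
    dictionaryVector.putInt(i, getSurrogate(i));
  }
}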
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
index 63cb42c..e3a488c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
@@ -159,6 +159,13 @@ class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarV
}
@Override
+ public void putNotNull(int rowId) {
+ if (!deletedRows.get(rowId)) {
+ counter++;
+ }
+ }
+
+ @Override
public void putFloats(int rowId, int count, float[] src, int srcIndex) {
for (int i = srcIndex; i < count; i++) {
if (!deletedRows.get(rowId++)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
index 14a1c43..b607c9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
@@ -101,6 +101,11 @@ public class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbon
}
@Override
+ public void putNotNull(int rowId) {
+ // nothing to do
+ }
+
+ @Override
public void putFloats(int rowId, int count, float[] src, int srcIndex) {
for (int i = srcIndex; i < count; i++) {
columnVector.putFloat(invertedIndex[rowId++], src[i]);
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 69a8c16..6cecc90 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -21,7 +21,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
@@ -42,9 +45,6 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
/**
* Min Max DataMap Factory
*/
@@ -137,7 +137,7 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
* @param segment
*/
@Override
- public void clear(Segment segment) {
+ public void clear(String segmentNo) {
}
/**
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 4565d7a..3036ef1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -24,10 +24,12 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -380,6 +382,38 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists test_return_row_count").show()
}
+ test("[CARBONDATA-3561] Fix incorrect results after execute delete/update
operation if there are null values") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+ val tableName = "fix_incorrect_results_for_iud"
+ sql(s"drop table if exists ${tableName}")
+
+ sql(s"create table ${tableName} (a string, b string, c string) stored by
'carbondata'").show()
+ sql(s"""insert into table ${tableName}
+ select '1','1','2017' union all
+ select '2','2','2017' union all
+ select '3','3','2017' union all
+ select '4','4','2017' union all
+ select '5',null,'2017' union all
+ select '6',null,'2017' union all
+ select '7','7','2017' union all
+ select '8','8','2017' union all
+ select '9',null,'2017' union all
+ select '10',null,'2017'""").show()
+
+ checkAnswer(sql(s"select count(1) from ${tableName} where b is null"),
Seq(Row(4)))
+
+ checkAnswer(sql(s"delete from ${tableName} where b ='4'"), Seq(Row(1)))
+ checkAnswer(sql(s"delete from ${tableName} where a ='9'"), Seq(Row(1)))
+ checkAnswer(sql(s"update ${tableName} set (b) = ('10') where a = '10'"),
Seq(Row(1)))
+
+ checkAnswer(sql(s"select count(1) from ${tableName} where b is null"),
Seq(Row(2)))
+ checkAnswer(sql(s"select * from ${tableName} where a = '1'"), Seq(Row("1",
"1", "2017")))
+ checkAnswer(sql(s"select * from ${tableName} where a = '10'"),
Seq(Row("10", "10", "2017")))
+
+ sql(s"drop table if exists ${tableName}").show()
+ }
+
override def afterAll {
sql("use default")
sql("drop database if exists iud_db cascade")