This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4d48017 [CARBONDATA-3931] Fix secondary index on DateType column giving wrong results
4d48017 is described below
commit 4d480178fe4258f87acb6d09101f6514fe4c5c42
Author: Indhumathi27 <[email protected]>
AuthorDate: Wed Jul 29 21:18:48 2020 +0530
[CARBONDATA-3931] Fix secondary index on DateType column giving wrong results
Why is this PR needed?
On data load to SI with a date data type column, dictionary values are read from
factToIndexDictColumnMapping instead of from the main table query results.
factToIndexDictColumnMapping holds 0 in every entry, hence only the first row's
data is loaded.
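Concretely, the fix in SecondaryIndexQueryResultProcessor (see the diff below)
replaces the constant mapping lookup with a per-row read from the query result
wrapper:

    // before: every row received a factToIndexDictColumnMapping entry, which is always 0
    preparedRow[i] = factToIndexDictColumnMapping[dictionaryIndex++];
    // after: read this row's dictionary surrogate key from the ByteArrayWrapper
    preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);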
What changes were proposed in this PR?
Get the dictionary keys from the wrapper, convert them to int, and load them to SI.
Removed factToIndexDictColumnMapping.
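As background, a minimal sketch of that conversion, assuming the 4-byte
big-endian key layout read by ByteUtil.convertBytesToInt (the class and method
names below are illustrative, not CarbonData code):

    // Minimal sketch, not CarbonData source: each dictionary surrogate key is
    // packed as 4 big-endian bytes, so key i sits at offset i * 4. This mirrors
    // what ByteArrayWrapper.getDictionaryKeyByIndex now does via
    // ByteUtil.convertBytesToInt (endianness assumed from that utility).
    public final class DictionaryKeyDecodeSketch {
      static int decodeDictionaryKey(byte[] packedKeys, int index) {
        int offset = index * 4;
        return ((packedKeys[offset] & 0xFF) << 24)
            | ((packedKeys[offset + 1] & 0xFF) << 16)
            | ((packedKeys[offset + 2] & 0xFF) << 8)
            | (packedKeys[offset + 3] & 0xFF);
      }

      public static void main(String[] args) {
        // two packed keys: 1 and 258 (0x00000102)
        byte[] packed = {0, 0, 0, 1, 0, 0, 1, 2};
        System.out.println(decodeDictionaryKey(packed, 0)); // prints 1
        System.out.println(decodeDictionaryKey(packed, 1)); // prints 258
      }
    }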
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3874
---
.../carbondata/core/scan/wrappers/ByteArrayWrapper.java | 10 ++++++++++
.../main/java/org/apache/carbondata/core/util/ByteUtil.java | 8 --------
.../testsuite/secondaryindex/TestSIWithSecondryIndex.scala | 13 +++++++++++++
.../query/SecondaryIndexQueryResultProcessor.java | 5 ++---
.../sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala | 6 +-----
.../processing/merger/CompactionResultSortProcessor.java | 9 +--------
6 files changed, 27 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 242c177..13a2723 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.wrappers;
import java.io.Serializable;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
/**
@@ -105,6 +106,15 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializable
}
/**
+ * Gets the dictionary column data at the given index.
+ * @param index ordinal of the dictionary column within the packed dictionary key
+ * @return the dictionary surrogate key as an int
+ */
+ public int getDictionaryKeyByIndex(int index) {
+ return ByteUtil.convertBytesToInt(this.getDictionaryKey(), index * 4);
+ }
+
+ /**
* to generate the hash code
*/
@Override
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 3165dde..760825d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -781,14 +781,6 @@ public final class ByteUtil {
return 4;
}
- public static int[] convertBytesToIntArray(byte[] input) {
- int[] output = new int[input.length / 4];
- for (int i = 0; i < output.length; i++) {
- output[i] = convertBytesToInt(input, i * 4);
- }
- return output;
- }
-
public static long[] convertBytesToLongArray(byte[] input) {
long[] output = new long[input.length / 4];
for (int i = 0; i < output.length; i++) {
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 9e6ce9b..53333f0 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -246,6 +246,19 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
assert(errorMessage.contains("Index [uniqdataidxtable] already exists under database [default]"))
}
+ test("test date type with SI table") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable (id int,name string,salary float,dob
date,address string) STORED AS carbondata")
+ sql("insert into maintable
values(1,'aa',23423.334,'2009-09-06','df'),(1,'aa',23423.334,'2009-09-07','df')")
+ sql("insert into maintable select 2,'bb',4454.454,'2009-09-09','bang'")
+ sql("drop index if exists index_date on maintable")
+ sql("create index index_date on table maintable(dob) AS 'carbondata'")
+ val df = sql("select id,name,dob from maintable where dob = '2009-09-07'")
+ assert(isFilterPushedDownToSI(df.queryExecution.sparkPlan))
+ checkAnswer(df, Seq(Row(1,"aa", java.sql.Date.valueOf("2009-09-07"))))
+ sql("drop table if exists maintable")
+ }
+
override def afterAll {
sql("drop index si_altercolumn on table_WithSIAndAlter")
sql("drop table if exists table_WithSIAndAlter")
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 51e36be..b200655 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -164,14 +164,13 @@ public class SecondaryIndexQueryResultProcessor {
public SecondaryIndexQueryResultProcessor(CarbonLoadModel carbonLoadModel,
int[] columnCardinality, String segmentId, CarbonTable indexTable,
- int[] factToIndexColumnMapping, int[] factToIndexDictColumnMapping) {
+ int[] factToIndexColumnMapping) {
this.carbonLoadModel = carbonLoadModel;
this.columnCardinality = columnCardinality;
this.segmentId = segmentId;
this.indexTable = indexTable;
this.databaseName = carbonLoadModel.getDatabaseName();
this.factToIndexColumnMapping = factToIndexColumnMapping;
- this.factToIndexDictColumnMapping = factToIndexDictColumnMapping;
initSegmentProperties();
}
@@ -259,7 +258,7 @@ public class SecondaryIndexQueryResultProcessor {
CarbonDimension dims = dimensions.get(i);
if (dims.hasEncoding(Encoding.DICTIONARY)) {
// dictionary
- preparedRow[i] = factToIndexDictColumnMapping[dictionaryIndex++];
+ preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
} else {
// no dictionary dims
byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
index 66e47aa..9196c9e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
@@ -86,9 +86,6 @@ class CarbonSecondaryIndexRDD[K, V](
val factToIndexColumnMapping: Array[Int] = SecondaryIndexUtil
.prepareColumnMappingOfFactToIndexTable(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
indexTable, isDictColsAlone = false)
- val factToIndexDictColumnMapping: Array[Int] = SecondaryIndexUtil
- .prepareColumnMappingOfFactToIndexTable(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
- indexTable, isDictColsAlone = true)
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -148,8 +145,7 @@ class CarbonSecondaryIndexRDD[K, V](
columnCardinality,
segmentId,
indexCarbonTable,
- factToIndexColumnMapping,
- factToIndexDictColumnMapping)
+ factToIndexColumnMapping)
context.addTaskCompletionListener { context =>
if (null != secondaryIndexQueryResultProcessor) {
secondaryIndexQueryResultProcessor.close()
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 368f5d0..8c43631 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -311,12 +310,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
private Object[] prepareRowObjectForSorting(Object[] row) {
ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
Object[] preparedRow = new Object[dimensions.size() + measureCount];
- byte[] dictionaryKey = wrapper.getDictionaryKey();
- int[] keyArray = ByteUtil.convertBytesToIntArray(dictionaryKey);
- Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount];
- for (int i = 0; i < keyArray.length; i++) {
- dictionaryValues[i] = keyArray[i];
- }
int noDictionaryIndex = 0;
int dictionaryIndex = 0;
int complexIndex = 0;
@@ -325,7 +318,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
CarbonDimension dims = dimensions.get(i);
if (dims.getDataType() == DataTypes.DATE && !dims.isComplex()) {
// dictionary
- preparedRow[i] = dictionaryValues[dictionaryIndex++];
+ preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
} else if (!dims.isComplex()) {
// no dictionary dims
byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);