Repository: carbondata Updated Branches: refs/heads/branch-1.5 ef1068cad -> 3c7b33992 (forced update)
[CARBONDATA-2969] local dictionary query fix for spark-2.3 This closes #2761 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ab2254b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ab2254b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ab2254b Branch: refs/heads/branch-1.5 Commit: 2ab2254be84f82fd2f4b99a6b73353f4c7a55d10 Parents: f239894 Author: akashrn5 <[email protected]> Authored: Tue Sep 25 20:43:06 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Wed Sep 26 15:01:38 2018 +0800 ---------------------------------------------------------------------- .../LocalDictionarySupportLoadTableTest.scala | 14 +++++ .../vectorreader/CarbonDictionaryWrapper.java | 44 --------------- .../vectorreader/ColumnarVectorWrapper.java | 11 +--- .../spark/sql/CarbonDictionaryWrapper.java | 44 +++++++++++++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 10 ++-- .../spark/sql/CarbonDictionaryWrapper.java | 56 ++++++++++++++++++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 8 +-- 7 files changed, 127 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala index e88d8a9..d23c844 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala @@ -136,6 +136,20 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA assert(checkForLocalDictionary(getDimRawChunk(2))) } + test("test local dictionary data validation") { + sql("drop table if exists local_query_enable") + sql("drop table if exists local_query_disable") + sql( + "CREATE TABLE local_query_enable(name string) STORED BY 'carbondata' tblproperties" + + "('local_dictionary_enable'='false','local_dictionary_include'='name')") + sql("load data inpath '" + file1 + "' into table local_query_enable OPTIONS('header'='false')") + sql( + "CREATE TABLE local_query_disable(name string) STORED BY 'carbondata' tblproperties" + + "('local_dictionary_enable'='true','local_dictionary_include'='name')") + sql("load data inpath '" + file1 + "' into table local_query_disable OPTIONS('header'='false')") + checkAnswer(sql("select name from local_query_enable"), sql("select name from local_query_disable")) + } + test("test to validate local dictionary values"){ sql("drop table if exists local2") sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java deleted file mode 100644 index 7f1e577..0000000 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.vectorreader; - -import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; - -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.io.api.Binary; - -public class CarbonDictionaryWrapper extends Dictionary { - - private Binary[] binaries; - - CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) { - super(encoding); - binaries = new Binary[dictionary.getDictionarySize()]; - for (int i = 0; i < binaries.length; i++) { - binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i)); - } - } - - @Override public int getMaxId() { - return binaries.length - 1; - } - - @Override public Binary decodeToBinary(int id) { - return binaries[id]; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java 
b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index a0938da..6acf31f 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -269,16 +269,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { } @Override public void setDictionary(CarbonDictionary dictionary) { - if (dictionary == null) { - sparkColumnVectorProxy.setDictionary(null, ordinal); - } else { - sparkColumnVectorProxy - .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary),ordinal); - } - } - - private void setDictionaryType(boolean type) { - this.isDictionary = type; + sparkColumnVectorProxy.setDictionary(dictionary, ordinal); } @Override public boolean hasDictionary() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java new file mode 100644 index 0000000..b7c6741 --- /dev/null +++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.io.api.Binary; + +public class CarbonDictionaryWrapper extends Dictionary { + + private Binary[] binaries; + + CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) { + super(encoding); + binaries = new Binary[dictionary.getDictionarySize()]; + for (int i = 0; i < binaries.length; i++) { + binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i)); + } + } + + @Override public int getMaxId() { + return binaries.length - 1; + } + + @Override public Binary decodeToBinary(int id) { + return binaries[id]; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java index f39bc93..80e6dbd 100644 --- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java +++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java @@ -18,7 +18,10 @@ package org.apache.spark.sql; import java.math.BigInteger; +import 
org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnarBatch; @@ -82,9 +85,10 @@ public class CarbonVectorProxy { return columnarBatch.capacity(); } - public void setDictionary(Object dictionary, int ordinal) { - if (dictionary instanceof Dictionary) { - columnarBatch.column(ordinal).setDictionary((Dictionary) dictionary); + public void setDictionary(CarbonDictionary dictionary, int ordinal) { + if (null != dictionary) { + columnarBatch.column(ordinal) + .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary)); } else { columnarBatch.column(ordinal).setDictionary(null); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java new file mode 100644 index 0000000..5a99c68 --- /dev/null +++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + +import org.apache.spark.sql.execution.vectorized.Dictionary; + +public class CarbonDictionaryWrapper implements Dictionary { + + /** + * dictionary values + */ + private byte[][] binaries; + + CarbonDictionaryWrapper(CarbonDictionary dictionary) { + binaries = new byte[dictionary.getDictionarySize()][]; + for (int i = 0; i < binaries.length; i++) { + binaries[i] = dictionary.getDictionaryValue(i); + } + } + + @Override public int decodeToInt(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support int"); + } + + @Override public long decodeToLong(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support long"); + } + + @Override public float decodeToFloat(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support float"); + } + + @Override public double decodeToDouble(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support double"); + } + + @Override public byte[] decodeToBinary(int id) { + return binaries[id]; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java 
b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java index 0f23294..4a0fb9e 100644 --- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java +++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java @@ -18,6 +18,8 @@ package org.apache.spark.sql; import java.math.BigInteger; +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.Dictionary; @@ -262,9 +264,9 @@ public class CarbonVectorProxy { return columnVectors[ordinal].hasDictionary(); } - public void setDictionary(Object dictionary, int ordinal) { - if (dictionary instanceof Dictionary) { - columnVectors[ordinal].setDictionary((Dictionary) dictionary); + public void setDictionary(CarbonDictionary dictionary, int ordinal) { + if (null != dictionary) { + columnVectors[ordinal].setDictionary(new CarbonDictionaryWrapper(dictionary)); } else { columnVectors[ordinal].setDictionary(null); }
