This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b159b95 [CARBONDATA-3542] Support Map data type reading through Hive
b159b95 is described below
commit b159b9541d6380de8b8f5adc37454ca0f775c485
Author: dhatchayani <[email protected]>
AuthorDate: Sat Oct 5 22:28:15 2019 +0530
[CARBONDATA-3542] Support Map data type reading through Hive
Problem:
Map data type is not supported when reading CarbonData tables through Hive.
Solution:
Handle the MAP data type in the Hive read support and object inspectors.
This closes #3407
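
For reference, reading such a map column over Hive JDBC looks roughly like
this (a minimal sketch, not part of the commit; the JDBC URL is an
assumption, and the table name matches the example added below):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ReadMapColumn {
      public static void main(String[] args) throws Exception {
        // Assumed HiveServer2 URL; adjust host/port for your deployment.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name FROM complexMap")) {
          while (rs.next()) {
            // The map column arrives as its string form through JDBC,
            // as in the getString("name") call in HiveExample below.
            System.out.println(rs.getString("name"));
          }
        }
      }
    }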
---
docs/hive-guide.md | 1 -
.../apache/carbondata/examples/HiveExample.scala | 38 +++++
.../hive/CarbonDictionaryDecodeReadSupport.java | 46 ++++++
.../apache/carbondata/hive/CarbonMapInspector.java | 179 +++++++++++++++++++++
.../carbondata/hive/CarbonObjectInspector.java | 6 +
5 files changed, 269 insertions(+), 1 deletion(-)
diff --git a/docs/hive-guide.md b/docs/hive-guide.md
index e839b9b..7aba3bf 100644
--- a/docs/hive-guide.md
+++ b/docs/hive-guide.md
@@ -116,6 +116,5 @@ select * from hive_carbon_1 order by id;
### Note
- Partition table support is not handled
- - Map data type is not supported
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
index 6639e8e..241ee8c 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
@@ -94,6 +94,15 @@ object HiveExample {
"'QUOTECHAR'='\"',
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='c1_int,c2_Bigint," +
"c3_Decimal,c4_double,c5_string,c6_Timestamp,c7_Datatype_Desc')")
+ carbonSession.sql("""DROP TABLE IF EXISTS complexMap""".stripMargin)
+
+ carbonSession.sql("create table complexMap(name map<string,string>) stored
by 'carbondata'")
+
+ carbonSession
+ .sql(
+ "insert into complexMap
values(map('Manish','Nalla','Shardul','Singh','Vishal','Kumar'," +
+ "'EmptyVal','','NullVal', 'null'))")
+
carbonSession.close()
// delete the already existing lock on metastore so that new derby instance
@@ -266,6 +275,35 @@ object HiveExample {
s"$resultAggQueryFetched")
assert(resultAggQueryFetched == 1)
+    val resultComplexQuery = statement
+      .executeQuery(
+        "SELECT name FROM complexMap")
+
+    var resultComplex = 0
+
+    var name = ""
+
+    while (resultComplexQuery.next) {
+      if (resultComplex == 0) {
+        println("+------------------------------------------------------------------------------" +
+          "------+")
+        println("| name                                                                          " +
+          "     |")
+        println("+-------------------------------------------------------------------------------" +
+          "-----+")
+
+        name = resultComplexQuery.getString("name")
+
+        println(s"|$name|")
+
+        println("+-------------------------------------------------------------------------------" +
+          "-----+")
+      }
+      resultComplex = resultComplex + 1
+    }
+    println(" ********** Total Rows Fetched When Complex Query **********" +
+      s"$resultComplex")
+    assert(resultComplex == 1)
+
hiveEmbeddedServer2.stop()
}
}
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
index 52ece32..1bb7088 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -143,6 +143,8 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
return createStruct(obj, carbonColumn);
} else if (DataTypes.isArrayType(dataType)) {
return createArray(obj, carbonColumn);
+ } else if (DataTypes.isMapType(dataType)) {
+ return createMap(obj, carbonColumn);
} else {
return createWritablePrimitive(obj, carbonColumn);
}
@@ -207,6 +209,48 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
}
/**
+   * Create the Map data for the MAP data type.
+   *
+   * @param obj          raw map data, an Object[] holding the key array and the value array
+   * @param carbonColumn the map column whose child dimensions describe the key and value
+   * @return the map entries wrapped as a nested ArrayWritable, or null if there are none
+   * @throws IOException if a key or value cannot be converted to a Writable
+   */
+  private ArrayWritable createMap(Object obj, CarbonColumn carbonColumn) throws IOException {
+ Object[] objArray = (Object[]) obj;
+ List<CarbonDimension> childCarbonDimensions = null;
+ CarbonDimension mapDimension = null;
+ List<ArrayWritable> writablesList = new ArrayList<>();
+    if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+      childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ // get the map dimension wrapped inside the carbon dimension
+ mapDimension = childCarbonDimensions.get(0);
+      // get the child dimensions of the map dimension; the children are Key and Value
+ if (null != mapDimension) {
+ childCarbonDimensions = mapDimension.getListOfChildDimensions();
+ }
+ }
+ if (null != childCarbonDimensions && childCarbonDimensions.size() == 2) {
+ Object[] keyObjects = (Object[]) objArray[0];
+ Object[] valObjects = (Object[]) objArray[1];
+ for (int i = 0; i < keyObjects.length; i++) {
+        Writable keyWritable = createWritableObject(keyObjects[i], childCarbonDimensions.get(0));
+        Writable valWritable = createWritableObject(valObjects[i], childCarbonDimensions.get(1));
+ Writable[] arr = new Writable[2];
+ arr[0] = keyWritable;
+ arr[1] = valWritable;
+ writablesList.add(new ArrayWritable(Writable.class, arr));
+ }
+ if (writablesList.size() > 0) {
+ final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
+ writablesList.toArray(new ArrayWritable[writablesList.size()]));
+ return new ArrayWritable(Writable.class, new Writable[] {subArray});
+ }
+ }
+ return null;
+ }
+
+ /**
* This method will create the Writable Objects for primitives.
*
* @param obj
@@ -256,6 +300,8 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
return createArray(obj, carbonColumn);
} else if (DataTypes.isStructType(dataType)) {
return createStruct(obj, carbonColumn);
+ } else if (DataTypes.isMapType(dataType)) {
+ return createMap(obj, carbonColumn);
} else if (DataTypes.isDecimal(dataType)) {
      return new HiveDecimalWritable(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
} else {
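
For readers tracing createMap() above: each map entry becomes a two-element
ArrayWritable of [key, value], all entries are collected into one inner
ArrayWritable, and that is wrapped once more for the column. A minimal
sketch of the same nesting (illustrative only, values taken from the
HiveExample insert above):

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class MapWritableShape {
      public static void main(String[] args) {
        // Innermost level: one [key, value] pair per map entry.
        ArrayWritable pair = new ArrayWritable(Writable.class,
            new Writable[] { new Text("Manish"), new Text("Nalla") });
        // Middle level: all pairs of the map.
        ArrayWritable pairs = new ArrayWritable(ArrayWritable.class,
            new ArrayWritable[] { pair });
        // Outermost level: the single wrapper createMap() returns.
        ArrayWritable mapColumn = new ArrayWritable(Writable.class,
            new Writable[] { pairs });
        System.out.println(mapColumn.get().length); // 1: the pairs wrapper
      }
    }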
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java
new file mode 100644
index 0000000..b17ac43
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hive;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+
+public class CarbonMapInspector implements SettableMapObjectInspector {
+
+ protected final ObjectInspector keyInspector;
+ protected final ObjectInspector valueInspector;
+
+ public CarbonMapInspector(final ObjectInspector keyInspector,
+ final ObjectInspector valueInspector) {
+ this.keyInspector = keyInspector;
+ this.valueInspector = valueInspector;
+ }
+
+ @Override public String getTypeName() {
+ return "map<" + keyInspector.getTypeName() + "," +
valueInspector.getTypeName() + ">";
+ }
+
+ @Override public Category getCategory() {
+ return Category.MAP;
+ }
+
+ @Override
+ public ObjectInspector getMapKeyObjectInspector() {
+ return keyInspector;
+ }
+
+ @Override
+ public ObjectInspector getMapValueObjectInspector() {
+ return valueInspector;
+ }
+
+ @Override public Object getMapValueElement(Object data, Object key) {
+ if (data != null && key != null) {
+ Map<?, ?> map = (Map)data;
+ return map.get(key);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Map<?, ?> getMap(final Object data) {
+ if (data == null) {
+ return null;
+ }
+ if (data instanceof ArrayWritable) {
+ final Writable[] mapArray = ((ArrayWritable) data).get();
+ if (mapArray == null) {
+ return null;
+ }
+
+ final Map<Writable, Writable> map = new LinkedHashMap<>();
+ for (final Writable obj : mapArray) {
+ final ArrayWritable mapObj = (ArrayWritable) obj;
+ final Writable[] arr = mapObj.get();
+ for (int i = 0; i < arr.length; i++) {
+          map.put(((ArrayWritable) arr[i]).get()[0], ((ArrayWritable) arr[i]).get()[1]);
+ }
+ }
+ return map;
+ }
+ if (data instanceof Map) {
+ return (Map) data;
+ }
+ throw new UnsupportedOperationException("Cannot inspect " +
data.getClass().getCanonicalName());
+ }
+
+ @Override
+ public int getMapSize(final Object data) {
+ if (data == null) {
+ return -1;
+ }
+ if (data instanceof ArrayWritable) {
+ final Writable[] mapArray = ((ArrayWritable) data).get();
+
+ if (mapArray == null) {
+ return -1;
+ } else {
+ return mapArray.length;
+ }
+ }
+ if (data instanceof Map) {
+ return ((Map) data).size();
+ }
+ throw new UnsupportedOperationException("Cannot inspect " +
data.getClass().getCanonicalName());
+ }
+
+ @Override
+ public Object create() {
+ Map<Object, Object> m = new LinkedHashMap<>();
+ return m;
+ }
+
+ @Override
+ public Object put(Object map, Object key, Object value) {
+ Map<Object, Object> m = (Map<Object, Object>) map;
+ m.put(key, value);
+ return m;
+ }
+
+ @Override
+ public Object remove(Object map, Object key) {
+ Map<Object, Object> m = (Map<Object, Object>) map;
+ m.remove(key);
+ return m;
+ }
+
+ @Override
+ public Object clear(Object map) {
+ Map<Object, Object> m = (Map<Object, Object>) map;
+ m.clear();
+ return m;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((keyInspector == null) ? 0 : keyInspector.hashCode());
+ result = prime * result
+ + ((valueInspector == null) ? 0 : valueInspector.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final CarbonMapInspector other = (CarbonMapInspector) obj;
+ if (keyInspector == null) {
+ if (other.keyInspector != null) {
+ return false;
+ }
+ } else if (!keyInspector.equals(other.keyInspector)) {
+ return false;
+ }
+ if (valueInspector == null) {
+ if (other.valueInspector != null) {
+ return false;
+ }
+ } else if (!valueInspector.equals(other.valueInspector)) {
+ return false;
+ }
+ return true;
+ }
+}
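
A quick usage sketch of the new inspector against that Writable layout
(illustrative only; the key and value inspectors come from Hive's standard
factory, and CarbonMapInspector is assumed to be on the classpath):

    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class CarbonMapInspectorDemo {
      public static void main(String[] args) {
        // Same nesting createMap() produces for map('Manish','Nalla').
        ArrayWritable pair = new ArrayWritable(Writable.class,
            new Writable[] { new Text("Manish"), new Text("Nalla") });
        ArrayWritable pairs = new ArrayWritable(ArrayWritable.class,
            new ArrayWritable[] { pair });
        ArrayWritable mapColumn = new ArrayWritable(Writable.class,
            new Writable[] { pairs });

        CarbonMapInspector inspector = new CarbonMapInspector(
            PrimitiveObjectInspectorFactory.writableStringObjectInspector,
            PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        System.out.println(inspector.getMap(mapColumn));  // {Manish=Nalla}
        System.out.println(inspector.getTypeName());      // map<string,string>
      }
    }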
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
index 75c7056..9718405 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -88,6 +89,11 @@ class CarbonObjectInspector extends SettableStructObjectInspector {
       return new WritableHiveVarcharObjectInspector((VarcharTypeInfo) typeInfo);
} else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) {
return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ } else if (typeInfo instanceof MapTypeInfo) {
+ MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+      ObjectInspector mapKeyObjectIns = getObjectInspector(mapTypeInfo.getMapKeyTypeInfo());
+      ObjectInspector mapValObjectIns = getObjectInspector(mapTypeInfo.getMapValueTypeInfo());
+ return new CarbonMapInspector(mapKeyObjectIns, mapValObjectIns);
} else {
throw new UnsupportedOperationException("Unknown field type: " +
typeInfo);
}
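
For context, the new branch keys off Hive's MapTypeInfo. A standalone
sketch of the same dispatch (the helper name toInspector is hypothetical;
the class's own private getObjectInspector is what the commit actually
extends, and only the map and primitive branches are shown here):

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    class MapInspectorDispatch {
      static ObjectInspector toInspector(TypeInfo typeInfo) {
        if (typeInfo instanceof MapTypeInfo) {
          MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
          // Recurse for key and value, then wrap in the new map inspector.
          return new CarbonMapInspector(
              toInspector(mapTypeInfo.getMapKeyTypeInfo()),
              toInspector(mapTypeInfo.getMapValueTypeInfo()));
        } else if (typeInfo instanceof PrimitiveTypeInfo) {
          return PrimitiveObjectInspectorFactory
              .getPrimitiveWritableObjectInspector((PrimitiveTypeInfo) typeInfo);
        }
        throw new UnsupportedOperationException("Unknown field type: " + typeInfo);
      }

      public static void main(String[] args) {
        TypeInfo t = TypeInfoUtils.getTypeInfoFromTypeString("map<string,string>");
        System.out.println(toInspector(t).getTypeName()); // map<string,string>
      }
    }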