This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a9462c2 HIVE-25553: Support Map data-type natively in Arrow format
(Sruthi M, reviewed by Sankar Hariappan)
a9462c2 is described below
commit a9462c2f0c8f23bcb0de2564a7f75772feb30972
Author: Sruthi Mooriyathvariam <[email protected]>
AuthorDate: Mon Oct 25 19:44:51 2021 +0530
HIVE-25553: Support Map data-type natively in Arrow format (Sruthi M,
reviewed by Sankar Hariappan)
This covers the following sub-tasks:
HIVE-25554: Upgrade arrow version to 0.15
HIVE-25555: ArrowColumnarBatchSerDe should store map natively instead of
converting to list
a. Upgrading arrow to version 0.15.1 (where the map data-type is
supported)
b. Modifying ArrowColumnarBatchSerDe and corresponding
Serializer/Deserializer to not use list as a workaround for map and use the
arrow map data-type instead
c. Taking care of creating non-nullable struct and non-nullable key type
for the map data-type in ArrowColumnarBatchSerDe
Signed-off-by: Sankar Hariappan <[email protected]>
Closes #2689
---
data/files/datatypes.txt | 4 +-
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 16 +++--
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 83 ++++++++++++----------
.../hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java | 83 ++++++++++++----------
pom.xml | 2 +-
.../hive/llap/WritableByteChannelAdapter.java | 2 +-
.../hive/ql/io/arrow/ArrowColumnarBatchSerDe.java | 17 ++---
.../apache/hadoop/hive/ql/io/arrow/Serializer.java | 42 +++++++----
8 files changed, 142 insertions(+), 107 deletions(-)
diff --git a/data/files/datatypes.txt b/data/files/datatypes.txt
index 0872a1f..38f8d29 100644
--- a/data/files/datatypes.txt
+++ b/data/files/datatypes.txt
@@ -1,3 +1,3 @@
\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
-1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22
09:00:00.123456789123456789.123456YWJjZA==2013-01-01abc123abc123X'01FF'
+-1false-1.11\Nab\N\N\N-1-1-1.0-1110100\N\N\N\N\N\N\N\N\N
+1true1.11121x2ykvbca92.2111.01abcd1111213142212212x1abcd22012-04-22
09:00:00.123456789123456789.123456YWJjZA==2013-01-01abc123abc123X'01FF'
\ No newline at end of file
diff --git
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 20682ff..2ec3d48 100644
---
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -499,10 +499,12 @@ public abstract class BaseJdbcWithMiniLlap {
assertEquals(0, c5Value.size());
Map<?,?> c6Value = (Map<?,?>) rowValues[5];
- assertEquals(0, c6Value.size());
+ assertEquals(1, c6Value.size());
+ assertEquals(null, c6Value.get(1));
Map<?,?> c7Value = (Map<?,?>) rowValues[6];
- assertEquals(0, c7Value.size());
+ assertEquals(1, c7Value.size());
+ assertEquals("b", c7Value.get("a"));
List<?> c8Value = (List<?>) rowValues[7];
assertEquals(null, c8Value.get(0));
@@ -518,7 +520,10 @@ public abstract class BaseJdbcWithMiniLlap {
assertEquals(0, c13Value.size());
Map<?,?> c14Value = (Map<?,?>) rowValues[13];
- assertEquals(0, c14Value.size());
+ assertEquals(1, c14Value.size());
+ Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(1, mapVal.size());
+ assertEquals(100, mapVal.get(Integer.valueOf(10)));
List<?> c15Value = (List<?>) rowValues[14];
assertEquals(null, c15Value.get(0));
@@ -553,8 +558,9 @@ public abstract class BaseJdbcWithMiniLlap {
assertEquals("y", c6Value.get(Integer.valueOf(2)));
c7Value = (Map<?,?>) rowValues[6];
- assertEquals(1, c7Value.size());
+ assertEquals(2, c7Value.size());
assertEquals("v", c7Value.get("k"));
+ assertEquals("c", c7Value.get("b"));
c8Value = (List<?>) rowValues[7];
assertEquals("a", c8Value.get(0));
@@ -577,7 +583,7 @@ public abstract class BaseJdbcWithMiniLlap {
c14Value = (Map<?,?>) rowValues[13];
assertEquals(2, c14Value.size());
- Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
assertEquals(2, mapVal.size());
assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
diff --git
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 9d0ff2d..209c42b 100644
---
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.type.Timestamp;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
@@ -123,8 +124,6 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
return new LlapArrowRowInputFormat(Long.MAX_VALUE);
}
- // Currently MAP type is not supported. Add it back when Arrow 1.0 is
released.
- // See: SPARK-21187
@Test
@Override
public void testDataTypes() throws Exception {
@@ -184,11 +183,13 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
List<?> c5Value = (List<?>) rowValues[4];
assertEquals(0, c5Value.size());
- //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
- //assertEquals(0, c6Value.size());
+ Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(1, c6Value.size());
+ assertEquals(null, c6Value.get(1));
- //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
- //assertEquals(0, c7Value.size());
+ Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(1, c7Value.size());
+ assertEquals("b", c7Value.get("a"));
List<?> c8Value = (List<?>) rowValues[7];
assertEquals(null, c8Value.get(0));
@@ -203,15 +204,18 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
List<?> c13Value = (List<?>) rowValues[12];
assertEquals(0, c13Value.size());
- //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
- //assertEquals(0, c14Value.size());
+ Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(1, c14Value.size());
+ Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(1, mapVal.size());
+ assertEquals(100, mapVal.get(Integer.valueOf(10)));
List<?> c15Value = (List<?>) rowValues[14];
assertEquals(null, c15Value.get(0));
assertEquals(null, c15Value.get(1));
- //List<?> c16Value = (List<?>) rowValues[15];
- //assertEquals(0, c16Value.size());
+ List<?> c16Value = (List<?>) rowValues[15];
+ assertEquals(0, c16Value.size());
assertEquals(null, rowValues[16]);
assertEquals(null, rowValues[17]);
@@ -233,14 +237,15 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
assertEquals(Integer.valueOf(1), c5Value.get(0));
assertEquals(Integer.valueOf(2), c5Value.get(1));
- //c6Value = (Map<?,?>) rowValues[5];
- //assertEquals(2, c6Value.size());
- //assertEquals("x", c6Value.get(Integer.valueOf(1)));
- //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+ c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(2, c6Value.size());
+ assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ assertEquals("y", c6Value.get(Integer.valueOf(2)));
- //c7Value = (Map<?,?>) rowValues[6];
- //assertEquals(1, c7Value.size());
- //assertEquals("v", c7Value.get("k"));
+ c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(2, c7Value.size());
+ assertEquals("v", c7Value.get("k"));
+ assertEquals("c", c7Value.get("b"));
c8Value = (List<?>) rowValues[7];
assertEquals("a", c8Value.get(0));
@@ -261,15 +266,15 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
assertEquals("c", listVal.get(0));
assertEquals("d", listVal.get(1));
- //c14Value = (Map<?,?>) rowValues[13];
- //assertEquals(2, c14Value.size());
- //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
- //assertEquals(2, mapVal.size());
- //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
- //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
- //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
- //assertEquals(1, mapVal.size());
- //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+ c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(2, c14Value.size());
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(2, mapVal.size());
+ assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ assertEquals(1, mapVal.size());
+ assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
c15Value = (List<?>) rowValues[14];
assertEquals(Integer.valueOf(1), c15Value.get(0));
@@ -278,19 +283,19 @@ public class TestJdbcWithMiniLlapArrow extends
BaseJdbcWithMiniLlap {
assertEquals(Integer.valueOf(2), listVal.get(0));
assertEquals("x", listVal.get(1));
- //c16Value = (List<?>) rowValues[15];
- //assertEquals(2, c16Value.size());
- //listVal = (List<?>) c16Value.get(0);
- //assertEquals(2, listVal.size());
- //mapVal = (Map<?,?>) listVal.get(0);
- //assertEquals(0, mapVal.size());
- //assertEquals(Integer.valueOf(1), listVal.get(1));
- //listVal = (List<?>) c16Value.get(1);
- //mapVal = (Map<?,?>) listVal.get(0);
- //assertEquals(2, mapVal.size());
- //assertEquals("b", mapVal.get("a"));
- //assertEquals("d", mapVal.get("c"));
- //assertEquals(Integer.valueOf(2), listVal.get(1));
+ c16Value = (List<?>) rowValues[15];
+ assertEquals(2, c16Value.size());
+ listVal = (List<?>) c16Value.get(0);
+ assertEquals(2, listVal.size());
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(0, mapVal.size());
+ assertEquals(Integer.valueOf(1), listVal.get(1));
+ listVal = (List<?>) c16Value.get(1);
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(2, mapVal.size());
+ assertEquals("b", mapVal.get("a"));
+ assertEquals("d", mapVal.get("c"));
+ assertEquals(Integer.valueOf(2), listVal.get(1));
assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"),
rowValues[16]);
assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 9025703..f7cb406 100644
---
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
+++
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.common.type.Timestamp;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
@@ -64,8 +65,6 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
return new LlapArrowRowInputFormat(Long.MAX_VALUE);
}
- // Currently MAP type is not supported. Add it back when Arrow 1.0 is
released.
- // See: SPARK-21187
@Override
public void testDataTypes() throws Exception {
createDataTypesTable("datatypes");
@@ -124,11 +123,13 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
List<?> c5Value = (List<?>) rowValues[4];
assertEquals(0, c5Value.size());
- //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
- //assertEquals(0, c6Value.size());
+ Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(1, c6Value.size());
+ assertEquals(null, c6Value.get(1));
- //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
- //assertEquals(0, c7Value.size());
+ Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(1, c7Value.size());
+ assertEquals("b", c7Value.get("a"));
List<?> c8Value = (List<?>) rowValues[7];
assertEquals(null, c8Value.get(0));
@@ -143,15 +144,18 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
List<?> c13Value = (List<?>) rowValues[12];
assertEquals(0, c13Value.size());
- //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
- //assertEquals(0, c14Value.size());
+ Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(1, c14Value.size());
+ Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(1, mapVal.size());
+ assertEquals(100, mapVal.get(Integer.valueOf(10)));
List<?> c15Value = (List<?>) rowValues[14];
assertEquals(null, c15Value.get(0));
assertEquals(null, c15Value.get(1));
- //List<?> c16Value = (List<?>) rowValues[15];
- //assertEquals(0, c16Value.size());
+ List<?> c16Value = (List<?>) rowValues[15];
+ assertEquals(0, c16Value.size());
assertEquals(null, rowValues[16]);
assertEquals(null, rowValues[17]);
@@ -173,14 +177,15 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
assertEquals(Integer.valueOf(1), c5Value.get(0));
assertEquals(Integer.valueOf(2), c5Value.get(1));
- //c6Value = (Map<?,?>) rowValues[5];
- //assertEquals(2, c6Value.size());
- //assertEquals("x", c6Value.get(Integer.valueOf(1)));
- //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+ c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(2, c6Value.size());
+ assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ assertEquals("y", c6Value.get(Integer.valueOf(2)));
- //c7Value = (Map<?,?>) rowValues[6];
- //assertEquals(1, c7Value.size());
- //assertEquals("v", c7Value.get("k"));
+ c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(2, c7Value.size());
+ assertEquals("v", c7Value.get("k"));
+ assertEquals("c", c7Value.get("b"));
c8Value = (List<?>) rowValues[7];
assertEquals("a", c8Value.get(0));
@@ -201,15 +206,15 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
assertEquals("c", listVal.get(0));
assertEquals("d", listVal.get(1));
- //c14Value = (Map<?,?>) rowValues[13];
- //assertEquals(2, c14Value.size());
- //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
- //assertEquals(2, mapVal.size());
- //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
- //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
- //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
- //assertEquals(1, mapVal.size());
- //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+ c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(2, c14Value.size());
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(2, mapVal.size());
+ assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ assertEquals(1, mapVal.size());
+ assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
c15Value = (List<?>) rowValues[14];
assertEquals(Integer.valueOf(1), c15Value.get(0));
@@ -218,19 +223,19 @@ public class TestJdbcWithMiniLlapVectorArrow extends
BaseJdbcWithMiniLlap {
assertEquals(Integer.valueOf(2), listVal.get(0));
assertEquals("x", listVal.get(1));
- //c16Value = (List<?>) rowValues[15];
- //assertEquals(2, c16Value.size());
- //listVal = (List<?>) c16Value.get(0);
- //assertEquals(2, listVal.size());
- //mapVal = (Map<?,?>) listVal.get(0);
- //assertEquals(0, mapVal.size());
- //assertEquals(Integer.valueOf(1), listVal.get(1));
- //listVal = (List<?>) c16Value.get(1);
- //mapVal = (Map<?,?>) listVal.get(0);
- //assertEquals(2, mapVal.size());
- //assertEquals("b", mapVal.get("a"));
- //assertEquals("d", mapVal.get("c"));
- //assertEquals(Integer.valueOf(2), listVal.get(1));
+ c16Value = (List<?>) rowValues[15];
+ assertEquals(2, c16Value.size());
+ listVal = (List<?>) c16Value.get(0);
+ assertEquals(2, listVal.size());
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(0, mapVal.size());
+ assertEquals(Integer.valueOf(1), listVal.get(1));
+ listVal = (List<?>) c16Value.get(1);
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(2, mapVal.size());
+ assertEquals("b", mapVal.get("a"));
+ assertEquals("d", mapVal.get("c"));
+ assertEquals(Integer.valueOf(2), listVal.get(1));
assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"),
rowValues[16]);
assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
diff --git a/pom.xml b/pom.xml
index 47c5d53..23d24c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
<antlr.version>3.5.2</antlr.version>
<apache-directory-server.version>1.5.7</apache-directory-server.version>
<!-- Include arrow for LlapOutputFormatService -->
- <arrow.version>0.11.0</arrow.version>
+ <arrow.version>0.15.1</arrow.version>
<avatica.version>1.12.0</avatica.version>
<avro.version>1.8.2</avro.version>
<bcprov-jdk15on.version>1.64</bcprov-jdk15on.version>
diff --git
a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index b07ce5b..0bb06f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -93,7 +93,7 @@ public class WritableByteChannelAdapter implements
WritableByteChannel {
int size = src.remaining();
//Down the semaphore or block until available
takeWriteResources(1);
- ByteBuf buf = allocator.buffer(size);
+ ByteBuf buf = allocator.getAsByteBufAllocator().buffer(size);
buf.writeBytes(src);
chc.writeAndFlush(buf).addListener(writeListener);
return size;
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index 2ecf4d9..fdef3b8 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -184,14 +184,14 @@ public class ArrowColumnarBatchSerDe extends
AbstractSerDe {
final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
final TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
final TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
- final StructTypeInfo mapStructTypeInfo = new StructTypeInfo();
- mapStructTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys",
"values"));
- mapStructTypeInfo.setAllStructFieldTypeInfos(
- Lists.newArrayList(keyTypeInfo, valueTypeInfo));
- final ListTypeInfo mapListStructTypeInfo = new ListTypeInfo();
- mapListStructTypeInfo.setListElementTypeInfo(mapStructTypeInfo);
- return toField(name, mapListStructTypeInfo);
+ final List<Field> mapFields = Lists.newArrayList();
+ mapFields.add(toField(name+"_keys", keyTypeInfo));
+ mapFields.add(toField(name+"_values", valueTypeInfo));
+
+ FieldType struct = new FieldType(false, new ArrowType.Struct(), null);
+ List<Field> childrenOfList = Lists.newArrayList(new Field(name,
struct, mapFields));
+ return new Field(name, FieldType.nullable(MinorType.LIST.getType()),
childrenOfList);
default:
throw new IllegalArgumentException();
}
@@ -199,7 +199,7 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
static ListTypeInfo toStructListTypeInfo(MapTypeInfo mapTypeInfo) {
final StructTypeInfo structTypeInfo = new StructTypeInfo();
- structTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys",
"values"));
+ structTypeInfo.setAllStructFieldNames(Lists.newArrayList("key", "value"));
structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
final ListTypeInfo structListTypeInfo = new ListTypeInfo();
@@ -217,6 +217,7 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
structListVector.childCount = mapVector.childCount;
structListVector.isRepeating = mapVector.isRepeating;
structListVector.noNulls = mapVector.noNulls;
+ System.arraycopy(mapVector.isNull, 0, structListVector.isNull, 0,
mapVector.childCount);
System.arraycopy(mapVector.offsets, 0, structListVector.offsets, 0,
mapVector.childCount);
System.arraycopy(mapVector.lengths, 0, structListVector.lengths, 0,
mapVector.childCount);
return structListVector;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 96812d3..c00885e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -276,7 +276,7 @@ public class Serializer {
case STRUCT:
return ArrowType.Struct.INSTANCE;
case MAP:
- return ArrowType.List.INSTANCE;
+ return new ArrowType.Map(false);
case UNION:
default:
throw new IllegalArgumentException();
@@ -290,10 +290,14 @@ public class Serializer {
writePrimitive(arrowVector, hiveVector, typeInfo, size,
vectorizedRowBatch, isNative);
break;
case LIST:
- writeList((ListVector) arrowVector, (ListColumnVector) hiveVector,
(ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
+ // the flag 'isMapDataType'=false, for all the list types except for
the case when map is converted
+ // as a list of structs.
+ writeList((ListVector) arrowVector, (ListColumnVector) hiveVector,
(ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative, false);
break;
case STRUCT:
- writeStruct((NonNullableStructVector) arrowVector,
(StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size,
vectorizedRowBatch, isNative);
+ // the flag 'isMapDataType'=false, for all the struct types except for
the case when map is converted
+ // as a list of structs.
+ writeStruct((NonNullableStructVector) arrowVector,
(StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size,
vectorizedRowBatch, isNative, false);
break;
case UNION:
writeUnion(arrowVector, hiveVector, typeInfo, size,
vectorizedRowBatch, isNative);
@@ -311,7 +315,8 @@ public class Serializer {
final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
final ListColumnVector structListVector = hiveVector == null ? null :
toStructListVector(hiveVector);
- write(arrowVector, structListVector, structListTypeInfo, size,
vectorizedRowBatch, isNative);
+ // Map is converted as a list of structs and thus we call the writeList()
method with the flag 'isMapDataType'=true
+ writeList(arrowVector, structListVector, structListTypeInfo, size,
vectorizedRowBatch, isNative, true);
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
int selectedIndex = rowIndex;
@@ -341,12 +346,11 @@ public class Serializer {
}
private void writeStruct(NonNullableStructVector arrowVector,
StructColumnVector hiveVector,
- StructTypeInfo typeInfo, int size, VectorizedRowBatch
vectorizedRowBatch, boolean isNative) {
+ StructTypeInfo typeInfo, int size, VectorizedRowBatch
vectorizedRowBatch, boolean isNative, boolean isMapDataType) {
final List<String> fieldNames = typeInfo.getAllStructFieldNames();
final List<TypeInfo> fieldTypeInfos =
typeInfo.getAllStructFieldTypeInfos();
final ColumnVector[] hiveFieldVectors = hiveVector == null ? null :
hiveVector.fields;
final int fieldSize = fieldTypeInfos.size();
-
// This is to handle following scenario -
// if any struct value itself is NULL, we get structVector.isNull[i]=true
// but we don't get the same for it's child fields which later causes
exceptions while setting to arrow vectors
@@ -366,9 +370,12 @@ public class Serializer {
final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
final ColumnVector hiveFieldVector = hiveVector == null ? null :
hiveFieldVectors[fieldIndex];
final String fieldName = fieldNames.get(fieldIndex);
- final FieldVector arrowFieldVector =
- arrowVector.addOrGet(fieldName,
- toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
+
+ // If the call is coming from writeMap(), then the structs within the
list type should be non-nullable.
+ FieldType elementFieldType = (isMapDataType) ? (new FieldType(false,
toArrowType(fieldTypeInfo), null))
+ : (toFieldType(fieldTypeInfos.get(fieldIndex)));
+ final FieldVector arrowFieldVector = arrowVector.addOrGet(fieldName,
elementFieldType, FieldVector.class);
+
arrowFieldVector.setInitialCapacity(size);
arrowFieldVector.allocateNew();
write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size,
vectorizedRowBatch, isNative);
@@ -421,12 +428,17 @@ public class Serializer {
}
private void writeList(ListVector arrowVector, ListColumnVector hiveVector,
ListTypeInfo typeInfo, int size,
- VectorizedRowBatch vectorizedRowBatch, boolean
isNative) {
+ VectorizedRowBatch vectorizedRowBatch, boolean
isNative, boolean isMapDataType) {
final int OFFSET_WIDTH = 4;
final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
final ColumnVector hiveElementVector = hiveVector == null ? null :
hiveVector.child;
+
+ // If the call is coming from writeMap(), then the List type should be
non-nullable.
+ FieldType elementFieldType = (isMapDataType) ? (new FieldType(false,
toArrowType(elementTypeInfo), null))
+ : (toFieldType(elementTypeInfo));
+
final FieldVector arrowElementVector =
- (FieldVector)
arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+ (FieldVector)
arrowVector.addOrGetVector(elementFieldType).getVector();
VectorizedRowBatch correctedVrb = vectorizedRowBatch;
int correctedSize = hiveVector == null ? 0 : hiveVector.childCount;
@@ -437,7 +449,13 @@ public class Serializer {
arrowElementVector.setInitialCapacity(correctedSize);
arrowElementVector.allocateNew();
- write(arrowElementVector, hiveElementVector, elementTypeInfo,
correctedSize, correctedVrb, isNative);
+ // If the flag 'isMapDataType' is set to True, it means that the call is
coming from writeMap() and it has to call
+ // writeStruct() with the same flag value, as the map is converted as a
list of structs.
+ if (isMapDataType) {
+ writeStruct((NonNullableStructVector) arrowElementVector,
(StructColumnVector) hiveElementVector, (StructTypeInfo) elementTypeInfo,
correctedSize, correctedVrb, isNative, isMapDataType);
+ } else {
+ write(arrowElementVector, hiveElementVector, elementTypeInfo,
correctedSize, correctedVrb, isNative);
+ }
int nextOffset = 0;