Repository: incubator-drill
Updated Branches:
  refs/heads/master 8dedd7182 -> fd7aee133


DRILL-1534: Implement Mappify UDF to transform map into a repeated map with 
key, value pairs


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fd7aee13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fd7aee13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fd7aee13

Branch: refs/heads/master
Commit: fd7aee133a5c55a61c1b2b31a4e27e074c669278
Parents: 8dedd71
Author: Mehant Baid <meha...@gmail.com>
Authored: Sat Oct 11 22:35:22 2014 -0700
Committer: Mehant Baid <meha...@gmail.com>
Committed: Tue Oct 21 14:27:18 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/expr/fn/impl/Mappify.java |  67 ++++++
 .../drill/exec/expr/fn/impl/MappifyUtility.java |  87 +++++++
 .../drill/exec/vector/complex/MapUtility.java   | 228 +++++++++++++++++++
 .../complex/writer/TestComplexTypeReader.java   |   5 +
 .../src/test/resources/jsoninput/input3.json    |  42 ++++
 5 files changed, 429 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fd7aee13/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
new file mode 100644
index 0000000..7f340e1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import javax.inject.Inject;
+
+public class Mappify {
+
+  /*
+   * The following function can be invoked when we want to convert a map to a 
repeated map where every
+   * with two fields in each entry:
+   * key: the name of the field in the original map and
+   * value: value of the field
+   *
+   * For eg, consider the following json file:
+   *
+   * {"foo": {"obj":1, "bar":10}}
+   * {"foo": {"obj":2, "bar":20}}
+   *
+   * Invoking mappify(foo) would result in the following output
+   *
+   * [{"key":"obj", "value":1}, {"key":"bar", "value":10}]
+   * [{"key":"obj", "value":2}, {"key":"bar", "value":20}]
+   *
+   * Currently this function only allows
+   * simple maps as input
+   * scalar value fields
+   * value fields need to be of the same data type
+   */
+  @FunctionTemplate(names = {"mappify", "kvgen"}, scope = 
FunctionTemplate.FunctionScope.SIMPLE, nulls = 
FunctionTemplate.NullHandling.NULL_IF_NULL, isRandom = true)
+  public static class ConvertMapToKeyValuePairs implements DrillSimpleFunc {
+    @Param  FieldReader reader;
+    @Inject DrillBuf buffer;
+    @Output ComplexWriter writer;
+
+    public void setup(RecordBatch incoming) {
+    }
+
+    public void eval() {
+      org.apache.drill.exec.expr.fn.impl.MappifyUtility.mappify(reader, 
writer, buffer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fd7aee13/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
new file mode 100644
index 0000000..92f62ca
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import com.google.common.base.Charsets;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.MapUtility;
+import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+import io.netty.buffer.DrillBuf;
+
+import java.util.Iterator;
+
+public class MappifyUtility {
+
+  // Default names used in the map.
+  public static final String fieldKey = "key";
+  public static final String fieldValue = "value";
+
+  public static void mappify(FieldReader reader, BaseWriter.ComplexWriter 
writer, DrillBuf buffer) {
+    // Currently we expect single map as input
+    if (!(reader instanceof SingleMapReaderImpl)) {
+      throw new DrillRuntimeException("Mappify function only supports Simple 
maps as input");
+    }
+    BaseWriter.ListWriter listWriter = writer.rootAsList();
+    listWriter.start();
+    BaseWriter.MapWriter mapWriter = listWriter.map();
+
+    // Iterate over the fields in the map
+    Iterator<String> fieldIterator = reader.iterator();
+    while (fieldIterator.hasNext()) {
+      String str = fieldIterator.next();
+      FieldReader fieldReader = reader.reader(str);
+
+      // Check if the value field is not repeated
+      if (fieldReader.getType().getMode() == TypeProtos.DataMode.REPEATED) {
+        throw new DrillRuntimeException("Mappify function does not support 
repeated type values");
+      }
+
+      // writing a new field, start a new map
+      mapWriter.start();
+
+      // write "key":"columnname" into the map
+      VarCharHolder vh = new VarCharHolder();
+      byte[] b = str.getBytes(Charsets.UTF_8);
+      buffer.reallocIfNeeded(b.length);
+      buffer.setBytes(0, b);
+      vh.start = 0;
+      vh.end = b.length;
+      vh.buffer = buffer;
+      mapWriter.varChar(fieldKey).write(vh);
+
+      // Skip the value field if its null
+      if (fieldReader.isSet() == false) {
+        mapWriter.end();
+        continue;
+      }
+
+      // Write the value to the map
+      MapUtility.writeToMapFromReader(fieldReader, mapWriter, buffer);
+
+      mapWriter.end();
+    }
+    listWriter.end();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fd7aee13/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
new file mode 100644
index 0000000..6c1907a
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.fn.impl.MappifyUtility;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal18Holder;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.IntervalDayHolder;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
+import org.apache.drill.exec.expr.holders.IntervalYearHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.UInt1Holder;
+import org.apache.drill.exec.expr.holders.UInt2Holder;
+import org.apache.drill.exec.expr.holders.UInt4Holder;
+import org.apache.drill.exec.expr.holders.UInt8Holder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.joda.time.Period;
+
+public class MapUtility {
+  /*
+   * Function to read a value from the field reader, detect the type, 
construct the appropriate value holder
+   * and use the value holder to write to the Map.
+   */
+  public static void writeToMapFromReader(FieldReader fieldReader, 
BaseWriter.MapWriter mapWriter, DrillBuf buffer) {
+
+    MajorType valueMajorType = fieldReader.getType();
+    MinorType valueMinorType = valueMajorType.getMinorType();
+
+    switch (valueMinorType) {
+      case TINYINT:
+        TinyIntHolder tinyIntHolder = new TinyIntHolder();
+        tinyIntHolder.value = fieldReader.readByte();
+        mapWriter.tinyInt(MappifyUtility.fieldValue).write(tinyIntHolder);
+        break;
+      case SMALLINT:
+        SmallIntHolder smallIntHolder = new SmallIntHolder();
+        smallIntHolder.value = fieldReader.readShort();
+        mapWriter.smallInt(MappifyUtility.fieldValue).write(smallIntHolder);
+        break;
+      case BIGINT:
+        BigIntHolder bh = new BigIntHolder();
+        bh.value = fieldReader.readLong();
+        mapWriter.bigInt(MappifyUtility.fieldValue).write(bh);
+        break;
+      case INT:
+        IntHolder ih = new IntHolder();
+        ih.value = fieldReader.readInteger();
+        mapWriter.integer(MappifyUtility.fieldValue).write(ih);
+        break;
+      case UINT1:
+        UInt1Holder uInt1Holder = new UInt1Holder();
+        uInt1Holder.value = fieldReader.readByte();
+        mapWriter.uInt1(MappifyUtility.fieldValue).write(uInt1Holder);
+        break;
+      case UINT2:
+        UInt2Holder uInt2Holder = new UInt2Holder();
+        uInt2Holder.value = fieldReader.readCharacter();
+        mapWriter.uInt2(MappifyUtility.fieldValue).write(uInt2Holder);
+        break;
+      case UINT4:
+        UInt4Holder uInt4Holder = new UInt4Holder();
+        uInt4Holder.value = fieldReader.readInteger();
+        mapWriter.uInt4(MappifyUtility.fieldValue).write(uInt4Holder);
+        break;
+      case UINT8:
+        UInt8Holder uInt8Holder = new UInt8Holder();
+        uInt8Holder.value = fieldReader.readInteger();
+        mapWriter.uInt8(MappifyUtility.fieldValue).write(uInt8Holder);
+        break;
+      case DECIMAL9:
+        Decimal9Holder decimalHolder = new Decimal9Holder();
+        decimalHolder.value = fieldReader.readBigDecimal().intValue();
+        decimalHolder.scale = valueMajorType.getScale();
+        decimalHolder.precision = valueMajorType.getPrecision();
+        mapWriter.decimal9(MappifyUtility.fieldValue).write(decimalHolder);
+        break;
+      case DECIMAL18:
+        Decimal18Holder decimal18Holder = new Decimal18Holder();
+        decimal18Holder.value = fieldReader.readBigDecimal().longValue();
+        decimal18Holder.scale = valueMajorType.getScale();
+        decimal18Holder.precision = valueMajorType.getPrecision();
+        mapWriter.decimal18(MappifyUtility.fieldValue).write(decimal18Holder);
+        break;
+      case DECIMAL28SPARSE:
+        Decimal28SparseHolder decimal28Holder = new Decimal28SparseHolder();
+
+        // Ensure that the buffer used to store decimal is of sufficient length
+        buffer.reallocIfNeeded(decimal28Holder.WIDTH);
+        decimal28Holder.scale = valueMajorType.getScale();
+        decimal28Holder.precision = valueMajorType.getPrecision();
+        decimal28Holder.buffer = buffer;
+        decimal28Holder.start = 0;
+        DecimalUtility.getSparseFromBigDecimal(fieldReader.readBigDecimal(), 
buffer, 0, decimal28Holder.scale,
+            decimal28Holder.precision, decimal28Holder.nDecimalDigits);
+        
mapWriter.decimal28Sparse(MappifyUtility.fieldValue).write(decimal28Holder);
+        break;
+      case DECIMAL38SPARSE:
+        Decimal38SparseHolder decimal38Holder = new Decimal38SparseHolder();
+
+        // Ensure that the buffer used to store decimal is of sufficient length
+        buffer.reallocIfNeeded(decimal38Holder.WIDTH);
+        decimal38Holder.scale = valueMajorType.getScale();
+        decimal38Holder.precision = valueMajorType.getPrecision();
+        decimal38Holder.buffer = buffer;
+        decimal38Holder.start = 0;
+        DecimalUtility.getSparseFromBigDecimal(fieldReader.readBigDecimal(), 
buffer, 0, decimal38Holder.scale,
+            decimal38Holder.precision, decimal38Holder.nDecimalDigits);
+
+        
mapWriter.decimal38Sparse(MappifyUtility.fieldValue).write(decimal38Holder);
+        break;
+      case DATE:
+        DateHolder dateHolder = new DateHolder();
+        dateHolder.value = fieldReader.readLong();
+        mapWriter.date(MappifyUtility.fieldValue).write(dateHolder);
+        break;
+      case TIME:
+        TimeHolder timeHolder = new TimeHolder();
+        timeHolder.value = fieldReader.readInteger();
+        mapWriter.time(MappifyUtility.fieldValue).write(timeHolder);
+        break;
+      case TIMESTAMP:
+        TimeStampHolder timeStampHolder = new TimeStampHolder();
+        timeStampHolder.value = fieldReader.readLong();
+        mapWriter.timeStamp(MappifyUtility.fieldValue).write(timeStampHolder);
+        break;
+      case INTERVAL:
+        IntervalHolder intervalHolder = new IntervalHolder();
+        Period period = fieldReader.readPeriod();
+        intervalHolder.months = (period.getYears() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths) + 
period.getMonths();
+        intervalHolder.days = period.getDays();
+        intervalHolder.milliseconds = (period.getHours() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
+            (period.getMinutes() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
+            (period.getSeconds() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
+            (period.getMillis());
+        mapWriter.interval(MappifyUtility.fieldValue).write(intervalHolder);
+        break;
+      case INTERVALDAY:
+        IntervalDayHolder intervalDayHolder = new IntervalDayHolder();
+        Period periodDay = fieldReader.readPeriod();
+        intervalDayHolder.days = periodDay.getDays();
+        intervalDayHolder.milliseconds = (periodDay.getHours() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
+            (periodDay.getMinutes() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
+            (periodDay.getSeconds() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
+            (periodDay.getMillis());
+        
mapWriter.intervalDay(MappifyUtility.fieldValue).write(intervalDayHolder);
+        break;
+      case INTERVALYEAR:
+        IntervalYearHolder intervalYearHolder = new IntervalYearHolder();
+        Period periodYear = fieldReader.readPeriod();
+        intervalYearHolder.value = (periodYear.getYears() * 
org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths) + 
periodYear.getMonths();
+        
mapWriter.intervalYear(MappifyUtility.fieldValue).write(intervalYearHolder);
+        break;
+      case FLOAT4:
+        Float4Holder float4Holder = new Float4Holder();
+        float4Holder.value = fieldReader.readFloat();
+        mapWriter.float4(MappifyUtility.fieldValue).write(float4Holder);
+        break;
+      case FLOAT8:
+        Float8Holder float8Holder = new Float8Holder();
+        float8Holder.value = fieldReader.readDouble();
+        mapWriter.float8(MappifyUtility.fieldValue).write(float8Holder);
+        break;
+      case BIT:
+        BitHolder bitHolder = new BitHolder();
+        bitHolder.value = (fieldReader.readBoolean() == true) ? 1 : 0;
+        mapWriter.bit(MappifyUtility.fieldValue).write(bitHolder);
+        break;
+      case VARCHAR:
+        VarCharHolder vh1 = new VarCharHolder();
+        byte[] b = fieldReader.readText().toString().getBytes(Charsets.UTF_8);
+        buffer.reallocIfNeeded(b.length);
+        buffer.setBytes(0, b);
+        vh1.start = 0;
+        vh1.end = b.length;
+        vh1.buffer = buffer;
+        mapWriter.varChar(MappifyUtility.fieldValue).write(vh1);
+        break;
+      case VARBINARY:
+        VarBinaryHolder varBinaryHolder = new VarBinaryHolder();
+        byte[] b1 = fieldReader.readByteArray();
+        buffer.reallocIfNeeded(b1.length);
+        buffer.setBytes(0, b1);
+        varBinaryHolder.start = 0;
+        varBinaryHolder.end = b1.length;
+        varBinaryHolder.buffer = buffer;
+        mapWriter.varBinary(MappifyUtility.fieldValue).write(varBinaryHolder);
+        break;
+      default:
+        throw new DrillRuntimeException(String.format("Mappify does not 
support input of type: %s", valueMinorType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fd7aee13/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java
index 2e62897..df92dde 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java
@@ -183,4 +183,9 @@ public class TestComplexTypeReader extends BaseTestQuery{
     test("select l, l from cp.`jsoninput/input2.json`;");
   }
 
+  @Test
+  public void testKeyValueGen() throws Exception {
+    test("select kvgen(x) from cp.`jsoninput/input2.json`");
+    test("select kvgen(bigintegercol), kvgen(float8col) from 
cp.`jsoninput/input3.json`");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fd7aee13/exec/java-exec/src/test/resources/jsoninput/input3.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/input3.json 
b/exec/java-exec/src/test/resources/jsoninput/input3.json
new file mode 100644
index 0000000..8eed227
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/input3.json
@@ -0,0 +1,42 @@
+{
+  "bigintegercol" : {
+    "int_1" : 1,
+    "int_2": 2,
+    "int_3": 3
+  },
+  "float8col" : {
+    "f8_1" : 1.1,
+    "f8_2": 2.2
+  }
+}
+{
+  "bigintegercol" : {
+    "int_1" : 1,
+    "int_2": 2
+  },
+  "float8col" : {
+    "f8_1" : 1.1,
+    "f8_2": 2.2,
+    "f8_3": 3.3
+  }
+}
+{
+  "bigintegercol" : {
+    "int_1" : 1,
+    "int_3": 3
+  },
+  "float8col" : {
+    "f8_1" : 1.1,
+    "f8_3": 6.6
+  }
+}
+{
+  "bigintegercol" : {
+    "int_2": 2,
+    "int_3": 3
+  },
+  "float8col" : {
+    "f8_1" : 1.1,
+    "f8_2": 2.2
+  }
+}

Reply via email to