This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 76658e4  [GOBBLIN-1330] Add support for decimal type in the GobblinOrcWriter
76658e4 is described below

commit 76658e4bf20ec15ecec6f8947f648a64ad058a92
Author: Hung Tran <[email protected]>
AuthorDate: Thu Dec 3 14:32:15 2020 -0800

    [GOBBLIN-1330] Add support for decimal type in the GobblinOrcWriter
    
    Closes #3165 from htran1/orc_decimal
---
 .../gobblin/writer/AvroOrcSchemaConverter.java     | 25 +++++++++++++--
 .../writer/GenericRecordToOrcValueWriter.java      | 29 +++++++++++++++--
 .../gobblin/writer/AvroOrcSchemaConverterTest.java | 21 +++++++++++--
 .../writer/GenericRecordToOrcValueWriterTest.java  | 36 ++++++++++++++++++++++
 .../src/test/resources/decimal_test/data.json      |  6 ++++
 .../src/test/resources/decimal_test/schema.avsc    | 16 ++++++++++
 6 files changed, 126 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
index 3f16af7..2e1b113 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
@@ -40,7 +40,8 @@ public class AvroOrcSchemaConverter {
       case INT:
         return TypeDescription.createInt();
       case BYTES:
-        return TypeDescription.createBinary();
+      case FIXED:
+        return getTypeDescriptionForBinarySchema(avroSchema);
       case ARRAY:
         return 
TypeDescription.createList(getOrcSchema(avroSchema.getElementType()));
       case RECORD:
@@ -85,14 +86,32 @@ public class AvroOrcSchemaConverter {
       case ENUM:
         // represent as String for now
         return TypeDescription.createString();
-      case FIXED:
-        return TypeDescription.createBinary();
       default:
         throw new IllegalStateException(String.format("Unrecognized Avro type: 
%s", type.getName()));
     }
   }
 
   /**
+   * Get the {@link TypeDescription} for a binary schema type.
+   *
+   * This is based on logic from 
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo.
+   *
+   * @return If the logical type is decimal then return a decimal 
TypeDescription, otherwise return a binary
+   * TypeDescription.
+   *
+   */
+  private static TypeDescription getTypeDescriptionForBinarySchema(Schema 
avroSchema) {
+    if ("decimal".equalsIgnoreCase(avroSchema.getProp("logicalType"))) {
+      int scale = avroSchema.getJsonProp("scale").asInt(0);
+      int precision = avroSchema.getJsonProp("precision").asInt();
+
+      return 
TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
+    }
+
+    return TypeDescription.createBinary();
+  }
+
+  /**
    * A helper method to check if the union is a nullable union. This check is 
to distinguish the case between a nullable and
    * a non-nullable union, each with a single member. In the former case, we 
want to "flatten" to the member type, while
    * in the case of the latter (i.e. non-nullable type), we want to preserve 
the union type.
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 21a05a5..e1d3789 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.writer;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -187,9 +188,33 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
   }
 
   static class DecimalConverter implements Converter {
+    private final int scale;
+
+    public DecimalConverter(int scale) {
+      this.scale = scale;
+    }
 
     public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
-      ((DecimalColumnVector) 
output).vector[rowId].set(HiveDecimal.create((BigDecimal) data));
+      ((DecimalColumnVector) 
output).vector[rowId].set(getHiveDecimalFromByteBuffer((ByteBuffer) data));
+    }
+
+    /**
+     * Based on logic from org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils
+     */
+    private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
+      byteBuffer.rewind();
+      byte[] result = new byte[byteBuffer.limit()];
+      byteBuffer.get(result);
+      return result;
+    }
+
+    /**
+     * Based on logic from org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils
+     */
+    private HiveDecimal getHiveDecimalFromByteBuffer(ByteBuffer byteBuffer) {
+      byte[] result = getBytesFromByteBuffer(byteBuffer);
+
+      return HiveDecimal.create(new BigInteger(result), this.scale);
     }
   }
 
@@ -382,7 +407,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       case VARCHAR:
         return new StringConverter();
       case DECIMAL:
-        return new DecimalConverter();
+        return new DecimalConverter(schema.getScale());
       case STRUCT:
         return new StructConverter(schema, 
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
       case LIST:
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
index e03c0eb..a531047 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.writer;
 
 import java.util.List;
 
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.orc.TypeDescription;
@@ -73,6 +74,10 @@ public class AvroOrcSchemaConverterTest {
 
   @Test
   public void testTrivialAvroSchemaTranslation() throws Exception {
+    Schema decimalSchema = SchemaBuilder.builder().bytesType();
+    decimalSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "decimal");
+    decimalSchema.addProp("scale", 2);
+    decimalSchema.addProp("precision", 10);
 
     // Trivial cases
     Schema avroSchema = SchemaBuilder.record("test")
@@ -83,14 +88,20 @@ public class AvroOrcSchemaConverterTest {
         .name("int_type")
         .type(SchemaBuilder.builder().intType())
         .noDefault()
+        .name("decimal_type")
+        .type(decimalSchema)
+        .noDefault()
         .endRecord();
 
     TypeDescription orcSchema = TypeDescription.createStruct()
         .addField("string_type", TypeDescription.createString())
-        .addField("int_type", TypeDescription.createInt());
+        .addField("int_type", TypeDescription.createInt())
+        .addField("decimal_type", 
TypeDescription.createDecimal().withPrecision(10).withScale(2));
 
     // Top-level record name will not be replicated in conversion result.
     Assert.assertEquals(avroSchema.getFields(), 
getAvroSchema(orcSchema).getFields());
+
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroSchema), 
orcSchema);
   }
 
   @Test
@@ -148,8 +159,14 @@ public class AvroOrcSchemaConverterTest {
       case TIMESTAMP:
       case VARCHAR:
       case CHAR:
-      case DECIMAL:
         throw new UnsupportedOperationException("Types like BYTE and SHORT 
(and many more) are not supported in Avro");
+      case DECIMAL:
+        Schema bytesType = SchemaBuilder.builder().bytesType();
+        bytesType.addProp(LogicalType.LOGICAL_TYPE_PROP, "decimal");
+        bytesType.addProp("scale", schema.getScale());
+        bytesType.addProp("precision", schema.getPrecision());
+
+        return bytesType;
       case BOOLEAN:
         return SchemaBuilder.builder().booleanType();
       case INT:
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index 7389991..5d1422e 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -114,6 +114,42 @@ public class GenericRecordToOrcValueWriterTest {
   }
 
   @Test
+  public void testDecimalRecordConversionWriter()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("decimal_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+    List<GenericRecord> recordList = GobblinOrcWriterTest
+        .deserializeAvroRecords(this.getClass(), schema, 
"decimal_test/data.json");
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+
+    // Flush RowBatch into disk.
+    File tempFile = new File(Files.createTempDir(), "orc");
+    tempFile.deleteOnExit();
+    Path filePath = new Path(tempFile.getAbsolutePath());
+
+    OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(), 
new Configuration());
+    options.setSchema(orcSchema);
+    Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+    orcFileWriter.addRowBatch(rowBatch);
+    orcFileWriter.close();
+
+    // Load it back and compare.
+    FileSystem fs = FileSystem.get(new Configuration());
+    List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+    Assert.assertEquals(orcRecords.size(), 2);
+    Assert.assertEquals(orcRecords.get(0).toString(), "{3.4}");
+    Assert.assertEquals(orcRecords.get(1).toString(), "{5.97}");
+  }
+
+  @Test
   public void testListResize()
       throws Exception {
     Schema schema =
diff --git 
a/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json 
b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json
new file mode 100644
index 0000000..904f5c0
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json
@@ -0,0 +1,6 @@
+{
+  "id": "\u0001\u0054"
+}
+{
+  "id": "\u0002\u0055"
+}
diff --git 
a/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc 
b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc
new file mode 100644
index 0000000..a971da6
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc
@@ -0,0 +1,16 @@
+{
+  "namespace": "com.linkedin.decimal",
+  "type": "record",
+  "name": "DecimalTest",
+  "fields": [
+    {
+      "name": "id",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 10,
+        "scale": 2
+      }
+    }
+  ]
+}

Reply via email to