This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 76658e4 [GOBBLIN-1330] Add support for decimal type in the
GobblinOrcWriter
76658e4 is described below
commit 76658e4bf20ec15ecec6f8947f648a64ad058a92
Author: Hung Tran <[email protected]>
AuthorDate: Thu Dec 3 14:32:15 2020 -0800
[GOBBLIN-1330] Add support for decimal type in the GobblinOrcWriter
Closes #3165 from htran1/orc_decimal
---
.../gobblin/writer/AvroOrcSchemaConverter.java | 25 +++++++++++++--
.../writer/GenericRecordToOrcValueWriter.java | 29 +++++++++++++++--
.../gobblin/writer/AvroOrcSchemaConverterTest.java | 21 +++++++++++--
.../writer/GenericRecordToOrcValueWriterTest.java | 36 ++++++++++++++++++++++
.../src/test/resources/decimal_test/data.json | 6 ++++
.../src/test/resources/decimal_test/schema.avsc | 16 ++++++++++
6 files changed, 126 insertions(+), 7 deletions(-)
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
index 3f16af7..2e1b113 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
@@ -40,7 +40,8 @@ public class AvroOrcSchemaConverter {
case INT:
return TypeDescription.createInt();
case BYTES:
- return TypeDescription.createBinary();
+ case FIXED:
+ return getTypeDescriptionForBinarySchema(avroSchema);
case ARRAY:
return
TypeDescription.createList(getOrcSchema(avroSchema.getElementType()));
case RECORD:
@@ -85,14 +86,32 @@ public class AvroOrcSchemaConverter {
case ENUM:
// represent as String for now
return TypeDescription.createString();
- case FIXED:
- return TypeDescription.createBinary();
default:
throw new IllegalStateException(String.format("Unrecognized Avro type:
%s", type.getName()));
}
}
/**
+   * Get the {@link TypeDescription} for a binary (BYTES or FIXED) Avro schema.
+   *
+   * This is based on logic from org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo.
+   *
+   * @param avroSchema a BYTES or FIXED Avro schema, optionally annotated with a "decimal" logical type
+   * @return a decimal TypeDescription carrying the schema's precision and scale if the schema has a valid
+   *         decimal logical type, otherwise a binary TypeDescription. Following the Avro specification, an
+   *         invalid decimal annotation (missing or non-positive precision, negative scale, or scale greater
+   *         than precision) is ignored and the underlying binary type is used.
+   */
+  private static TypeDescription getTypeDescriptionForBinarySchema(Schema avroSchema) {
+    if ("decimal".equalsIgnoreCase(avroSchema.getProp("logicalType"))) {
+      // getJsonProp() returns null for an absent property, so it must be checked before asInt().
+      // "precision" is required by the Avro spec; "scale" is optional and defaults to 0.
+      if (avroSchema.getJsonProp("precision") != null) {
+        int precision = avroSchema.getJsonProp("precision").asInt();
+        int scale = avroSchema.getJsonProp("scale") == null ? 0 : avroSchema.getJsonProp("scale").asInt(0);
+
+        if (precision > 0 && scale >= 0 && scale <= precision) {
+          // Scale is applied first so that precisions below ORC's default decimal scale are accepted.
+          return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
+        }
+      }
+    }
+
+    return TypeDescription.createBinary();
+  }
+
+ /**
* A helper method to check if the union is a nullable union. This check is
to distinguish the case between a nullable and
* a non-nullable union, each with a single member. In the former case, we
want to "flatten" to the member type, while
* in the case of the latter (i.e. non-nullable type), we want to preserve
the union type.
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 21a05a5..e1d3789 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -187,9 +188,33 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
}
static class DecimalConverter implements Converter {
+    private final int scale;
+
+    public DecimalConverter(int scale) {
+      this.scale = scale;
+    }
public void addValue(int rowId, int column, Object data, ColumnVector output) {
-      ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create((BigDecimal) data));
+      ((DecimalColumnVector) output).vector[rowId].set(getHiveDecimal(data));
+    }
+
+    /**
+     * Converts an Avro decimal value to a {@link HiveDecimal} using this column's scale.
+     *
+     * Avro stores a decimal as the big-endian two's-complement bytes of its unscaled value. A BYTES field
+     * arrives as a {@link ByteBuffer}. A FIXED field, which AvroOrcSchemaConverter also maps to DECIMAL,
+     * arrives as a GenericFixed. Both representations are accepted here; casting a GenericFixed to
+     * ByteBuffer would throw a ClassCastException.
+     *
+     * Based on logic from org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils
+     */
+    private HiveDecimal getHiveDecimal(Object data) {
+      byte[] unscaledBytes = data instanceof org.apache.avro.generic.GenericFixed
+          ? ((org.apache.avro.generic.GenericFixed) data).bytes()
+          : getBytesFromByteBuffer((ByteBuffer) data);
+
+      return HiveDecimal.create(new BigInteger(unscaledBytes), this.scale);
+    }
+
+    /**
+     * Copies the bytes of {@code byteBuffer} from index 0 up to its limit. The copy is read through a
+     * duplicate so that the position of the record's own buffer is left untouched for any later reader.
+     *
+     * Based on logic from org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils
+     */
+    private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
+      ByteBuffer view = byteBuffer.duplicate();
+      view.rewind();
+      byte[] result = new byte[view.limit()];
+      view.get(result);
+      return result;
}
}
@@ -382,7 +407,7 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
case VARCHAR:
return new StringConverter();
case DECIMAL:
- return new DecimalConverter();
+ return new DecimalConverter(schema.getScale());
case STRUCT:
return new StructConverter(schema,
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
case LIST:
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
index e03c0eb..a531047 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.writer;
import java.util.List;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.orc.TypeDescription;
@@ -73,6 +74,10 @@ public class AvroOrcSchemaConverterTest {
@Test
public void testTrivialAvroSchemaTranslation() throws Exception {
+ Schema decimalSchema = SchemaBuilder.builder().bytesType();
+ decimalSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "decimal");
+ decimalSchema.addProp("scale", 2);
+ decimalSchema.addProp("precision", 10);
// Trivial cases
Schema avroSchema = SchemaBuilder.record("test")
@@ -83,14 +88,20 @@ public class AvroOrcSchemaConverterTest {
.name("int_type")
.type(SchemaBuilder.builder().intType())
.noDefault()
+ .name("decimal_type")
+ .type(decimalSchema)
+ .noDefault()
.endRecord();
TypeDescription orcSchema = TypeDescription.createStruct()
.addField("string_type", TypeDescription.createString())
- .addField("int_type", TypeDescription.createInt());
+ .addField("int_type", TypeDescription.createInt())
+ .addField("decimal_type",
TypeDescription.createDecimal().withPrecision(10).withScale(2));
// Top-level record name will not be replicated in conversion result.
Assert.assertEquals(avroSchema.getFields(),
getAvroSchema(orcSchema).getFields());
+
+ Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroSchema),
orcSchema);
}
@Test
@@ -148,8 +159,14 @@ public class AvroOrcSchemaConverterTest {
case TIMESTAMP:
case VARCHAR:
case CHAR:
- case DECIMAL:
throw new UnsupportedOperationException("Types like BYTE and SHORT
(and many more) are not supported in Avro");
+ case DECIMAL:
+ Schema bytesType = SchemaBuilder.builder().bytesType();
+ bytesType.addProp(LogicalType.LOGICAL_TYPE_PROP, "decimal");
+ bytesType.addProp("scale", schema.getScale());
+ bytesType.addProp("precision", schema.getPrecision());
+
+ return bytesType;
case BOOLEAN:
return SchemaBuilder.builder().booleanType();
case INT:
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index 7389991..5d1422e 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -114,6 +114,42 @@ public class GenericRecordToOrcValueWriterTest {
}
@Test
+  public void testDecimalRecordConversionWriter() throws Exception {
+    // Avro schema with a single bytes field annotated as decimal(10, 2).
+    Schema avroSchema = new Schema.Parser()
+        .parse(getClass().getClassLoader().getResourceAsStream("decimal_test/schema.avsc"));
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(avroSchema);
+
+    // Push every fixture record through the value writer into one in-memory row batch.
+    GenericRecordToOrcValueWriter converter = new GenericRecordToOrcValueWriter(orcSchema, avroSchema);
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    List<GenericRecord> avroRecords =
+        GobblinOrcWriterTest.deserializeAvroRecords(getClass(), avroSchema, "decimal_test/data.json");
+    for (GenericRecord avroRecord : avroRecords) {
+      converter.write(avroRecord, batch);
+    }
+
+    // Persist the batch as an ORC file in a fresh temporary directory.
+    File orcFile = new File(Files.createTempDir(), "orc");
+    orcFile.deleteOnExit();
+    Path orcPath = new Path(orcFile.getAbsolutePath());
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(new Properties(), new Configuration());
+    writerOptions.setSchema(orcSchema);
+    Writer orcWriter = OrcFile.createWriter(orcPath, writerOptions);
+    orcWriter.addRowBatch(batch);
+    orcWriter.close();
+
+    // Read the rows back. Each unscaled value is divided by 10^2:
+    // bytes 0x01 0x54 = 340 -> 3.4 and bytes 0x02 0x55 = 597 -> 5.97.
+    List<Writable> readBack = deserializeOrcRecords(orcPath, FileSystem.get(new Configuration()));
+    Assert.assertEquals(readBack.size(), 2);
+    Assert.assertEquals(readBack.get(0).toString(), "{3.4}");
+    Assert.assertEquals(readBack.get(1).toString(), "{5.97}");
+  }
+
+ @Test
public void testListResize()
throws Exception {
Schema schema =
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json
b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json
new file mode 100644
index 0000000..904f5c0
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/data.json
@@ -0,0 +1,6 @@
+{
+ "id": "\u0001\u0054"
+}
+{
+ "id": "\u0002\u0055"
+}
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc
b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc
new file mode 100644
index 0000000..a971da6
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/decimal_test/schema.avsc
@@ -0,0 +1,16 @@
+{
+ "namespace": "com.linkedin.decimal",
+ "type": "record",
+ "name": "DecimalTest",
+ "fields": [
+ {
+ "name": "id",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 10,
+ "scale": 2
+ }
+ }
+ ]
+}