This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89e37d5  [HUDI-908] Add some data types to HoodieTestDataGenerator and 
fix some bugs. (#1690)
89e37d5 is described below

commit 89e37d5273ea1c6bf2fe3a8f7053e7a3cc44011d
Author: Shen Hong <[email protected]>
AuthorDate: Mon Jun 22 23:13:28 2020 +0800

    [HUDI-908] Add some data types to HoodieTestDataGenerator and fix some 
bugs. (#1690)
---
 .../hudi/client/TestTableSchemaEvolution.java      | 13 ++++----
 .../hudi/testutils/HoodieTestDataGenerator.java    | 34 ++++++++++++++++++--
 .../apache/hudi/avro/MercifulJsonConverter.java    | 14 ++++++--
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  3 +-
 .../org/apache/hudi/AvroConversionHelper.scala     |  6 +++-
 .../resources/delta-streamer-config/source.avsc    | 37 ++++++++++++++++++++--
 .../sql-transformer.properties                     |  2 +-
 .../resources/delta-streamer-config/target.avsc    | 34 ++++++++++++++++++++
 8 files changed, 127 insertions(+), 16 deletions(-)

diff --git 
a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 8e56b31..6a93e31 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
+import static 
org.apache.hudi.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA;
 import static 
org.apache.hudi.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
 import static 
org.apache.hudi.testutils.HoodieTestDataGenerator.MAP_TYPE_SCHEMA;
 import static 
org.apache.hudi.testutils.HoodieTestDataGenerator.TIP_NESTED_SCHEMA;
@@ -68,19 +69,19 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
       "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
 
   // TRIP_EXAMPLE_SCHEMA with a new_field added
-  public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX 
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
-      + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX 
+ EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+      + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + 
TRIP_SCHEMA_SUFFIX;
 
   // TRIP_EXAMPLE_SCHEMA with tip field removed
-  public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX 
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
-      + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX 
+ EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+      + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
 
   @Test
   public void testSchemaCompatibilityBasic() throws Exception {
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA),
         "Same schema is compatible");
 
-    String reorderedSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA + 
FARE_NESTED_SCHEMA
+    String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + 
TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
         + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
reorderedSchema),
         "Reordered fields are compatible");
@@ -114,7 +115,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED),
         "Added field with default is compatible (Evolved Schema)");
 
-    String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
+    String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + 
MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + 
EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
         + TRIP_SCHEMA_SUFFIX;
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
multipleAddedFieldSchema),
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
index a6de0f5..0eb88e2 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
@@ -46,7 +49,10 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -94,8 +100,16 @@ public class HoodieTestDataGenerator {
   public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", 
\"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": 
\"tip_history\", \"fields\": ["
       + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": 
\"currency\", \"type\": \"string\"}]}}},";
   public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", 
\"type\": {\"type\": \"map\", \"values\": \"string\"}},";
+  public static final String EXTRA_TYPE_SCHEMA = "{\"name\": 
\"distance_in_meters\", \"type\": \"int\"},"
+      + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
+      + "{\"name\": \"weight\", \"type\": \"float\"},"
+      + "{\"name\": \"nation\", \"type\": \"bytes\"},"
+      + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", 
\"logicalType\": \"date\"}},"
+      + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\", 
\"logicalType\": \"timestamp-micros\"}},"
+      + 
"{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
+
   public static final String TRIP_EXAMPLE_SCHEMA =
-      TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+      TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
   public static final String TRIP_FLATTENED_SCHEMA =
       TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
 
@@ -107,7 +121,7 @@ public class HoodieTestDataGenerator {
       + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
 
   public static final String NULL_SCHEMA = 
Schema.create(Schema.Type.NULL).toString();
-  public static final String TRIP_HIVE_COLUMN_TYPES = 
"double,string,string,string,double,double,double,double,"
+  public static final String TRIP_HIVE_COLUMN_TYPES = 
"double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
       + 
"map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
 
   public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
@@ -245,6 +259,22 @@ public class HoodieTestDataGenerator {
       rec.put("fare", RAND.nextDouble() * 100);
       rec.put("currency", "USD");
     } else {
+      rec.put("distance_in_meters", RAND.nextInt());
+      rec.put("seconds_since_epoch", RAND.nextLong());
+      rec.put("weight", RAND.nextFloat());
+      byte[] bytes = "Canada".getBytes();
+      rec.put("nation", ByteBuffer.wrap(bytes));
+      long currentTimeMillis = System.currentTimeMillis();
+      Date date = new Date(currentTimeMillis);
+      rec.put("current_date", (int) date.toLocalDate().toEpochDay());
+      rec.put("current_ts", currentTimeMillis);
+
+      BigDecimal bigDecimal = new BigDecimal(String.format("%5f", 
RAND.nextFloat()));
+      Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
+      Conversions.DecimalConversion decimalConversions = new 
Conversions.DecimalConversion();
+      GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, 
decimalSchema, LogicalTypes.decimal(10, 6));
+      rec.put("height", genericFixed);
+
       rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
 
       GenericRecord fareRecord = new 
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index 734c631..d759a8d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -236,7 +237,8 @@ public class MercifulJsonConverter {
     return new JsonToAvroFieldProcessor() {
       @Override
       public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
-        return Pair.of(true, value.toString().getBytes());
+        // Should return ByteBuffer (see GenericData.isBytes())
+        return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
       }
     };
   }
@@ -245,10 +247,16 @@ public class MercifulJsonConverter {
     return new JsonToAvroFieldProcessor() {
       @Override
       public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
-        byte[] src = value.toString().getBytes();
+        // The ObjectMapper use List to represent FixedType
+        // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to 
ArrayList<Integer>
+        List<Integer> converval = (List<Integer>) value;
+        byte[] src = new byte[converval.size()];
+        for (int i = 0; i < converval.size(); i++) {
+          src[i] = converval.get(i).byteValue();
+        }
         byte[] dst = new byte[schema.getFixedSize()];
         System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), 
src.length));
-        return Pair.of(true, dst);
+        return Pair.of(true, new GenericData.Fixed(schema, dst));
       }
     };
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 6af3770..cd876b4 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -44,6 +44,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
@@ -146,7 +147,7 @@ public class HoodieRealtimeRecordReaderUtils {
       case STRING:
         return new Text(value.toString());
       case BYTES:
-        return new BytesWritable((byte[]) value);
+        return new BytesWritable(((ByteBuffer)value).array());
       case INT:
         return new IntWritable((Integer) value);
       case LONG:
diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala 
b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 69e6376..259f51f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -110,7 +110,11 @@ object AvroConversionHelper {
             if (item == null) {
               null
             } else {
-              new Date(item.asInstanceOf[Long])
+              if (item.isInstanceOf[Integer]) {
+                new Date(item.asInstanceOf[Integer].longValue())
+              } else {
+                new Date(item.asInstanceOf[Long])
+              }
             }
         case (TimestampType, LONG) =>
           (item: AnyRef) =>
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc 
b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 8d01820..f5cc97f 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -43,8 +43,41 @@
   }, {
     "name" : "end_lon",
     "type" : "double"
-  },
-  {
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long",
+      "logicalType" : "timestamp-micros"
+      }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "abc",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
     "name" :"city_to_state",
     "type" : {
       "type" : "map",
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
 
b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index 569b417..dc735e8 100644
--- 
a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, 
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, 
a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS 
haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, 
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, 
a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, 
a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, 
CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc 
b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 4fbb5c5..a026107 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -44,6 +44,40 @@
     "name" : "end_lon",
     "type" : "double"
   }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  }, {
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long",
+      "logicalType" : "timestamp-micros"
+      }
+  }, {
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "abc",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
     "name" :"city_to_state",
     "type" : {
       "type" : "map",

Reply via email to