This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e6cf5  Support nested types for recordKey, partitionPath and combineKey
a7e6cf5 is described below

commit a7e6cf51974f96c6b4bc945db25658cbb4bc48c7
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue May 14 19:59:04 2019 -0700

    Support nested types for recordKey, partitionPath and combineKey
---
 .../src/main/java/com/uber/hoodie/DataSourceUtils.java         | 10 +++++++++-
 .../hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java    |  2 +-
 .../hoodie/utilities/keygen/TimestampBasedKeyGenerator.java    |  4 ++--
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 19352ed..9eb8bcd 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -53,6 +53,14 @@ public class DataSourceUtils {
    * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
    */
   public static String getNestedFieldValAsString(GenericRecord record, String fieldName) {
+    Object obj = getNestedFieldVal(record, fieldName);
+    return (obj == null) ? null : obj.toString();
+  }
+
+  /**
+   * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
+   */
+  public static Object getNestedFieldVal(GenericRecord record, String fieldName) {
     String[] parts = fieldName.split("\\.");
     GenericRecord valueNode = record;
     int i = 0;
@@ -65,7 +73,7 @@ public class DataSourceUtils {
 
       // return, if last part of name
       if (i == parts.length - 1) {
-        return val.toString();
+        return val;
       } else {
         // VC: Need a test here
         if (!(val instanceof GenericRecord)) {
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index acff58f..e00c923 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -243,7 +243,7 @@ public class HoodieDeltaStreamer implements Serializable {
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
       HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) gr.get(cfg.sourceOrderingField));
+          (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
 
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
index f6a0c90..b4c26b9 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -83,7 +83,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
 
   @Override
   public HoodieKey getKey(GenericRecord record) {
-    Object partitionVal = record.get(partitionPathField);
+    Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField);
     SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
     partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
 
@@ -102,7 +102,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
             "Unexpected type for partition field: " + partitionVal.getClass().getName());
       }
 
-      return new HoodieKey(record.get(recordKeyField).toString(),
+      return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField),
           partitionPathFormat.format(new Date(unixTime * 1000)));
     } catch (ParseException pe) {
       throw new HoodieDeltaStreamerException(

Reply via email to