This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 621c246  [HUDI-161] Remove --key-generator-class CLI arg in HoodieDeltaStreamer and use key generator class specified in datasource properties. (#781)
621c246 is described below

commit 621c246fa9ea607a3cd8f33fdc3d8b528315e327
Author: Yihua Guo <[email protected]>
AuthorDate: Fri Jul 12 13:45:49 2019 -0700

    [HUDI-161] Remove --key-generator-class CLI arg in HoodieDeltaStreamer and use key generator class specified in datasource properties. (#781)
---
 .../main/java/com/uber/hoodie/DataSourceUtils.java | 13 +++--
 .../com/uber/hoodie/HoodieSparkSqlWriter.scala     |  5 +-
 .../hoodie/utilities/deltastreamer/DeltaSync.java  |  2 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  6 ---
 .../hoodie/utilities/TestHoodieDeltaStreamer.java  | 58 +++++++++++++++++++---
 5 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java 
b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index e7b9494..d700ff6 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -90,10 +90,17 @@ public class DataSourceUtils {
   }
 
   /**
-   * Create a key generator class via reflection, passing in any configs needed
+   * Create a key generator class via reflection, passing in any configs 
needed.
+   *
+   * If the class name of key generator is configured through the properties 
file, i.e., {@code
+   * props}, use the corresponding key generator class; otherwise, use the 
default key generator
+   * class specified in {@code DataSourceWriteOptions}.
    */
-  public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
-      TypedProperties props) throws IOException {
+  public static KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
+    String keyGeneratorClass = props.getString(
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL()
+    );
     try {
       return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, 
props);
     } catch (Throwable e) {
diff --git 
a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala 
b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index cf44e09..414cad4 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -84,10 +84,7 @@ private[hoodie] object HoodieSparkSqlWriter {
     log.info(s"Registered avro schema : ${schema.toString(true)}")
 
     // Convert to RDD[HoodieRecord]
-    val keyGenerator = DataSourceUtils.createKeyGenerator(
-      parameters(KEYGENERATOR_CLASS_OPT_KEY),
-      toProperties(parameters)
-    )
+    val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
     val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, 
structName, nameSpace)
     val hoodieAllIncomingRecords = genericRecords.map(gr => {
       val orderingVal = DataSourceUtils.getNestedFieldValAsString(
diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index 89e5c73..00d270b 100644
--- 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
+++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -171,7 +171,7 @@ public class DeltaSync implements Serializable {
     refreshTimeline();
 
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
-    this.keyGenerator = 
DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+    this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new 
SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
         sparkSession, schemaProvider));
diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index c49f3f8..1951546 100644
--- 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -27,7 +27,6 @@ import com.beust.jcommander.ParameterException;
 import com.google.common.base.Preconditions;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.OverwriteWithLatestAvroPayload;
-import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -181,11 +180,6 @@ public class HoodieDeltaStreamer implements Serializable {
         + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record")
     public String sourceOrderingField = "ts";
 
-    @Parameter(names = {"--key-generator-class"}, description = "Subclass of 
com.uber.hoodie.KeyGenerator "
-        + "to generate a HoodieKey from the given avro record. Built in: 
SimpleKeyGenerator (uses "
-        + "provided field names as recordkey & partitionpath. Nested fields 
specified via dot notation, e.g: a.b.c)")
-    public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
-
     @Parameter(names = {"--payload-class"}, description = "subclass of 
HoodieRecordPayload, that works off "
         + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value")
     public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
diff --git 
a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
 
b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
index 68a5c8d..14b6c6a 100644
--- 
a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
+++ 
b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.uber.hoodie.DataSourceWriteOptions;
+import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -80,7 +81,8 @@ import org.junit.Test;
  * upserts, inserts. Check counts at the end.
  */
 public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
-
+  private static final String PROPS_FILENAME_TEST_SOURCE = 
"test-source.properties";
+  private static final String PROPS_FILENAME_TEST_INVALID = 
"test-invalid.properties";
   private static volatile Logger log = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
   @BeforeClass
@@ -96,6 +98,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
 
     TypedProperties props = new TypedProperties();
     props.setProperty("include", "sql-transformer.properties");
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
     
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
@@ -108,7 +111,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), 
"datestr");
     
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
         MultiPartKeysValueExtractor.class.getName());
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + 
"/test-source.properties");
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE);
 
     // Properties used for the delta-streamer which incrementally pulls from 
upstream Hudi source table and writes to
     // downstream hudi table
@@ -122,6 +125,17 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
         dfsBasePath + "/test-downstream-source.properties");
+
+    // Properties used for testing invalid key generator
+    TypedProperties invalidProps = new TypedProperties();
+    invalidProps.setProperty("include", "sql-transformer.properties");
+    invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", 
"invalid");
+    invalidProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+    invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
+    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
+        dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
   }
 
   @AfterClass
@@ -147,11 +161,11 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation 
op, String transformerClassName) {
-      return makeConfig(basePath, op, transformerClassName, false);
+      return makeConfig(basePath, op, transformerClassName, 
PROPS_FILENAME_TEST_SOURCE, false);
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation 
op, String transformerClassName,
-        boolean enableHiveSync) {
+        String propsFilename, boolean enableHiveSync) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -161,7 +175,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
-      cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+      cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = 1000;
       cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
       return cfg;
@@ -259,10 +273,31 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
 
   @Test
   public void testProps() throws IOException {
-    TypedProperties props = new DFSPropertiesConfiguration(dfs, new 
Path(dfsBasePath + "/test-source.properties"))
-        .getConfig();
+    TypedProperties props = new DFSPropertiesConfiguration(
+        dfs, new Path(dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE)).getConfig();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", 
props.getString("hoodie.datasource.write.recordkey.field"));
+    assertEquals(
+        "com.uber.hoodie.utilities.TestHoodieDeltaStreamer$TestGenerator",
+        props.getString("hoodie.datasource.write.keygenerator.class")
+    );
+  }
+
+  @Test
+  public void testPropsWithInvalidKeyGenerator() throws Exception {
+    try {
+      String datasetBasePath = dfsBasePath + "/test_dataset";
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(
+              datasetBasePath, Operation.BULK_INSERT, 
TripsWithDistanceTransformer.class.getName(),
+              PROPS_FILENAME_TEST_INVALID, false), jsc);
+      deltaStreamer.sync();
+      fail("Should error out when setting the key generator class property to 
an invalid value");
+    } catch (IOException e) {
+      //expected
+      log.error("Expected error during getting the key generator", e);
+      assertTrue(e.getMessage().contains("Could not load key generator 
class"));
+    }
   }
 
   @Test
@@ -370,7 +405,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
 
     // Initial bulk insert to ingest to first hudi table
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, 
Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), true);
+        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, 
true);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", 
sqlContext);
     TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", 
sqlContext);
@@ -524,4 +559,11 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
               functions.col("end_lat"), functions.col("begin_lon"), 
functions.col("end_lat")));
     }
   }
+
+  public static class TestGenerator extends SimpleKeyGenerator {
+
+    public TestGenerator(TypedProperties props) {
+      super(props);
+    }
+  }
 }

Reply via email to