This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 621c246 [HUDI-161] Remove --key-generator-class CLI arg in
HoodieDeltaStreamer and use key generator class specified in datasource
properties. (#781)
621c246 is described below
commit 621c246fa9ea607a3cd8f33fdc3d8b528315e327
Author: Yihua Guo <[email protected]>
AuthorDate: Fri Jul 12 13:45:49 2019 -0700
[HUDI-161] Remove --key-generator-class CLI arg in HoodieDeltaStreamer and
use key generator class specified in datasource properties. (#781)
---
.../main/java/com/uber/hoodie/DataSourceUtils.java | 13 +++--
.../com/uber/hoodie/HoodieSparkSqlWriter.scala | 5 +-
.../hoodie/utilities/deltastreamer/DeltaSync.java | 2 +-
.../deltastreamer/HoodieDeltaStreamer.java | 6 ---
.../hoodie/utilities/TestHoodieDeltaStreamer.java | 58 +++++++++++++++++++---
5 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index e7b9494..d700ff6 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -90,10 +90,17 @@ public class DataSourceUtils {
}
/**
- * Create a key generator class via reflection, passing in any configs needed
+ * Create a key generator class via reflection, passing in any configs needed.
+ *
+ * If the class name of key generator is configured through the properties file, i.e., {@code
+ * props}, use the corresponding key generator class; otherwise, use the default key generator
+ * class specified in {@code DataSourceWriteOptions}.
*/
- public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
- TypedProperties props) throws IOException {
+ public static KeyGenerator createKeyGenerator(TypedProperties props) throws
IOException {
+ String keyGeneratorClass = props.getString(
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+ DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL()
+ );
try {
return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass,
props);
} catch (Throwable e) {
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index cf44e09..414cad4 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -84,10 +84,7 @@ private[hoodie] object HoodieSparkSqlWriter {
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
- val keyGenerator = DataSourceUtils.createKeyGenerator(
- parameters(KEYGENERATOR_CLASS_OPT_KEY),
- toProperties(parameters)
- )
+ val keyGenerator =
DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df,
structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index 89e5c73..00d270b 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -171,7 +171,7 @@ public class DeltaSync implements Serializable {
refreshTimeline();
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
- this.keyGenerator =
DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+ this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
this.formatAdapter = new
SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider));
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index c49f3f8..1951546 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -27,7 +27,6 @@ import com.beust.jcommander.ParameterException;
import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
-import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -181,11 +180,6 @@ public class HoodieDeltaStreamer implements Serializable {
+ " to break ties between records with same key in input data.
Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
- @Parameter(names = {"--key-generator-class"}, description = "Subclass of
com.uber.hoodie.KeyGenerator "
- + "to generate a HoodieKey from the given avro record. Built in:
SimpleKeyGenerator (uses "
- + "provided field names as recordkey & partitionpath. Nested fields
specified via dot notation, e.g: a.b.c)")
- public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
-
@Parameter(names = {"--payload-class"}, description = "subclass of
HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something
other than overwriting existing value")
public String payloadClassName =
OverwriteWithLatestAvroPayload.class.getName();
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
index 68a5c8d..14b6c6a 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.uber.hoodie.DataSourceWriteOptions;
+import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -80,7 +81,8 @@ import org.junit.Test;
* upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
-
+ private static final String PROPS_FILENAME_TEST_SOURCE =
"test-source.properties";
+ private static final String PROPS_FILENAME_TEST_INVALID =
"test-invalid.properties";
private static volatile Logger log =
LogManager.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
@@ -96,6 +98,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
+ props.setProperty("hoodie.datasource.write.keygenerator.class",
TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
@@ -108,7 +111,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(),
"datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath +
"/test-source.properties");
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE);
// Properties used for the delta-streamer which incrementally pulls from
upstream Hudi source table and writes to
// downstream hudi table
@@ -122,6 +125,17 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
+
+ // Properties used for testing invalid key generator
+ TypedProperties invalidProps = new TypedProperties();
+ invalidProps.setProperty("include", "sql-transformer.properties");
+ invalidProps.setProperty("hoodie.datasource.write.keygenerator.class",
"invalid");
+ invalidProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
+ invalidProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
+
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
+ UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
+ dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
}
@AfterClass
@@ -147,11 +161,11 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation
op, String transformerClassName) {
- return makeConfig(basePath, op, transformerClassName, false);
+ return makeConfig(basePath, op, transformerClassName,
PROPS_FILENAME_TEST_SOURCE, false);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation
op, String transformerClassName,
- boolean enableHiveSync) {
+ String propsFilename, boolean enableHiveSync) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -161,7 +175,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = "timestamp";
- cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+ cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
return cfg;
@@ -259,10 +273,31 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
@Test
public void testProps() throws IOException {
- TypedProperties props = new DFSPropertiesConfiguration(dfs, new
Path(dfsBasePath + "/test-source.properties"))
- .getConfig();
+ TypedProperties props = new DFSPropertiesConfiguration(
+ dfs, new Path(dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE)).getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key",
props.getString("hoodie.datasource.write.recordkey.field"));
+ assertEquals(
+ "com.uber.hoodie.utilities.TestHoodieDeltaStreamer$TestGenerator",
+ props.getString("hoodie.datasource.write.keygenerator.class")
+ );
+ }
+
+ @Test
+ public void testPropsWithInvalidKeyGenerator() throws Exception {
+ try {
+ String datasetBasePath = dfsBasePath + "/test_dataset";
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(
+ datasetBasePath, Operation.BULK_INSERT,
TripsWithDistanceTransformer.class.getName(),
+ PROPS_FILENAME_TEST_INVALID, false), jsc);
+ deltaStreamer.sync();
+ fail("Should error out when setting the key generator class property to an invalid value");
+ } catch (IOException e) {
+ //expected
+ log.error("Expected error during getting the key generator", e);
+ assertTrue(e.getMessage().contains("Could not load key generator class"));
+ }
}
@Test
@@ -370,7 +405,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath,
Operation.BULK_INSERT,
- SqlQueryBasedTransformer.class.getName(), true);
+ SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE,
true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet",
sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet",
sqlContext);
@@ -524,4 +559,11 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
functions.col("end_lat"), functions.col("begin_lon"),
functions.col("end_lat")));
}
}
+
+ public static class TestGenerator extends SimpleKeyGenerator {
+
+ public TestGenerator(TypedProperties props) {
+ super(props);
+ }
+ }
}