wangxianghu commented on a change in pull request #2074:
URL: https://github.com/apache/hudi/pull/2074#discussion_r495515048
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -190,8 +192,16 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+    this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
Review comment:
IIUC, there is no need to initialize the metrics if the source is not Kafka, so it would be better to initialize them lazily, only when they are actually needed.
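The deferred initialization suggested above can be sketched with the standard lazy-init pattern. All names below are hypothetical stand-ins, since `HoodieDeltaStreamerMetrics` is not available outside the Hudi codebase:

```java
// A minimal sketch of lazy initialization: the (hypothetical) Metrics object
// is only constructed on first access, mirroring the suggestion to defer
// HoodieDeltaStreamerMetrics creation until a Kafka source actually needs it.
public class LazyMetricsDemo {

    static class Metrics {
        static int constructedCount = 0; // tracks how many instances were built

        Metrics() {
            constructedCount++;
        }
    }

    private Metrics metrics; // deliberately NOT created eagerly in the constructor

    // Build the metrics on first use only; repeated calls reuse the same instance.
    Metrics getMetrics() {
        if (metrics == null) {
            metrics = new Metrics();
        }
        return metrics;
    }

    public static void main(String[] args) {
        LazyMetricsDemo sync = new LazyMetricsDemo();
        System.out.println(Metrics.constructedCount); // 0: nothing built yet
        sync.getMetrics();
        sync.getMetrics();
        System.out.println(Metrics.constructedCount); // 1: built exactly once
    }
}
```

Non-Kafka sources would then simply never call `getMetrics()`, so no metrics object is ever allocated for them.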
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -190,8 +192,16 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+    this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
+
+    if ("org.apache.hudi.utilities.sources.JsonKafkaSource".equals(cfg.sourceClassName)
+        || "org.apache.hudi.utilities.sources.AvroKafkaSource".equals(cfg.sourceClassName)) {
Review comment:
Replacing the hard-coded class-name strings with `XXX.class.getName()` (e.g. `JsonKafkaSource.class.getName()`) would be better: the class literal is checked by the compiler and survives renames.
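The suggestion can be illustrated with a small self-contained sketch. Since the Hudi source classes are not on the classpath here, `java.util.ArrayList` and `java.util.LinkedList` stand in for `JsonKafkaSource` / `AvroKafkaSource`:

```java
import java.util.ArrayList;
import java.util.LinkedList;

// Sketch of the `XXX.class.getName()` idiom: comparing a configured
// fully-qualified class name against Class.getName() instead of a
// hand-typed string literal.
public class ClassNameCheckDemo {

    // The class literals are resolved by the compiler, so typos and renames
    // are caught at build time rather than failing silently at runtime.
    static boolean isKnownListSource(String sourceClassName) {
        return ArrayList.class.getName().equals(sourceClassName)
            || LinkedList.class.getName().equals(sourceClassName);
    }

    public static void main(String[] args) {
        System.out.println(isKnownListSource("java.util.ArrayList")); // true
        System.out.println(isKnownListSource("java.util.Vector"));    // false
    }
}
```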
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##########
@@ -242,7 +245,7 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(500L, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
Review comment:
@liujinhui1994 thanks for your contribution; I left some comments for you to consider.
Besides, I have an idea: it would be more elegant to initialize the metrics inside the Kafka sources (JSON, Avro) themselves, without breaking the constructor of `org.apache.hudi.utilities.sources.Source`. cc @yanghua WDYT?
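One way the idea above could look: the Kafka subclass builds its own metrics in its own constructor, so the base-class constructor signature stays untouched. This is only a sketch under that assumption; `SourceBase`, `KafkaSourceDemo`, and `metricsLabel` are all hypothetical names:

```java
// Sketch: metrics are created inside the Kafka source subclass, leaving the
// base Source constructor unchanged for every non-Kafka source.
public class KafkaMetricsDemo {

    static abstract class SourceBase {
        final String props;

        SourceBase(String props) { // existing signature stays as-is
            this.props = props;
        }
    }

    static class KafkaSourceDemo extends SourceBase {
        final String metricsLabel; // stand-in for a real metrics object

        KafkaSourceDemo(String props) {
            super(props);                           // base class untouched
            this.metricsLabel = "metrics:" + props; // metrics built inside the Kafka source
        }
    }

    public static void main(String[] args) {
        KafkaSourceDemo source = new KafkaSourceDemo("earliest");
        System.out.println(source.metricsLabel); // metrics:earliest
    }
}
```

With this shape, the test above would also no longer need to pass `metrics` into the `JsonKafkaSource` constructor.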
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]