wangxianghu commented on a change in pull request #2074:
URL: https://github.com/apache/hudi/pull/2074#discussion_r495515048



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -190,8 +192,16 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+    this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));

Review comment:
       IIUC, the metrics are not needed unless the source is Kafka, so initialize them only when they are actually needed.
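
One way to read this suggestion is lazy initialization: the metrics object is built on first use rather than unconditionally in the constructor. The sketch below is only an illustration under stated assumptions. `StubMetrics`, `Lazy`, and `DeltaSyncSketch` are hypothetical stand-ins, not Hudi classes, and the real `HoodieDeltaStreamerMetrics` wiring may differ.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for HoodieDeltaStreamerMetrics; counts constructions. */
final class StubMetrics {
  static int created = 0;

  StubMetrics() {
    created++;
  }
}

/** Builds the wrapped value on first access only. Not thread-safe. */
final class Lazy<T> {
  private final Supplier<T> factory;
  private T value;

  Lazy(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    if (value == null) {
      value = factory.get();
    }
    return value;
  }
}

/** Hypothetical, trimmed-down stand-in for the DeltaSync constructor logic. */
final class DeltaSyncSketch {
  private final Lazy<StubMetrics> metrics = new Lazy<>(StubMetrics::new);

  DeltaSyncSketch(String sourceClassName) {
    // Same two class names the diff checks; only these paths touch metrics.
    if ("org.apache.hudi.utilities.sources.JsonKafkaSource".equals(sourceClassName)
        || "org.apache.hudi.utilities.sources.AvroKafkaSource".equals(sourceClassName)) {
      metrics.get(); // first access triggers construction
    }
  }
}
```

With this shape, a non-Kafka source never pays for metrics construction, while the Kafka path builds the object exactly once.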

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -190,8 +192,16 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
-    this.formatAdapter = new SourceFormatAdapter(
-        UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
+    this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
+
+    if ("org.apache.hudi.utilities.sources.JsonKafkaSource".equals(cfg.sourceClassName)
+            || "org.apache.hudi.utilities.sources.AvroKafkaSource".equals(cfg.sourceClassName)) {

Review comment:
       It would be better to replace the hard-coded class names with `XXX.class.getName()`.
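
A minimal sketch of the comparison using class literals instead of string literals. The two stub classes are hypothetical stand-ins for the real Kafka source classes so that the example compiles on its own; in the actual change the literals would presumably be the Hudi classes named in the diff.

```java
/** Hypothetical stand-ins for the real Hudi source classes. */
final class JsonKafkaSourceStub {}

final class AvroKafkaSourceStub {}

final class KafkaSourceCheck {
  /** True when the configured class name matches one of the Kafka source classes. */
  static boolean isKafkaSource(String sourceClassName) {
    // Class literals are checked by the compiler and follow renames,
    // unlike hard-coded fully qualified name strings.
    return JsonKafkaSourceStub.class.getName().equals(sourceClassName)
        || AvroKafkaSourceStub.class.getName().equals(sourceClassName);
  }
}
```

Keeping the literal on the left of `equals` preserves the null-safety of the original check when `sourceClassName` is unset.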

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##########
@@ -242,7 +245,7 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(500L, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit

Review comment:
       @liujinhui1994 thanks for your contribution. I left some comments for you to consider.
   Besides, I have an idea: it would be more elegant to initialize the metrics inside the Kafka sources (JSON, Avro), without breaking the constructor of `org.apache.hudi.utilities.sources.Source`. cc @yanghua WDYT?
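
A rough sketch of that idea, under loud assumptions: every class below is a hypothetical stand-in, the base-class constructor is reduced to a single `Properties` argument, and the `metrics.name` key is made up for illustration. It only shows the shape in which the Kafka source owns its metrics while callers keep using the unchanged base constructor signature.

```java
import java.util.Properties;

/** Hypothetical stand-in for a metrics object. */
final class SourceMetricsStub {
  final String name;

  SourceMetricsStub(String name) {
    this.name = name;
  }
}

/** Hypothetical stand-in for the Source base class; its constructor stays unchanged. */
abstract class SourceSketch {
  protected final Properties props;

  protected SourceSketch(Properties props) {
    this.props = props;
  }
}

/** The Kafka source builds its own metrics, so callers pass nothing extra. */
final class KafkaSourceSketch extends SourceSketch {
  private final SourceMetricsStub metrics;

  KafkaSourceSketch(Properties props) {
    super(props);
    // "metrics.name" is a made-up property key for this sketch.
    this.metrics = new SourceMetricsStub(props.getProperty("metrics.name", "kafka-source"));
  }

  SourceMetricsStub metrics() {
    return metrics;
  }
}
```

Under this shape, test code such as the `new JsonKafkaSource(props, jsc, sparkSession, schemaProvider)` call in the diff would not need an extra `metrics` argument.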





