vinothchandar commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r464805558



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    // will be removed in a future version; use --enable-sync instead

Review comment:
       can we print a warning around this, so the user knows? 
   
   here's my take: we can change the code so that --enable-sync and --sync-tool-class-list are the main drivers, out of which we derive a `Set<String>` denoting all the sync tool classes. if --enable-hive-sync is specified, then we simply add the hive sync tool class to this set. the rest of the code just syncs to all sync tools in this set.
   
   this way, `--enable-hive-sync` will be isolated to the initial command line parsing code. We can apply the same approach to the datasource as well, if you don't see issues. @lw309637554 @leesf wdyt?
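   a rough sketch of that set-derivation idea (class and constant names here are illustrative, not the actual Hudi identifiers):
   
```java
import java.util.LinkedHashSet;
import java.util.Set;

public class SyncToolResolver {
  // Hypothetical constant; the real default lives in Hudi's config classes.
  static final String HIVE_SYNC_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool";

  // Derive the full set of sync tool classes from the command line flags.
  // --enable-hive-sync only appends the Hive tool to the set; everything
  // downstream just iterates over whatever the set contains.
  static Set<String> resolveSyncToolClasses(boolean enableSync, String syncToolClassList,
                                            boolean enableHiveSync) {
    Set<String> classes = new LinkedHashSet<>();
    if (enableSync && syncToolClassList != null) {
      for (String cls : syncToolClassList.split(",")) {
        if (!cls.trim().isEmpty()) {
          classes.add(cls.trim());
        }
      }
    }
    if (enableHiveSync) {
      classes.add(HIVE_SYNC_TOOL_CLASS);
    }
    return classes;
  }
}
```
   
   with this shape, syncing to both Hive and another catalog is just a set with two entries, and legacy flags never leak past argument parsing.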

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            syncHive(basePath, fs, parameters)
+          }
+          case _ => {
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)

Review comment:
      can this line be shared, i.e. the `fs` initialization?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")

Review comment:
      can we explicitly match on the Hive sync tool class instead of the default? we may change the default at some point, and that would become an issue.

##########
File path: packaging/hudi-hive-sync-bundle/pom.xml
##########
@@ -66,7 +66,8 @@
                 <includes>
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
-                  <include>org.apache.hudi:hudi-hive-sync</include>
+                  <include>org.apache.hudi:hudi-sync-common</include>
+                 <include>org.apache.hudi:hudi-hive-sync</include>

Review comment:
      nit: indent

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException {
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
     // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false);

Review comment:
      is there a way to do this by iterating over the configured sync tool classes? i.e. only do it when sync is configured?
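   one possible shape for that, with hypothetical names (the real call sites are in DeltaSync/HoodieDeltaStreamerMetrics): key the measured durations by sync tool class and register one gauge per entry, so nothing is emitted when no sync tool ran:
   
```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PerToolSyncMetrics {
  // Stand-in for Metrics.registerGauge: gauge name -> value.
  final Map<String, Long> gauges = new LinkedHashMap<>();

  // Register one duration gauge per configured sync tool. If no sync tool
  // is configured, the input map is empty and no gauge is registered.
  void updateSyncMetrics(Map<String, Long> syncTimeMsByToolClass) {
    for (Map.Entry<String, Long> e : syncTimeMsByToolClass.entrySet()) {
      String simpleName = e.getKey().substring(e.getKey().lastIndexOf('.') + 1);
      gauges.put("deltastreamer." + simpleName + "Duration", e.getValue());
    }
  }
}
```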

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
      what if both hive and meta sync are off? it looks like we would still emit the meta sync metric?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       I think we can name this little shorter. `--sync-tool-class-list` ? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
      would users sync to both? down the line it may make sense to support syncing to multiple catalogs.
   
   but even here, if we just append the HiveSync class when `hiveSyncEnabled=true`, we can support syncing to both Hive and DLA?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
      should we derive the metric name from the sync tool class? i.e. instead of `metaSyncDuration`, we do `dlaSyncDuration`? that seems more usable and understandable.
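   a sketch of deriving the gauge name from the tool class; the `SyncTool` suffix convention assumed here is a guess, not something the codebase guarantees:
   
```java
public class SyncMetricNames {
  // Turn e.g. "com.example.DLASyncTool" into "dlaSyncDuration" and
  // "org.apache.hudi.hive.HiveSyncTool" into "hiveSyncDuration".
  static String gaugeNameFor(String toolClass) {
    String simple = toolClass.substring(toolClass.lastIndexOf('.') + 1);
    String stem = simple.endsWith("SyncTool")
        ? simple.substring(0, simple.length() - "SyncTool".length())
        : simple;
    return stem.toLowerCase() + "SyncDuration";
  }
}
```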




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

