vinothchandar commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r464805558
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
public Boolean filterDupes = false;
+ //will abandon in the future version, recommended use --enable-sync
Review comment:
can we print a warning around this, so the user knows?
here's my take: we can change the code so that `--enable-sync` and
`--sync-tool-class-list` are the main drivers, from which we derive a
`Set<String>` denoting all the sync tool classes. If `--enable-hive-sync` is
specified, then we simply add the Hive sync tool class to this set; the rest
of the code just syncs to all sync tools in the set.
This way, `--enable-hive-sync` will be isolated to the initial command-line
parsing code. We can apply the same approach to the datasource as well, if
you don't see issues. @lw309637554 @leesf wdyt?
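A minimal Java sketch of that idea (the class name, method name, and the Hive tool class constant here are placeholders for illustration, not the actual PR code):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: derive one Set<String> of sync tool classes during
// command-line parsing; --enable-hive-sync only contributes one entry here.
public class SyncToolResolver {

  // Stand-in for the real Hive sync tool class name (assumed).
  static final String HIVE_SYNC_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool";

  static Set<String> resolveSyncToolClasses(boolean enableSync,
                                            boolean enableHiveSync,
                                            String syncToolClassList) {
    Set<String> classes = new LinkedHashSet<>();
    if (enableSync && syncToolClassList != null) {
      for (String cls : syncToolClassList.split(",")) {
        if (!cls.trim().isEmpty()) {
          classes.add(cls.trim());
        }
      }
    }
    // Backward compatibility: the legacy flag just adds the Hive class.
    if (enableHiveSync) {
      classes.add(HIVE_SYNC_TOOL_CLASS);
    }
    return classes;
  }
}
```

The rest of the pipeline would then iterate over this set with no further knowledge of `--enable-hive-sync`.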
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig
}
+ private def metaSync(parameters: Map[String, String],
+ basePath: Path,
+ hadoopConf: Configuration): Boolean = {
+ val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+ // for backward compatibility
+ if (hiveSyncEnabled) {
+ metaSyncEnabled = true
+ syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+ }
+ var metaSyncSuccess = true
+ if (metaSyncEnabled) {
+ val impls = syncClientToolClass.split(",")
+ impls.foreach(impl => {
+ val syncSuccess = impl.trim match {
+ case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+ log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+ val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+ syncHive(basePath, fs, parameters)
+ }
+ case _ => {
+ val fs = FSUtils.getFs(basePath.toString, hadoopConf)
Review comment:
can this line be shared, i.e. the `fs` initialization?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig
}
+ private def metaSync(parameters: Map[String, String],
+ basePath: Path,
+ hadoopConf: Configuration): Boolean = {
+ val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+ // for backward compatibility
+ if (hiveSyncEnabled) {
+ metaSyncEnabled = true
+ syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+ }
+ var metaSyncSuccess = true
+ if (metaSyncEnabled) {
+ val impls = syncClientToolClass.split(",")
+ impls.foreach(impl => {
+ val syncSuccess = impl.trim match {
+ case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+ log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
Review comment:
can we explicitly match for Hive instead of the default? we may change the
default at some point, for example, and that would be an issue.
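One way to make that dispatch explicit, sketched in Java with an assumed class-name constant (the actual code is Scala; this is illustrative only):

```java
// Hypothetical sketch: dispatch on the Hive sync class explicitly, so that a
// later change to DEFAULT_SYNC_CLIENT_TOOL_CLASS cannot silently reroute the
// Hive branch to a different tool.
public class ExplicitSyncDispatch {

  // Stand-in for the real Hive sync tool class name (assumed).
  static final String HIVE_SYNC_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool";

  static String dispatch(String impl) {
    if (HIVE_SYNC_TOOL_CLASS.equals(impl.trim())) {
      return "hive";    // here the real code would call syncHive(...)
    }
    return "generic";   // here it would instantiate the named sync tool
  }
}
```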
##########
File path: packaging/hudi-hive-sync-bundle/pom.xml
##########
@@ -66,7 +66,8 @@
<includes>
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
- <include>org.apache.hudi:hudi-hive-sync</include>
+ <include>org.apache.hudi:hudi-sync-common</include>
+ <include>org.apache.hudi:hudi-hive-sync</include>
Review comment:
nit: indent
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException {
long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
// Send DeltaStreamer Metrics
- metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+ metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true);
+ metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false);
Review comment:
is there a way to do this by iterating over the configured sync tool
classes? i.e. only do it when sync is configured?
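A rough sketch of that approach (the class, field, and method names here are illustrative stand-ins, not the PR's actual metrics API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: record one duration per configured sync tool, so no
// gauge is ever emitted for a tool that was never enabled.
public class SyncMetricsSketch {

  // Stand-in for the real metrics registry.
  final Map<String, Long> gauges = new LinkedHashMap<>();

  void updateSyncMetrics(Set<String> configuredTools, Map<String, Long> syncTimesMs) {
    for (String tool : configuredTools) {
      Long ms = syncTimesMs.get(tool);
      if (ms != null) {
        // Stand-in for Metrics.registerGauge(...).
        gauges.put(tool + ".syncDuration", ms);
      }
    }
  }
}
```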
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
}
- public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+ public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
if (config.isMetricsOn()) {
Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
- Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+ if (hiveSync) {
+ Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+ } else {
+ Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));
Review comment:
what if both Hive and meta sync are off? would we still emit metrics for
meta sync?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
public Boolean enableHiveSync = false;
+ @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")
Review comment:
I think we can name this a little shorter: `--sync-tool-class-list`?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig
}
+ private def metaSync(parameters: Map[String, String],
+ basePath: Path,
+ hadoopConf: Configuration): Boolean = {
+ val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+ // for backward compatibility
+ if (hiveSyncEnabled) {
+ metaSyncEnabled = true
+ syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
Review comment:
would users sync to both? Down the line it may make sense to provide
support for syncing to multiple targets.
But even here, if we just append the Hive sync class when
`hiveSyncEnabled=true`, we can support syncing to both Hive and DLA?
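In that spirit, a hedged sketch of the append-instead-of-overwrite behavior (written in Java for illustration, though the actual code is Scala; the tool class name is a placeholder):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: hiveSyncEnabled appends the Hive sync class rather
// than replacing the user-supplied class list, so Hive and another target
// such as DLA can both be synced in one write.
public class MetaSyncTargets {

  // Stand-in for the real Hive sync tool class name (assumed).
  static final String HIVE_SYNC_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool";

  static Set<String> targets(boolean hiveSyncEnabled, boolean metaSyncEnabled,
                             String syncClientToolClass) {
    Set<String> result = new LinkedHashSet<>();
    if (metaSyncEnabled && syncClientToolClass != null) {
      for (String cls : syncClientToolClass.split(",")) {
        if (!cls.trim().isEmpty()) {
          result.add(cls.trim());
        }
      }
    }
    if (hiveSyncEnabled) {
      result.add(HIVE_SYNC_TOOL_CLASS); // append, do not overwrite
    }
    return result;
  }
}
```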
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
}
- public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+ public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
if (config.isMetricsOn()) {
Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
- Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+ if (hiveSync) {
+ Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+ } else {
+ Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));
Review comment:
should we derive the metric name from the sync tool class? i.e. instead
of `metaSyncDuration`, we do `dlaSyncDuration`? that seems more usable and
understandable.
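A small sketch of deriving the gauge name from the tool class (the naming convention, e.g. stripping a `SyncTool` suffix, is an assumption for illustration):

```java
// Hypothetical sketch: derive the gauge name from the sync tool's simple
// class name, e.g. org.apache.hudi.hive.HiveSyncTool -> "hiveSyncDuration",
// com.example.DlaSyncTool -> "dlaSyncDuration".
public class SyncMetricName {

  static String metricName(String toolClass) {
    String simple = toolClass.substring(toolClass.lastIndexOf('.') + 1);
    if (simple.endsWith("SyncTool")) {
      simple = simple.substring(0, simple.length() - "SyncTool".length());
    }
    return Character.toLowerCase(simple.charAt(0)) + simple.substring(1) + "SyncDuration";
  }
}
```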
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]