This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 245e1fd [HUDI-2272] Pass base file format to sync clients (#3397)
245e1fd is described below
commit 245e1fd17dbf19d9008eaae3bc009bc2be2e663f
Author: rmahindra123 <[email protected]>
AuthorDate: Tue Aug 3 14:46:02 2021 -0700
[HUDI-2272] Pass base file format to sync clients (#3397)
Co-authored-by: Rajesh Mahindra <[email protected]>
---
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a08293d..e56082c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -574,22 +574,19 @@ public class DeltaSync implements Serializable {
for (String impl : syncClientToolClasses) {
Timer.Context syncContext = metrics.getMetaSyncTimerContext();
impl = impl.trim();
- AbstractSyncTool syncTool = null;
switch (impl) {
case "org.apache.hudi.hive.HiveSyncTool":
- HiveSyncConfig hiveSyncConfig =
DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath,
cfg.baseFileFormat);
- LOG.info("Syncing target hoodie table with hive table(" +
hiveSyncConfig.tableName + "). Hive metastore URL :"
- + hiveSyncConfig.jdbcUrl + ", basePath :" +
cfg.targetBasePath);
- syncTool = new HiveSyncTool(hiveSyncConfig, new HiveConf(conf,
HiveConf.class), fs);
+ syncHive();
break;
default:
FileSystem fs = FSUtils.getFs(cfg.targetBasePath,
jssc.hadoopConfiguration());
Properties properties = new Properties();
properties.putAll(props);
properties.put("basePath", cfg.targetBasePath);
- syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new
Class[]{Properties.class, FileSystem.class}, properties, fs);
+ properties.put("baseFileFormat", cfg.baseFileFormat);
+ AbstractSyncTool syncTool = (AbstractSyncTool)
ReflectionUtils.loadClass(impl, new Class[]{Properties.class,
FileSystem.class}, properties, fs);
+ syncTool.syncHoodieTable();
}
- syncTool.syncHoodieTable();
long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl),
metaSyncTimeMs);
}