This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/redo-log by this push:
     new b98e931  [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177)
b98e931 is described below

commit b98e93179b33e4d837ee0fb41b4fa59c6e1c0e76
Author: ForwardXu <forwardxu...@gmail.com>
AuthorDate: Mon Jan 6 22:47:01 2020 +0800

    [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177)
---
 hudi-utilities/pom.xml                             |  5 +++
 .../apache/hudi/utilities/HDFSParquetImporter.java | 15 +++----
 .../hudi/utilities/HiveIncrementalPuller.java      | 47 +++++++++++-----------
 .../org/apache/hudi/utilities/HoodieCleaner.java   |  8 ++--
 .../org/apache/hudi/utilities/HoodieCompactor.java |  8 ++--
 .../hudi/utilities/HoodieSnapshotCopier.java       |  6 +--
 .../org/apache/hudi/utilities/UtilHelpers.java     |  8 ++--
 .../adhoc/UpgradePayloadFromUberToApache.java      | 32 ++++++++-------
 .../AbstractDeltaStreamerService.java              |  7 ++--
 .../hudi/utilities/deltastreamer/Compactor.java    | 11 ++---
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 41 ++++++++++---------
 .../deltastreamer/HoodieDeltaStreamer.java         | 23 ++++++-----
 .../deltastreamer/SchedulerConfGenerator.java      |  8 ++--
 .../hudi/utilities/perf/TimelineServerPerf.java    |  8 ++--
 .../hudi/utilities/sources/AvroKafkaSource.java    |  8 ++--
 .../hudi/utilities/sources/HiveIncrPullSource.java |  8 ++--
 .../hudi/utilities/sources/HoodieIncrSource.java   |  8 ++--
 .../hudi/utilities/sources/JsonKafkaSource.java    |  8 ++--
 .../org/apache/hudi/utilities/sources/Source.java  |  6 +--
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  5 ---
 .../utilities/transform/FlatteningTransformer.java |  8 ++--
 .../transform/SqlQueryBasedTransformer.java        | 10 ++---
 .../hudi/utilities/TestHoodieDeltaStreamer.java    | 12 +++---
 .../utilities/sources/AbstractBaseTestSource.java  |  6 +--
 .../sources/DistributedTestDataSource.java         |  6 +--
 .../hudi/utilities/sources/TestDataSource.java     |  6 +--
 26 files changed, 165 insertions(+), 153 deletions(-)
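
For readers skimming the large diff below: every file follows the same mechanical pattern. The log4j 1.x LogManager/Logger imports are swapped for org.slf4j.Logger/LoggerFactory, and string concatenation inside log calls is replaced with "{}" placeholders so arguments are only formatted when the level is enabled. The following is a minimal, self-contained sketch of that pattern, not part of the commit itself; the class name and values are hypothetical, and it assumes an SLF4J binding (such as slf4j-log4j12) is available on the runtime classpath.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMigrationSketch {

  private static final Logger LOG = LoggerFactory.getLogger(LoggingMigrationSketch.class);

  public static void main(String[] args) {
    String topic = "impressions";
    long totalNewMsgs = 42;

    // Before (log4j): LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topic);
    // After (SLF4J): placeholders defer string building until the INFO level is known to be enabled.
    LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, topic);

    try {
      throw new IllegalStateException("simulated failure");
    } catch (Throwable t) {
      // Before (log4j): LOG.error(t); accepted a bare Throwable.
      // After (SLF4J): a message is required; passing the Throwable last preserves the stack trace.
      LOG.error("Operation failed", t);
    }
  }
}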

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index ba59e22..00c6ecb 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -124,6 +124,11 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
 
     <!-- Fasterxml -->
     <dependency>
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 4aa72d0..8b141a2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.spark.api.java.JavaRDD;
@@ -59,6 +57,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -66,7 +66,7 @@ import scala.Tuple2;
  */
 public class HDFSParquetImporter implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(HDFSParquetImporter.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HDFSParquetImporter.class);
 
   private static final DateTimeFormatter PARTITION_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy/MM/dd")
       .withZone(ZoneId.systemDefault());
@@ -103,7 +103,7 @@ public class HDFSParquetImporter implements Serializable {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs)
         : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
-    LOG.info("Starting data import with configs : " + props.toString());
+    LOG.info("Starting data import with configs : {}", props.toString());
     int ret = -1;
     try {
       // Verify that targetPath is not present.
@@ -114,7 +114,7 @@ public class HDFSParquetImporter implements Serializable {
         ret = dataImport(jsc);
       } while (ret != 0 && retry-- > 0);
     } catch (Throwable t) {
-      LOG.error(t);
+      LOG.error("The dataImport error:", t);
     }
     return ret;
   }
@@ -175,13 +175,14 @@ public class HDFSParquetImporter implements Serializable {
             throw new HoodieIOException("row field is missing. :" + 
cfg.rowKey);
           }
           String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
+          LOG.debug("Row Key : {}, Partition Path is ({})", rowField, 
partitionPath);
           if (partitionField instanceof Number) {
             try {
               long ts = (long) (Double.parseDouble(partitionField.toString()) 
* 1000L);
               partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
             } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
+              LOG.warn("Unable to parse date from partition field. Assuming 
partition as ({})",
+                  partitionField);
             }
           }
           return new HoodieRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 963bc7d..0a2ecde 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.stringtemplate.v4.ST;
 
 import java.io.File;
@@ -49,6 +47,8 @@ import java.sql.Statement;
 import java.util.List;
 import java.util.Scanner;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility to pull data after a given commit, based on the supplied HiveQL and 
save the delta as another hive temporary
@@ -61,7 +61,7 @@ import java.util.stream.Collectors;
  */
 public class HiveIncrementalPuller {
 
-  private static final Logger LOG = 
LogManager.getLogger(HiveIncrementalPuller.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIncrementalPuller.class);
   private static String driverName = "org.apache.hive.jdbc.HiveDriver";
 
   public static class Config implements Serializable {
@@ -129,10 +129,10 @@ public class HiveIncrementalPuller {
     try {
       if (config.fromCommitTime == null) {
         config.fromCommitTime = inferCommitTime(fs);
-        LOG.info("FromCommitTime inferred as " + config.fromCommitTime);
+        LOG.info("FromCommitTime inferred as {}", config.fromCommitTime);
       }
 
-      LOG.info("FromCommitTime - " + config.fromCommitTime);
+      LOG.info("FromCommitTime - {}", config.fromCommitTime);
       String sourceTableLocation = getTableLocation(config.sourceDb, 
config.sourceTable);
       String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
       if (lastCommitTime == null) {
@@ -180,15 +180,16 @@ public class HiveIncrementalPuller {
     incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
     String incrementalSQL = new Scanner(new 
File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
     if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
-      LOG.info("Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable
-          + ", which means its pulling from a different table. Fencing this 
from happening.");
+      LOG.info(
+          "Incremental SQL does not have {}.{}, which means its pulling from a 
different table. Fencing this from happening.",
+          config.sourceDb, config.sourceTable);
       throw new HoodieIncrementalPullSQLException(
           "Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable);
     }
     if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) 
{
-      LOG.info("Incremental SQL : " + incrementalSQL
-          + " does not contain `_hoodie_commit_time` > '%targetBasePath'. 
Please add "
-          + "this clause for incremental to work properly.");
+      LOG.info(
+          "Incremental SQL : {} does not contain `_hoodie_commit_time` > 
'%targetBasePath'. Please add "
+              + "this clause for incremental to work properly.", 
incrementalSQL);
       throw new HoodieIncrementalPullSQLException(
           "Incremental SQL does not have clause `_hoodie_commit_time` > 
'%targetBasePath', which "
               + "means its not pulling incrementally");
@@ -224,18 +225,18 @@ public class HiveIncrementalPuller {
   }
 
   private boolean deleteHDFSPath(FileSystem fs, String path) throws 
IOException {
-    LOG.info("Deleting path " + path);
+    LOG.info("Deleting path {}", path);
     return fs.delete(new Path(path), true);
   }
 
   private void executeStatement(String sql, Statement stmt) throws 
SQLException {
-    LOG.info("Executing: " + sql);
+    LOG.info("Executing: {}", sql);
     stmt.execute(sql);
   }
 
   private String inferCommitTime(FileSystem fs) throws SQLException, 
IOException {
-    LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie 
dataset " + config.targetDb + "."
-        + config.targetTable);
+    LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie 
dataset {}.{}",
+        config.targetDb, config.targetTable);
     String targetDataLocation = getTableLocation(config.targetDb, 
config.targetTable);
     return scanForCommitTime(fs, targetDataLocation);
   }
@@ -249,7 +250,7 @@ public class HiveIncrementalPuller {
       resultSet = stmt.executeQuery("describe formatted `" + db + "." + table 
+ "`");
       while (resultSet.next()) {
         if (resultSet.getString(1).trim().equals("Location:")) {
-          LOG.info("Inferred table location for " + db + "." + table + " as " 
+ resultSet.getString(2));
+          LOG.info("Inferred table location for {}.{} as {}", db, table, 
resultSet.getString(2));
           return resultSet.getString(2);
         }
       }
@@ -290,7 +291,7 @@ public class HiveIncrementalPuller {
   private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) 
throws IOException {
     Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable 
+ "__" + config.sourceTable);
     if (!fs.exists(targetBaseDirPath)) {
-      LOG.info("Creating " + targetBaseDirPath + " with permission 
drwxrwxrwx");
+      LOG.info("Creating {} with permission drwxrwxrwx", targetBaseDirPath);
       boolean result =
           FileSystem.mkdirs(fs, targetBaseDirPath, new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
       if (!result) {
@@ -305,7 +306,7 @@ public class HiveIncrementalPuller {
         throw new HoodieException("Could not delete existing " + targetPath);
       }
     }
-    LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
+    LOG.info("Creating {} with permission drwxrwxrwx", targetPath);
     return FileSystem.mkdirs(fs, targetBaseDirPath, new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
   }
 
@@ -316,19 +317,19 @@ public class HiveIncrementalPuller {
         .collect(Collectors.toList());
     if (commitsToSync.isEmpty()) {
       LOG.warn(
-          "Nothing to sync. All commits in "
-              + config.sourceTable + " are " + 
metadata.getActiveTimeline().getCommitsTimeline()
-                  
.filterCompletedInstants().getInstants().collect(Collectors.toList())
-              + " and from commit time is " + config.fromCommitTime);
+          "Nothing to sync. All commits in {} are {} and from commit time is 
{}",
+          config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
+              
.filterCompletedInstants().getInstants().collect(Collectors.toList()),
+          config.fromCommitTime, config.sourceTable);
       return null;
     }
-    LOG.info("Syncing commits " + commitsToSync);
+    LOG.info("Syncing commits {}", commitsToSync);
     return commitsToSync.get(commitsToSync.size() - 1);
   }
 
   private Connection getConnection() throws SQLException {
     if (connection == null) {
-      LOG.info("Getting Hive Connection to " + config.hiveJDBCUrl);
+      LOG.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
       this.connection = DriverManager.getConnection(config.hiveJDBCUrl, 
config.hiveUsername, config.hivePassword);
 
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 9185d97..d21e4c6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -27,18 +27,18 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieCleaner {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieCleaner.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCleaner.class);
 
   /**
    * Config for Cleaner.
@@ -66,7 +66,7 @@ public class HoodieCleaner {
     this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs)
         : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
-    LOG.info("Creating Cleaner with configs : " + props.toString());
+    LOG.info("Creating Cleaner with configs : {}", props.toString());
   }
 
   public void run() throws Exception {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 4ace07c..31a6ab4 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -28,18 +28,18 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieCompactor {
 
-  private static final Logger LOG = 
LogManager.getLogger(HoodieCompactor.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCompactor.class);
   private final Config cfg;
   private transient FileSystem fs;
   private TypedProperties props;
@@ -110,7 +110,7 @@ public class HoodieCompactor {
         }
       } while (ret != 0 && retry-- > 0);
     } catch (Throwable t) {
-      LOG.error(t);
+      LOG.error("The compact error:", t);
     }
     return ret;
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index d24319e..2d64029 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -47,6 +45,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Stream;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -54,7 +54,7 @@ import scala.Tuple2;
  */
 public class HoodieSnapshotCopier implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(HoodieSnapshotCopier.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSnapshotCopier.class);
 
   static class Config implements Serializable {
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 4cb56e9..65b8e8f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -37,8 +37,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -53,12 +51,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Bunch of helper methods.
  */
 public class UtilHelpers {
-  private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
+  private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
 
   public static Source createSource(String sourceClass, TypedProperties cfg, 
JavaSparkContext jssc,
       SparkSession sparkSession, SchemaProvider schemaProvider) throws 
IOException {
@@ -97,7 +97,7 @@ public class UtilHelpers {
       conf = new 
DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
     } catch (Exception e) {
       conf = new DFSPropertiesConfiguration();
-      LOG.warn("Unexpected error read props file at :" + cfgPath, e);
+      LOG.warn("Unexpected error read props file at :{}", cfgPath, e);
     }
 
     try {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
index 6040437..8a6f75a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
@@ -28,8 +28,6 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
@@ -38,15 +36,18 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is an one-time use class meant for migrating the configuration for 
"hoodie.compaction.payload.class" in
- * .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi . It 
takes in a file containing base-paths for a set
- * of hudi datasets and does the migration
+ * This is a one-time use class meant for migrating the configuration for
+ * "hoodie.compaction.payload.class" in .hoodie/hoodie.properties from com.uber.hoodie to
+ * org.apache.hudi. It takes in a file containing base-paths for a set of hudi datasets and does
+ * the migration.
  */
 public class UpgradePayloadFromUberToApache implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(UpgradePayloadFromUberToApache.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(UpgradePayloadFromUberToApache.class);
 
   private final Config cfg;
 
@@ -59,36 +60,37 @@ public class UpgradePayloadFromUberToApache implements 
Serializable {
     try (BufferedReader reader = new BufferedReader(new 
FileReader(cfg.inputPath))) {
       basePath = reader.readLine();
     } catch (IOException e) {
-      LOG.error("Read from path: " + cfg.inputPath + " error.", e);
+      LOG.error("Read from path: {} error.", cfg.inputPath, e);
     }
 
     while (basePath != null) {
       basePath = basePath.trim();
       if (!basePath.startsWith("#")) {
-        LOG.info("Performing upgrade for " + basePath);
+        LOG.info("Performing upgrade for {}", basePath);
         String metaPath = String.format("%s/.hoodie", basePath);
         HoodieTableMetaClient metaClient =
-            new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new 
Configuration()), basePath, false);
+            new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new 
Configuration()), basePath,
+                false);
         HoodieTableConfig tableConfig = metaClient.getTableConfig();
         if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
           Map<String, String> propsMap = tableConfig.getProps();
           if (propsMap.containsKey(HoodieCompactionConfig.PAYLOAD_CLASS_PROP)) 
{
             String payloadClass = 
propsMap.get(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
-            LOG.info("Found payload class=" + payloadClass);
+            LOG.info("Found payload class={}", payloadClass);
             if (payloadClass.startsWith("com.uber.hoodie")) {
               String newPayloadClass = payloadClass.replace("com.uber.hoodie", 
"org.apache.hudi");
-              LOG.info("Replacing payload class (" + payloadClass + ") with (" 
+ newPayloadClass + ")");
+              LOG.info("Replacing payload class ({}) with ({})", payloadClass, 
newPayloadClass);
               Map<String, String> newPropsMap = new HashMap<>(propsMap);
               newPropsMap.put(HoodieCompactionConfig.PAYLOAD_CLASS_PROP, 
newPayloadClass);
               Properties props = new Properties();
               props.putAll(newPropsMap);
-              HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new 
Path(metaPath), props);
-              LOG.info("Finished upgrade for " + basePath);
+              HoodieTableConfig
+                  .createHoodieProperties(metaClient.getFs(), new 
Path(metaPath), props);
+              LOG.info("Finished upgrade for {}", basePath);
             }
           }
         } else {
-          LOG.info("Skipping as this table is COW table. BasePath=" + 
basePath);
-
+          LOG.info("Skipping as this table is COW table. BasePath={}", 
basePath);
         }
       }
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
index 5d36e8d..8fd62e7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
@@ -20,9 +20,6 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -30,13 +27,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base Class for running delta-sync/compaction in separate thread and 
controlling their life-cyle.
  */
 public abstract class AbstractDeltaStreamerService implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(AbstractDeltaStreamerService.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDeltaStreamerService.class);
 
   // Flag to track if the service is started.
   private boolean started;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
index eb3212f..65bf598 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
@@ -24,20 +24,20 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Run one round of compaction.
  */
 public class Compactor implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(Compactor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
 
   private transient HoodieWriteClient compactionClient;
   private transient JavaSparkContext jssc;
@@ -48,12 +48,13 @@ public class Compactor implements Serializable {
   }
 
   public void compact(HoodieInstant instant) throws IOException {
-    LOG.info("Compactor executing compaction " + instant);
+    LOG.info("Compactor executing compaction {}", instant);
     JavaRDD<WriteStatus> res = 
compactionClient.compact(instant.getTimestamp());
     long numWriteErrors = res.collect().stream().filter(r -> 
r.hasErrors()).count();
     if (numWriteErrors != 0) {
       // We treat even a single error in compaction as fatal
-      LOG.error("Compaction for instant (" + instant + ") failed with write 
errors. Errors :" + numWriteErrors);
+      LOG.error("Compaction for instant ({}) failed with write errors. Errors 
:{}", instant,
+          numWriteErrors);
       throw new HoodieException(
           "Compaction for instant (" + instant + ") failed with write errors. 
Errors :" + numWriteErrors);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7dfb015..20608b7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -73,6 +71,8 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -83,7 +83,7 @@ import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
  */
 public class DeltaSync implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaSync.class);
   public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
   public static String CHECKPOINT_RESET_KEY = 
"deltastreamer.checkpoint.reset_key";
 
@@ -168,7 +168,7 @@ public class DeltaSync implements Serializable {
     this.tableType = tableType;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    LOG.info("Creating delta streamer with configs : " + props.toString());
+    LOG.info("Creating delta streamer with configs : {}", props.toString());
     this.schemaProvider = schemaProvider;
 
     refreshTimeline();
@@ -266,7 +266,7 @@ public class DeltaSync implements Serializable {
     if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
       resumeCheckpointStr = Option.of(cfg.checkpoint);
     }
-    LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);
+    LOG.info("Checkpoint to resume from : {}", resumeCheckpointStr);
 
     final Option<JavaRDD<GenericRecord>> avroRDDOptional;
     final String checkpointStr;
@@ -300,8 +300,9 @@ public class DeltaSync implements Serializable {
     }
 
     if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
-      LOG.info("No new data, source checkpoint has not changed. Nothing to 
commit. Old checkpoint=("
-          + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+      LOG.info(
+          "No new data, source checkpoint has not changed. Nothing to commit. 
Old checkpoint=({}). New Checkpoint=({})",
+          resumeCheckpointStr, checkpointStr);
       return null;
     }
 
@@ -342,7 +343,7 @@ public class DeltaSync implements Serializable {
     boolean isEmpty = records.isEmpty();
 
     String commitTime = startCommit();
-    LOG.info("Starting commit  : " + commitTime);
+    LOG.info("Starting commit  : {}", commitTime);
 
     JavaRDD<WriteStatus> writeStatusRDD;
     if (cfg.operation == Operation.INSERT) {
@@ -367,13 +368,14 @@ public class DeltaSync implements Serializable {
       }
 
       if (hasErrors) {
-        LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
-            + totalErrorRecords + "/" + totalRecords);
+        LOG.warn(
+            "Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total={}/{}",
+            totalErrorRecords, totalRecords);
       }
 
       boolean success = writeClient.commit(commitTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata));
       if (success) {
-        LOG.info("Commit " + commitTime + " successful!");
+        LOG.info("Commit {} successful!", commitTime);
 
         // Schedule compaction if needed
         if (cfg.isAsyncCompactionEnabled()) {
@@ -387,16 +389,18 @@ public class DeltaSync implements Serializable {
           hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 
0;
         }
       } else {
-        LOG.info("Commit " + commitTime + " failed!");
+        LOG.info("Commit {} failed!", commitTime);
         throw new HoodieException("Commit " + commitTime + " failed!");
       }
     } else {
-      LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErrorRecords + "/" + totalRecords);
+      LOG.error("Delta Sync found errors when writing. Errors/Total={}/{}", 
totalErrorRecords,
+          totalRecords);
       LOG.error("Printing out the top 100 errors");
       writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
-        LOG.error("Global error :", ws.getGlobalError());
+        LOG.error("Global error :{}", ws.getGlobalError());
         if (ws.getErrors().size() > 0) {
-          ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + 
r.getKey() + " is " + r.getValue()));
+          ws.getErrors().entrySet()
+              .forEach(r -> LOG.trace("Error for key:{} is {}", r.getKey(), 
r.getValue()));
         }
       });
       // Rolling back instant
@@ -438,8 +442,9 @@ public class DeltaSync implements Serializable {
   private void syncHive() throws ClassNotFoundException {
     if (cfg.enableHiveSync) {
       HiveSyncConfig hiveSyncConfig = 
DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
-      LOG.info("Syncing target hoodie table with hive table(" + 
hiveSyncConfig.tableName + "). Hive metastore URL :"
-          + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
+      LOG.info(
+          "Syncing target hoodie table with hive table({}). Hive metastore URL 
:{}, basePath :{}",
+          hiveSyncConfig.tableName, hiveSyncConfig.jdbcUrl, 
cfg.targetBasePath);
 
       new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
     }
@@ -503,7 +508,7 @@ public class DeltaSync implements Serializable {
         schemas.add(schemaProvider.getTargetSchema());
       }
 
-      LOG.info("Registering Schema :" + schemas);
+      LOG.info("Registering Schema :{}", schemas);
       
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index f8ddadb..3736e87 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
@@ -66,6 +64,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An Utility which can incrementally take the output from {@link 
HiveIncrementalPuller} and apply it to the target
@@ -78,7 +78,7 @@ import java.util.stream.IntStream;
  */
 public class HoodieDeltaStreamer implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(HoodieDeltaStreamer.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDeltaStreamer.class);
 
   public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
 
@@ -131,7 +131,7 @@ public class HoodieDeltaStreamer implements Serializable {
   }
 
   private boolean onDeltaSyncShutdown(boolean error) {
-    LOG.info("DeltaSync shutdown. Closing write client. Error?" + error);
+    LOG.info("DeltaSync shutdown. Closing write client. Error?{}", error);
     deltaSyncService.close();
     return true;
   }
@@ -363,7 +363,7 @@ public class HoodieDeltaStreamer implements Serializable {
       }
 
       this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
-      LOG.info("Creating delta streamer with configs : " + props.toString());
+      LOG.info("Creating delta streamer with configs : {}", props.toString());
       this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
       if (cfg.filterDupes) {
@@ -385,7 +385,7 @@ public class HoodieDeltaStreamer implements Serializable {
         boolean error = false;
         if (cfg.isAsyncCompactionEnabled()) {
           // set Scheduler Pool.
-          LOG.info("Setting Spark Pool name for delta-sync to " + 
SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+          LOG.info("Setting Spark Pool name for delta-sync to {}", 
SchedulerConfGenerator.DELTASYNC_POOL_NAME);
           jssc.setLocalProperty("spark.scheduler.pool", 
SchedulerConfGenerator.DELTASYNC_POOL_NAME);
         }
         try {
@@ -394,15 +394,15 @@ public class HoodieDeltaStreamer implements Serializable {
               long start = System.currentTimeMillis();
               Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
               if (scheduledCompactionInstant.isPresent()) {
-                LOG.info("Enqueuing new pending compaction instant (" + 
scheduledCompactionInstant + ")");
+                LOG.info("Enqueuing new pending compaction instant ({})", 
scheduledCompactionInstant);
                 asyncCompactService.enqueuePendingCompaction(new 
HoodieInstant(State.REQUESTED,
                     HoodieTimeline.COMPACTION_ACTION, 
scheduledCompactionInstant.get()));
                 
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
               }
               long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
               if (toSleepMs > 0) {
-                LOG.info("Last sync ran less than min sync interval: " + 
cfg.minSyncIntervalSeconds + " s, sleep: "
-                    + toSleepMs + " ms.");
+                LOG.info("Last sync ran less than min sync interval: {} s, 
sleep: {} ms",
+                    cfg.minSyncIntervalSeconds, toSleepMs);
                 Thread.sleep(toSleepMs);
               }
             } catch (Exception e) {
@@ -422,7 +422,7 @@ public class HoodieDeltaStreamer implements Serializable {
      * Shutdown compactor as DeltaSync is shutdown.
      */
     private void shutdownCompactor(boolean error) {
-      LOG.info("Delta Sync shutdown. Error ?" + error);
+      LOG.info("Delta Sync shutdown. Error ?{}", error);
       if (asyncCompactService != null) {
         LOG.warn("Gracefully shutting down compactor");
         asyncCompactService.shutdown(false);
@@ -561,7 +561,8 @@ public class HoodieDeltaStreamer implements Serializable {
           IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> 
CompletableFuture.supplyAsync(() -> {
             try {
               // Set Compactor Pool Name for allowing users to prioritize 
compaction
-              LOG.info("Setting Spark Pool name for compaction to " + 
SchedulerConfGenerator.COMPACT_POOL_NAME);
+              LOG.info("Setting Spark Pool name for compaction to {}",
+                  SchedulerConfGenerator.COMPACT_POOL_NAME);
               jssc.setLocalProperty("spark.scheduler.pool", 
SchedulerConfGenerator.COMPACT_POOL_NAME);
 
               while (!isShutdownRequested()) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index 09c4da0..862d59d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -21,8 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 
 import java.io.BufferedWriter;
@@ -32,6 +30,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility Class to generate Spark Scheduling allocation file. This kicks in 
only when user sets
@@ -39,7 +39,7 @@ import java.util.UUID;
  */
 public class SchedulerConfGenerator {
 
-  private static final Logger LOG = 
LogManager.getLogger(SchedulerConfGenerator.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerConfGenerator.class);
 
   public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
   public static final String COMPACT_POOL_NAME = "hoodiecompact";
@@ -88,7 +88,7 @@ public class SchedulerConfGenerator {
     BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
     bw.write(generateConfig(deltaSyncWeight, compactionWeight, 
deltaSyncMinShare, compactionMinShare));
     bw.close();
-    LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath());
+    LOG.info("Configs written to file {}", tempConfigFile.getAbsolutePath());
     return tempConfigFile.getAbsolutePath();
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 1108f65..f8b6a6a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -37,8 +37,6 @@ import com.codahale.metrics.UniformReservoir;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -55,10 +53,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TimelineServerPerf implements Serializable {
 
-  private static final Logger LOG = 
LogManager.getLogger(TimelineServerPerf.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TimelineServerPerf.class);
   private final Config cfg;
   private transient TimelineService timelineServer;
   private final boolean useExternalTimelineServer;
@@ -73,7 +73,7 @@ public class TimelineServerPerf implements Serializable {
   private void setHostAddrFromSparkConf(SparkConf sparkConf) {
     String hostAddr = sparkConf.get("spark.driver.host", null);
     if (hostAddr != null) {
-      LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. 
It was " + this.hostAddr);
+      LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", 
hostAddr, this.hostAddr);
       this.hostAddr = hostAddr;
     } else {
       LOG.warn("Unable to find driver bind address from spark config");
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 18ebff4..9a59333 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,20 +27,20 @@ import 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 import io.confluent.kafka.serializers.KafkaAvroDecoder;
 import kafka.serializer.StringDecoder;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import org.apache.spark.streaming.kafka.OffsetRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
  */
 public class AvroKafkaSource extends AvroSource {
 
-  private static final Logger LOG = 
LogManager.getLogger(AvroKafkaSource.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(AvroKafkaSource.class);
 
   private final KafkaOffsetGen offsetGen;
 
@@ -57,7 +57,7 @@ public class AvroKafkaSource extends AvroSource {
     if (totalNewMsgs <= 0) {
       return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : "");
     } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());
+      LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, 
offsetGen.getTopicName());
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
     return new InputBatch<>(Option.of(newDataRDD), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index 666c260..5e546f3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +44,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Source to read deltas produced by {@link HiveIncrementalPuller}, commit by 
commit and apply to the target table
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
  */
 public class HiveIncrPullSource extends AvroSource {
 
-  private static final Logger LOG = 
LogManager.getLogger(HiveIncrPullSource.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIncrPullSource.class);
 
   private final transient FileSystem fs;
 
@@ -95,7 +95,7 @@ public class HiveIncrPullSource extends AvroSource {
       commitTimes.add(splits[splits.length - 1]);
     }
     Collections.sort(commitTimes);
-    LOG.info("Retrieved commit times " + commitTimes);
+    LOG.info("Retrieved commit times {}", commitTimes);
 
     if (!latestTargetCommit.isPresent()) {
       // start from the beginning
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 888eec7..532104d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -28,8 +28,6 @@ import 
org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
@@ -37,10 +35,12 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieIncrSource extends RowSource {
 
-  private static final Logger LOG = 
LogManager.getLogger(HoodieIncrSource.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieIncrSource.class);
 
   protected static class Config {
 
@@ -109,7 +109,7 @@ public class HoodieIncrSource extends RowSource {
         numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
 
     if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + 
instantEndpts.getKey());
+      LOG.warn("Already caught up. Begin Checkpoint was :{}", 
instantEndpts.getKey());
       return Pair.of(Option.empty(), instantEndpts.getKey());
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index bd922ac..a146264 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -25,20 +25,20 @@ import 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
 import kafka.serializer.StringDecoder;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import org.apache.spark.streaming.kafka.OffsetRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Read json kafka data.
  */
 public class JsonKafkaSource extends JsonSource {
 
-  private static final Logger LOG = 
LogManager.getLogger(JsonKafkaSource.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(JsonKafkaSource.class);
 
   private final KafkaOffsetGen offsetGen;
 
@@ -55,7 +55,7 @@ public class JsonKafkaSource extends JsonSource {
     if (totalNewMsgs <= 0) {
       return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : "");
     }
-    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());
+    LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, 
offsetGen.getTopicName());
     JavaRDD<String> newDataRDD = toRDD(offsetRanges);
     return new InputBatch<>(Option.of(newDataRDD), 
CheckpointUtils.offsetsToStr(offsetRanges));
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 0760c73..d9d3299 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -22,18 +22,18 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.Serializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents a source from which we can tail data. Assumes a constructor that 
takes properties.
  */
 public abstract class Source<T> implements Serializable {
-  private static final Logger LOG = LogManager.getLogger(Source.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Source.class);
 
   public enum SourceType {
     JSON, AVRO, ROW
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index c17a5cf..3810e94 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 
 import kafka.common.TopicAndPartition;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.streaming.kafka.KafkaCluster;
 import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
 import org.apache.spark.streaming.kafka.OffsetRange;
@@ -50,9 +48,6 @@ import scala.util.Either;
  * Source to read data from Kafka, incrementally.
  */
 public class KafkaOffsetGen {
-
-  private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
-
   public static class CheckpointUtils {
 
     /**
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index aabcb73..9f530e2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -20,8 +20,6 @@ package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.util.TypedProperties;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -30,6 +28,8 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Transformer that can flatten nested objects. It currently doesn't unnest 
arrays.
@@ -37,7 +37,7 @@ import java.util.UUID;
 public class FlatteningTransformer implements Transformer {
 
   private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
-  private static final Logger LOG = 
LogManager.getLogger(SqlQueryBasedTransformer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FlatteningTransformer.class);
 
   /**
    * Configs supported.
@@ -48,7 +48,7 @@ public class FlatteningTransformer implements Transformer {
 
     // tmp table name doesn't like dashes
     String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-    LOG.info("Registering tmp table : " + tmpTable);
+    LOG.info("Registering tmp table : {}", tmpTable);
     rowDataset.registerTempTable(tmpTable);
     return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), 
null) + " from " + tmpTable);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index 8210fb1..1db5fd0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -20,14 +20,14 @@ package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.util.TypedProperties;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A transformer that allows a sql-query template be used to transform the 
source before writing to Hudi data-set.
@@ -36,7 +36,7 @@ import java.util.UUID;
  */
 public class SqlQueryBasedTransformer implements Transformer {
 
-  private static final Logger LOG = 
LogManager.getLogger(SqlQueryBasedTransformer.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(SqlQueryBasedTransformer.class);
 
   private static final String SRC_PATTERN = "<SRC>";
   private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@@ -59,10 +59,10 @@ public class SqlQueryBasedTransformer implements 
Transformer {
 
     // tmp table name doesn't like dashes
     String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-    LOG.info("Registering tmp table : " + tmpTable);
+    LOG.info("Registering tmp table : {}", tmpTable);
     rowDataset.registerTempTable(tmpTable);
     String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
-    LOG.info("SQL Query for transformation : (" + sqlStr + ")");
+    LOG.info("SQL Query for transformation : ({})", sqlStr);
     return sparkSession.sql(sqlStr);
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index c5f6c76..a42b5d6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -50,8 +50,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -77,6 +75,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -89,7 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
-  private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);
 
   @BeforeClass
   public static void initClass() throws Exception {
@@ -247,7 +247,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
       HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
       HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      LOG.info("Timeline Instants={}",
+          meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numCompactionCommits = (int) timeline.getInstants().count();
       assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
     }
@@ -255,7 +256,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
       HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
       HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      LOG.info("Timeline Instants={}",
+          meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numDeltaCommits = (int) timeline.getInstants().count();
       assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 745b0f0..941ddeb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -29,8 +29,6 @@ import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
@@ -39,10 +37,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractBaseTestSource extends AvroSource {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseTestSource.class);
 
   static final int DEFAULT_PARTITION_NUM = 0;
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
index 7153b2e..3c43aa3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
@@ -24,21 +24,21 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Test DataSource which scales test-data generation by using spark parallelism.
  */
 public class DistributedTestDataSource extends AbstractBaseTestSource {
 
-  private static final Logger LOG = LogManager.getLogger(DistributedTestDataSource.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedTestDataSource.class);
 
   private final int numTestSourcePartitions;
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 0b52db9..ae09474 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -23,21 +23,21 @@ import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link Source}, that emits test upserts.
  */
 public class TestDataSource extends AbstractBaseTestSource {
 
-  private static final Logger LOG = LogManager.getLogger(TestDataSource.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);
 
   public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {

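For readers following the migration, here is a minimal, self-contained sketch of the log4j-to-SLF4J pattern that the hunks above apply. It assumes only slf4j-api on the compile classpath (with a binding provided at runtime); the ExampleTransformer class and its log message are hypothetical and exist purely for illustration.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleTransformer {

      // LoggerFactory replaces log4j's LogManager; the Logger type now comes from org.slf4j.
      private static final Logger LOG = LoggerFactory.getLogger(ExampleTransformer.class);

      public void register(String tmpTable) {
        // Parameterized {} placeholders defer message formatting until the log
        // level is actually enabled, instead of eagerly concatenating strings.
        LOG.info("Registering tmp table : {}", tmpTable);
      }
    }

The same substitution (LogManager.getLogger to LoggerFactory.getLogger, string concatenation to {} placeholders) recurs in each file touched by this commit.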