This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/redo-log by this push:
     new 36cb764  [HUDI-461] Redo hudi-spark log statements using SLF4J (#1180)
36cb764 is described below

commit 36cb764ec4f31a1dfa26736d76fd103a1b611946
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jan 6 22:11:00 2020 +0800

    [HUDI-461] Redo hudi-spark log statements using SLF4J (#1180)
---
 hudi-spark/pom.xml                                 |  5 +++++
 .../main/scala/org/apache/hudi/DefaultSource.scala |  6 +++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 22 +++++++++++-----------
 .../org/apache/hudi/HoodieStreamingSink.scala      |  5 ++---
 .../org/apache/hudi/IncrementalRelation.scala      |  6 +++---
 hudi-spark/src/test/java/HoodieJavaApp.java        | 16 ++++++++--------
 .../src/test/java/HoodieJavaStreamingApp.java      | 14 +++++++-------
 7 files changed, 39 insertions(+), 35 deletions(-)
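
The change is mechanical across all seven files: the log4j 1.x LogManager logger is swapped for an SLF4J Logger obtained from LoggerFactory, and most call sites move from string concatenation to SLF4J's "{}" parameter placeholders. A minimal Scala sketch of the pattern as this diff applies it (the class name Example is illustrative, not taken from the commit):

    import org.slf4j.{Logger, LoggerFactory}

    class Example {
      // Before (log4j 1.x):
      //   private val log = LogManager.getLogger(classOf[Example])
      //   log.info("value :" + v)  // message string is built eagerly

      // After (SLF4J): the "{}" placeholder defers formatting until the
      // logger has confirmed that INFO is actually enabled.
      private val log: Logger = LoggerFactory.getLogger(classOf[Example])

      def run(v: String): Unit = log.info("value :{}", v)
    }

The slf4j-api dependency added to hudi-spark/pom.xml below is only the logging facade; the concrete backend is whatever SLF4J binding is already on the runtime classpath.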

diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index b490dd7..51fa305 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -178,6 +178,11 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
 
     <!-- Fasterxml -->
     <dependency>
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0c857d0..68784e6 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -20,7 +20,7 @@ package org.apache.hudi
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources._
@@ -39,7 +39,7 @@ class DefaultSource extends RelationProvider
   with StreamSinkProvider
   with Serializable {
 
-  private val log = LogManager.getLogger(classOf[DefaultSource])
+  private val log: Logger = LoggerFactory.getLogger(classOf[DefaultSource])
 
   override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
@@ -71,7 +71,7 @@ class DefaultSource extends RelationProvider
         classOf[HoodieROTablePathFilter],
         classOf[org.apache.hadoop.fs.PathFilter]);
 
-      log.info("Constructing hoodie (as parquet) data source with options :" + 
parameters)
+      log.info("Constructing hoodie (as parquet) data source with options 
:{}", parameters)
       // simply return as a regular parquet relation
       DataSource.apply(
         sparkSession = sqlContext.sparkSession,
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index af19e28..515973d 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.util.{FSUtils, TypedProperties}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -40,7 +40,7 @@ import scala.collection.mutable.ListBuffer
 
 private[hudi] object HoodieSparkSqlWriter {
 
-  private val log = LogManager.getLogger(getClass)
+  private val log: Logger = LoggerFactory.getLogger(getClass)
 
   def write(sqlContext: SQLContext,
             mode: SaveMode,
@@ -171,7 +171,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
           val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
+            log.info(s"Syncing to Hive Metastore (URL: 
${parameters(HIVE_URL_OPT_KEY)} )")
             val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
             syncHive(basePath, fs, parameters)
           } else {
@@ -240,15 +240,15 @@ private[hudi] object HoodieSparkSqlWriter {
           }
 
           if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
+            log.info(s"Commit ${commitTime} successful!")
           }
           else {
-            log.info("Commit " + commitTime + " failed!")
+            log.info(s"Commit ${commitTime}  failed!")
           }
 
          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
           val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
+            log.info(s"Syncing to Hive Metastore (URL: 
${parameters(HIVE_URL_OPT_KEY)} )")
             val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
             syncHive(basePath, fs, parameters)
           } else {
@@ -278,11 +278,11 @@ private[hudi] object HoodieSparkSqlWriter {
   }
 
   /**
-    * Add default options for unspecified write options keys.
-    *
-    * @param parameters
-    * @return
-    */
+   * Add default options for unspecified write options keys.
+   *
+   * @param parameters
+   * @return
+   */
  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
     Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
       STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 9f18a2e..6b7aca0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -17,7 +17,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.exception.HoodieCorruptedDataException
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -30,9 +30,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
                           outputMode: OutputMode)
   extends Sink
     with Serializable {
-  @volatile private var latestBatchId = -1L
 
-  private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
+  private val log: Logger = LoggerFactory.getLogger(classOf[HoodieStreamingSink])

  private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY).toInt
  private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 91b8909..e1b1960 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
@@ -44,7 +44,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
                           val optParams: Map[String, String],
                          val userSchema: StructType) extends BaseRelation with TableScan {
 
-  private val log = LogManager.getLogger(classOf[IncrementalRelation])
+  private val log: Logger = LoggerFactory.getLogger(classOf[IncrementalRelation])

  val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
  val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
@@ -107,7 +107,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (fileIdToFullPath.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info(s"Additional Filters to be applied to incremental source are : 
$filters")
       filters.foldLeft(sqlContext.read.options(sOpts)
         .schema(latestSchema)
         .parquet(fileIdToFullPath.values.toList: _*)
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index e33ff2b..1373fc6 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -32,8 +32,8 @@ import org.apache.hudi.hive.NonPartitionedExtractor;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
@@ -85,7 +85,7 @@ public class HoodieJavaApp {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
-  private static final Logger LOG = LogManager.getLogger(HoodieJavaApp.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaApp.class);
 
   public static void main(String[] args) throws Exception {
     HoodieJavaApp cli = new HoodieJavaApp();
@@ -153,7 +153,7 @@ public class HoodieJavaApp {
     // new dataset if needed
     writer.save(tablePath); // ultimately where the dataset will be placed
    String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("First commit at instant time :" + commitInstantTime1);
+    LOG.info("First commit at instant time :{}", commitInstantTime1);
 
     /**
      * Commit that updates records
@@ -176,7 +176,7 @@ public class HoodieJavaApp {
     updateHiveSyncConfig(writer);
     writer.save(tablePath);
    String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("Second commit at instant time :" + commitInstantTime2);
+    LOG.info("Second commit at instant time :{}", commitInstantTime2);
 
     /**
      * Commit that Deletes some records
@@ -200,7 +200,7 @@ public class HoodieJavaApp {
     updateHiveSyncConfig(writer);
     writer.save(tablePath);
    String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("Third commit at instant time :" + commitInstantTime3);
+    LOG.info("Third commit at instant time :{}", commitInstantTime3);
 
     /**
      * Read & do some queries
@@ -225,7 +225,7 @@ public class HoodieJavaApp {
           // For incremental view, pass in the root/base path of dataset
           .load(tablePath);
 
-      LOG.info("You will only see records from : " + commitInstantTime2);
+      LOG.info("You will only see records from : {}", commitInstantTime2);
       
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
     }
   }
@@ -235,7 +235,7 @@ public class HoodieJavaApp {
    */
  private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
     if (enableHiveSync) {
-      LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+      LOG.info("Enabling Hive sync to {}", hiveJdbcUrl);
      writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
           .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
           .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 1529c11..fe8e3d1 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -28,8 +28,8 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -94,7 +94,7 @@ public class HoodieJavaStreamingApp {
   public Boolean help = false;
 
 
-  private static final Logger LOG = LogManager.getLogger(HoodieJavaStreamingApp.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaStreamingApp.class);
 
   public static void main(String[] args) throws Exception {
     HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp();
@@ -178,13 +178,13 @@ public class HoodieJavaStreamingApp {
     // wait for spark streaming to process one microbatch
     Thread.sleep(3000);
    String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("First commit at instant time :" + commitInstantTime1);
+    LOG.info("First commit at instant time :{}", commitInstantTime1);
 
     inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath);
     // wait for spark streaming to process one microbatch
     Thread.sleep(3000);
    String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("Second commit at instant time :" + commitInstantTime2);
+    LOG.info("Second commit at instant time :{}", commitInstantTime2);
 
     /**
      * Read & do some queries
@@ -209,7 +209,7 @@ public class HoodieJavaStreamingApp {
           // For incremental view, pass in the root/base path of dataset
           .load(tablePath);
 
-      LOG.info("You will only see records from : " + commitInstantTime2);
+      LOG.info("You will only see records from : {}", commitInstantTime2);
       
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
     }
   }
@@ -243,7 +243,7 @@ public class HoodieJavaStreamingApp {
    */
  private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
     if (enableHiveSync) {
-      LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+      LOG.info("Enabling Hive sync to {}", hiveJdbcUrl);
      writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
           .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
           .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
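
One stylistic wrinkle in the diff above: the Java apps and DefaultSource.scala use SLF4J's deferred "{}" placeholders, while HoodieSparkSqlWriter.scala and IncrementalRelation.scala switch to Scala string interpolation, which builds the message eagerly whether or not INFO is enabled. A small sketch contrasting the two styles (object and method names are illustrative):

    import org.slf4j.{Logger, LoggerFactory}

    object LogStyles {
      private val log: Logger = LoggerFactory.getLogger(getClass)

      def demo(commitTime: String): Unit = {
        // Eager: the interpolated string is constructed before info() runs,
        // even if the INFO level is disabled.
        log.info(s"Commit $commitTime successful!")

        // Deferred: SLF4J substitutes the argument only when INFO is enabled.
        log.info("Commit {} successful!", commitTime)
      }
    }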
