This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push:
new 36cb764 [HUDI-461] Redo hudi-spark log statements using SLF4J (#1180)
36cb764 is described below
commit 36cb764ec4f31a1dfa26736d76fd103a1b611946
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jan 6 22:11:00 2020 +0800
[HUDI-461] Redo hudi-spark log statements using SLF4J (#1180)
---
hudi-spark/pom.xml | 5 +++++
.../main/scala/org/apache/hudi/DefaultSource.scala | 6 +++---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 22 +++++++++++-----------
.../org/apache/hudi/HoodieStreamingSink.scala | 5 ++---
.../org/apache/hudi/IncrementalRelation.scala | 6 +++---
hudi-spark/src/test/java/HoodieJavaApp.java | 16 ++++++++--------
.../src/test/java/HoodieJavaStreamingApp.java | 14 +++++++-------
7 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index b490dd7..51fa305 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -178,6 +178,11 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
<!-- Fasterxml -->
<dependency>
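For context, slf4j-api supplies only the logging facade; at runtime SLF4J still needs a binding (for example slf4j-log4j12) on the classpath to route calls to the log4j dependency retained above. A minimal sketch of the pattern the diffs below adopt (class name illustrative, not part of this commit):

    import org.slf4j.{Logger, LoggerFactory}

    class ExampleComponent {
      // One logger per class, resolved once through the SLF4J facade.
      private val log: Logger = LoggerFactory.getLogger(classOf[ExampleComponent])

      def run(): Unit = {
        // {} placeholders defer message formatting until the level is enabled.
        log.info("Starting with {} partitions", 4)
      }
    }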
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0c857d0..68784e6 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -20,7 +20,7 @@ package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
@@ -39,7 +39,7 @@ class DefaultSource extends RelationProvider
with StreamSinkProvider
with Serializable {
- private val log = LogManager.getLogger(classOf[DefaultSource])
+ private val log: Logger = LoggerFactory.getLogger(classOf[DefaultSource])
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
@@ -71,7 +71,7 @@ class DefaultSource extends RelationProvider
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter]);
- log.info("Constructing hoodie (as parquet) data source with options :" +
parameters)
+ log.info("Constructing hoodie (as parquet) data source with options
:{}", parameters)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
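The message change in the hunk above is more than cosmetic: with string concatenation the argument is rendered even when INFO is disabled, whereas the {} placeholder lets SLF4J skip formatting entirely. A sketch of the equivalence (assuming an SLF4J Logger named log):

    // Concatenation: parameters.toString runs even if INFO is off.
    log.info("Constructing data source with options :" + parameters)

    // Placeholder: rendering is deferred until the message is actually emitted.
    log.info("Constructing data source with options :{}", parameters)

    // Roughly what the placeholder form buys, written out as a manual guard:
    if (log.isInfoEnabled) log.info("Constructing data source with options :" + parameters)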
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index af19e28..515973d 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.util.{FSUtils, TypedProperties}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -40,7 +40,7 @@ import scala.collection.mutable.ListBuffer
private[hudi] object HoodieSparkSqlWriter {
- private val log = LogManager.getLogger(getClass)
+ private val log: Logger = LoggerFactory.getLogger(getClass)
def write(sqlContext: SQLContext,
mode: SaveMode,
@@ -171,7 +171,7 @@ private[hudi] object HoodieSparkSqlWriter {
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
- log.info("Syncing to Hive Metastore (URL: " +
parameters(HIVE_URL_OPT_KEY) + ")")
+ log.info(s"Syncing to Hive Metastore (URL:
${parameters(HIVE_URL_OPT_KEY)} )")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
@@ -240,15 +240,15 @@ private[hudi] object HoodieSparkSqlWriter {
}
if (commitSuccess) {
- log.info("Commit " + commitTime + " successful!")
+ log.info(s"Commit ${commitTime} successful!")
}
else {
- log.info("Commit " + commitTime + " failed!")
+ log.info(s"Commit ${commitTime} failed!")
}
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
- log.info("Syncing to Hive Metastore (URL: " +
parameters(HIVE_URL_OPT_KEY) + ")")
+ log.info(s"Syncing to Hive Metastore (URL:
${parameters(HIVE_URL_OPT_KEY)} )")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
@@ -278,11 +278,11 @@ private[hudi] object HoodieSparkSqlWriter {
}
/**
- * Add default options for unspecified write options keys.
- *
- * @param parameters
- * @return
- */
+ * Add default options for unspecified write options keys.
+ *
+ * @param parameters
+ * @return
+ */
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,
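One caveat on the mixed styles in this file: the s"..." interpolations above build their strings eagerly at the call site, so unlike the {} form they do not benefit from SLF4J's deferred formatting. A minimal sketch of the difference (values illustrative):

    val commitTime = "20200106221100"

    // Eager: the interpolated string is constructed even if INFO is disabled.
    log.info(s"Commit ${commitTime} successful!")

    // Deferred: SLF4J renders the message only when INFO is enabled.
    log.info("Commit {} successful!", commitTime)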
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 9f18a2e..6b7aca0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
import org.apache.hudi.exception.HoodieCorruptedDataException
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -30,9 +30,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
outputMode: OutputMode)
extends Sink
with Serializable {
- @volatile private var latestBatchId = -1L
- private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
+ private val log: Logger = LoggerFactory.getLogger(classOf[HoodieStreamingSink])
private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY).toInt
private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 91b8909..e1b1960 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable
-import org.apache.log4j.LogManager
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
@@ -44,7 +44,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType) extends BaseRelation
with TableScan {
- private val log = LogManager.getLogger(classOf[IncrementalRelation])
+ private val log: Logger = LoggerFactory.getLogger(classOf[IncrementalRelation])
val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
@@ -107,7 +107,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (fileIdToFullPath.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
- log.info("Additional Filters to be applied to incremental source are :"
+ filters)
+ log.info(s"Additional Filters to be applied to incremental source are :
$filters")
filters.foldLeft(sqlContext.read.options(sOpts)
.schema(latestSchema)
.parquet(fileIdToFullPath.values.toList: _*)
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index e33ff2b..1373fc6 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -32,8 +32,8 @@ import org.apache.hudi.hive.NonPartitionedExtractor;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
@@ -85,7 +85,7 @@ public class HoodieJavaApp {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
- private static final Logger LOG = LogManager.getLogger(HoodieJavaApp.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaApp.class);
public static void main(String[] args) throws Exception {
HoodieJavaApp cli = new HoodieJavaApp();
@@ -153,7 +153,7 @@ public class HoodieJavaApp {
// new dataset if needed
writer.save(tablePath); // ultimately where the dataset will be placed
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("First commit at instant time :" + commitInstantTime1);
+ LOG.info("First commit at instant time :{}", commitInstantTime1);
/**
* Commit that updates records
@@ -176,7 +176,7 @@ public class HoodieJavaApp {
updateHiveSyncConfig(writer);
writer.save(tablePath);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Second commit at instant time :" + commitInstantTime2);
+ LOG.info("Second commit at instant time :{}", commitInstantTime2);
/**
* Commit that Deletes some records
@@ -200,7 +200,7 @@ public class HoodieJavaApp {
updateHiveSyncConfig(writer);
writer.save(tablePath);
String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Third commit at instant time :" + commitInstantTime3);
+ LOG.info("Third commit at instant time :{}", commitInstantTime3);
/**
* Read & do some queries
@@ -225,7 +225,7 @@ public class HoodieJavaApp {
// For incremental view, pass in the root/base path of dataset
.load(tablePath);
- LOG.info("You will only see records from : " + commitInstantTime2);
+ LOG.info("You will only see records from : {}", commitInstantTime2);
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
}
}
@@ -235,7 +235,7 @@ public class HoodieJavaApp {
*/
private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
if (enableHiveSync) {
- LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+ LOG.info("Enabling Hive sync to {}", hiveJdbcUrl);
writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
.option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 1529c11..fe8e3d1 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -28,8 +28,8 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -94,7 +94,7 @@ public class HoodieJavaStreamingApp {
public Boolean help = false;
- private static final Logger LOG = LogManager.getLogger(HoodieJavaStreamingApp.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaStreamingApp.class);
public static void main(String[] args) throws Exception {
HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp();
@@ -178,13 +178,13 @@ public class HoodieJavaStreamingApp {
// wait for spark streaming to process one microbatch
Thread.sleep(3000);
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("First commit at instant time :" + commitInstantTime1);
+ LOG.info("First commit at instant time :{}", commitInstantTime1);
inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath);
// wait for spark streaming to process one microbatch
Thread.sleep(3000);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Second commit at instant time :" + commitInstantTime2);
+ LOG.info("Second commit at instant time :{}", commitInstantTime2);
/**
* Read & do some queries
@@ -209,7 +209,7 @@ public class HoodieJavaStreamingApp {
// For incremental view, pass in the root/base path of dataset
.load(tablePath);
- LOG.info("You will only see records from : " + commitInstantTime2);
+ LOG.info("You will only see records from : {}", commitInstantTime2);
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
}
}
@@ -243,7 +243,7 @@ public class HoodieJavaStreamingApp {
*/
private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
if (enableHiveSync) {
- LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+ LOG.info("Enabling Hive sync to {}", hiveJdbcUrl);
writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
.option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)