http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 57b2e44..3f0eb71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -28,7 +28,9 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -335,7 +337,7 @@ object StreamHandoffRDD {
     } catch {
       case ex: Exception =>
         loadStatus = SegmentStatus.LOAD_FAILURE
-        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
+        LOGGER.error(s"Handoff failed on streaming segment $handoffSegmenId", ex)
         errorMessage = errorMessage + ": " + ex.getCause.getMessage
         LOGGER.error(errorMessage)
     }
@@ -345,7 +347,7 @@ object StreamHandoffRDD {
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")
-      LOGGER.audit(s"Handoff is failed for " +
+      Audit.log(LOGGER, s"Handoff is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Cannot write load metadata file as handoff failed")
       throw new Exception(errorMessage)
@@ -367,7 +369,7 @@ object StreamHandoffRDD {
       .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
     if (!done) {
       val errorMessage = "Handoff failed due to failure in table status updation."
-      LOGGER.audit("Handoff is failed for " +
+      Audit.log(LOGGER, "Handoff is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Handoff failed due to failure in table status updation.")
       throw new Exception(errorMessage)
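Both edits in this file repeat throughout the patch: the throwable moves from the first to the last argument of LOGGER.error, and the removed LOGGER.audit calls are routed through the new Audit helper. The patch only shows Audit call sites, so the following is a minimal Scala sketch of the shape those call sites imply, not the actual org.apache.carbondata.common.logging.impl.Audit implementation; the host/user prefix is an assumption.

import java.net.InetAddress

import org.apache.log4j.Logger

object Audit {
  // Matches the two-argument call sites in this patch: the caller passes
  // its own log4j Logger together with a plain message string.
  def log(logger: Logger, message: String): Unit = {
    // Enriching the message with host and user context is an assumption,
    // not something shown in the diff.
    val host = InetAddress.getLocalHost.getHostName
    val user = System.getProperty("user.name", "unknown")
    logger.info(s"[$host][$user] AUDIT: $message")
  }
}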
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 2cc2a5b..1e8d148 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import scala.util.Try
 
 import com.univocity.parsers.common.TextParsingException
+import org.apache.log4j.Logger
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
@@ -363,7 +364,7 @@ object CarbonScalaUtil {
   /**
    * Retrieve error message from exception
    */
-  def retrieveAndLogErrorMsg(ex: Throwable, logger: LogService): (String, String) = {
+  def retrieveAndLogErrorMsg(ex: Throwable, logger: Logger): (String, String) = {
     var errorMessage = "DataLoad failure"
     var executorMessage = ""
     if (ex != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 67c4c9b..704382f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -696,17 +696,17 @@ object GlobalDictionaryUtil {
     } catch {
       case ex: Exception =>
         if (ex.getCause != null && ex.getCause.isInstanceOf[NoRetryException]) {
-          LOGGER.error(ex.getCause, "generate global dictionary failed")
+          LOGGER.error("generate global dictionary failed", ex.getCause)
           throw new Exception("generate global dictionary failed, " +
                               ex.getCause.getMessage)
         }
         ex match {
           case spx: SparkException =>
-            LOGGER.error(spx, "generate global dictionary failed")
+            LOGGER.error("generate global dictionary failed", spx)
             throw new Exception("generate global dictionary failed, " +
                                 trimErrorMessage(spx.getMessage))
           case _ =>
-            LOGGER.error(ex, "generate global dictionary failed")
+            LOGGER.error("generate global dictionary failed", ex)
             throw ex
         }
     }
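The CarbonScalaUtil signature change above means callers now hand the utility a plain log4j Logger. A hypothetical call site under that assumption (the helper object and names here are illustrative, not from the patch); the tuple element order is assumed from the method name, since the diff only shows the (String, String) return type:

import org.apache.log4j.Logger

import org.apache.carbondata.spark.util.CarbonScalaUtil

object LoadFailureHandler {
  private val logger = Logger.getLogger(getClass.getName)

  def describeFailure(ex: Throwable): String = {
    // Assumed tuple layout: (errorMessage, executorMessage).
    val (errorMessage, executorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, logger)
    if (executorMessage.nonEmpty) s"$errorMessage: $executorMessage" else errorMessage
  }
}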
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 5e0fe8b..e1dd0af 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -280,7 +282,7 @@ class AlterTableColumnSchemaGenerator(
       .foreach(f => if (f._2.size > 1) {
         val name = f._1
         LOGGER.error(s"Duplicate column found with name: $name")
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Validation failed for Create/Alter Table Operation " +
           s"for ${ dbName }.${ alterTableModel.tableName }. " +
           s"Duplicate column found with name: $name")
@@ -289,7 +291,7 @@ class AlterTableColumnSchemaGenerator(
 
     if (newCols.exists(_.getDataType.isComplexType)) {
       LOGGER.error(s"Complex column cannot be added")
-      LOGGER.audit(
+      Audit.log(LOGGER,
         s"Validation failed for Create/Alter Table Operation " +
         s"for ${ dbName }.${ alterTableModel.tableName }. " +
         s"Complex column cannot be added")
@@ -780,7 +782,7 @@ class TableNewProcessor(cm: TableModel) {
       if (f._2.size > 1) {
         val name = f._1
         LOGGER.error(s"Duplicate column found with name: $name")
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Validation failed for Create/Alter Table Operation " +
           s"Duplicate column found with name: $name")
         CarbonException.analysisException(s"Duplicate dimensions found with name: $name")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 6d93b34..2fdbc86 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -283,7 +283,7 @@ object CarbonAppendableStreamSink {
       case t: Throwable =>
         val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
         StreamSegment.recoverSegmentIfRequired(segmentDir)
-        LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
+        LOGGER.error(s"Aborting job ${ job.getJobID }.", t)
         committer.abortJob(job)
         throw new CarbonStreamException("Job failed to write data file", t)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index e5552db..87106e0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -24,7 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.io.IOUtils
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.HdfsFileLock
 import org.apache.carbondata.core.util.CarbonUtil
@@ -48,7 +50,7 @@ object ResourceRegisterAndCopier {
     if (!file.exists()) {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
-    LOGGER.audit("Try downloading resource data")
+    Audit.log(LOGGER, "Try downloading resource data")
    val lock = new HdfsFileLock(hdfsPath, "/resource.lock")
    var bool = false
    try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 6acf31f..5121027 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -23,7 +23,6 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 
-import org.apache.parquet.column.Encoding;
 import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.types.Decimal;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 67ea497..779c62f 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.common.logging.LogService;
+import org.apache.log4j.Logger;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -65,7 +65,7 @@ import org.apache.spark.sql.types.StructType;
  */
 public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
-  private static final LogService LOGGER =
+  private static final Logger LOGGER =
       LogServiceFactory.getLogService(VectorizedCarbonRecordReader.class.getName());
 
   private int batchIdx = 0;
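The Java reader above now declares LOGGER as org.apache.log4j.Logger, which confirms that LogServiceFactory.getLogService returns a log4j Logger after this change. That is also why every error call site in this patch swaps its arguments: the old LogService took the throwable first, while log4j's error(Object, Throwable) takes it last. A minimal Scala sketch of the new style (helper names are illustrative):

import org.apache.log4j.Logger

object ErrorLoggingStyle {
  private val logger = Logger.getLogger(getClass.getName)

  def run(work: () => Unit): Unit = {
    try {
      work()
    } catch {
      case ex: Exception =>
        // log4j expects the message first and the throwable last, hence
        // the swapped arguments at every LOGGER.error site in this patch.
        logger.error("operation failed", ex)
    }
  }
}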
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index e3fec10..3f486d0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -403,7 +403,7 @@ class IndexDataMapRebuildRDD[K, V](
           reader.close()
         } catch {
           case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close reader")
+            LOGGER.error("Failed to close reader", ex)
         }
       }
 
@@ -412,7 +412,7 @@ class IndexDataMapRebuildRDD[K, V](
           refresher.close()
         } catch {
           case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close index writer")
+            LOGGER.error("Failed to close index writer", ex)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 82bae8e..a0bdd64 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command.CompactionModel
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -130,8 +132,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         carbonLoadModel.getTableName)
       LOGGER
         .info(s"Compaction request for datamap ${ carbonTable.getTableUniqueName } is successful")
-      LOGGER
-        .audit(s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
+      Audit.log(LOGGER,
+        s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0ec3bc6..4f42139 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -45,8 +45,10 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -130,7 +132,7 @@ object CarbonDataRDDFactory {
         }
       }
     } else {
-      LOGGER.audit("Not able to acquire the system level compaction lock for table " +
+      Audit.log(LOGGER, "Not able to acquire the system level compaction lock for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -307,7 +309,7 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None,
       operationContext: OperationContext): Unit = {
-    LOGGER.audit(s"Data load request has been received for table" +
+    Audit.log(LOGGER, s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -449,7 +451,7 @@ object CarbonDataRDDFactory {
           // this means that the update doesnt have any records to update so no need to do table
           // status file updation.
           if (resultSize == 0) {
-            LOGGER.audit("Data update is successful with 0 rows updation for " +
+            Audit.log(LOGGER, "Data update is successful with 0 rows updation for " +
                          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
             return
           }
@@ -460,11 +462,11 @@ object CarbonDataRDDFactory {
             true,
             new util.ArrayList[Segment](0),
             new util.ArrayList[Segment](segmentFiles), "")) {
-            LOGGER.audit("Data update is successful for " +
+            Audit.log(LOGGER, "Data update is successful for " +
                          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           } else {
             val errorMessage = "Data update failed due to failure in table status updation."
-            LOGGER.audit("Data update is failed for " +
+            Audit.log(LOGGER, "Data update is failed for " +
                          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
             LOGGER.error("Data update failed due to failure in table status updation.")
             updateModel.get.executorErrors.errorMsg = errorMessage
@@ -486,7 +488,7 @@ object CarbonDataRDDFactory {
             clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
           }
           LOGGER.info("********clean up done**********")
-          LOGGER.audit(s"Data load is failed for " +
+          Audit.log(LOGGER, s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           LOGGER.warn("Cannot write load metadata file as data load failed")
           throw new Exception(errorMessage)
@@ -505,7 +507,7 @@ object CarbonDataRDDFactory {
             clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
           }
           LOGGER.info("********clean up done**********")
-          LOGGER.audit(s"Data load is failed for " +
+          Audit.log(LOGGER, s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           throw new Exception(status(0)._2._2.errorMsg)
         }
@@ -557,7 +559,7 @@ object CarbonDataRDDFactory {
             true
           } catch {
             case ex: Exception =>
-              LOGGER.error(ex, "Problem while committing data maps")
+              LOGGER.error("Problem while committing data maps", ex)
              false
          }
        if (!done || !commitComplete) {
@@ -573,16 +575,16 @@ object CarbonDataRDDFactory {
             clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
           }
           LOGGER.info("********clean up done**********")
-          LOGGER.audit("Data load is failed for " +
+          Audit.log(LOGGER, "Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           LOGGER.error("Data load failed due to failure in table status updation.")
           throw new Exception("Data load failed due to failure in table status updation.")
         }
         if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
-          LOGGER.audit("Data load is partially successful for " +
+          Audit.log(LOGGER, "Data load is partially successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         } else {
-          LOGGER.audit("Data load is successful for " +
+          Audit.log(LOGGER, "Data load is successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
         try {
@@ -843,7 +845,7 @@ object CarbonDataRDDFactory {
               s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
     if (!carbonTable.isChildDataMap &&
         CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
-      LOGGER.audit(s"Compaction request received for table " +
+      Audit.log(LOGGER, s"Compaction request received for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
       val isCompactionTriggerByDDl = false
@@ -903,7 +905,7 @@ object CarbonDataRDDFactory {
         throw e
       }
     } else {
-      LOGGER.audit("Not able to acquire the compaction lock for table " +
+      Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
       LOGGER.error("Not able to acquire the compaction lock for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
@@ -946,7 +948,7 @@ object CarbonDataRDDFactory {
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
-      LOGGER.audit("Data load is failed for " +
+      Audit.log(LOGGER, "Data load is failed for " +
                    s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
       LOGGER.error("Dataload failed due to failure in table status updation.")
       throw new Exception(errorMessage)
@@ -1087,7 +1089,7 @@ object CarbonDataRDDFactory {
         ).collect()
       } catch {
         case ex: Exception =>
-          LOGGER.error(ex, "load data failed for partition table")
+          LOGGER.error("load data failed for partition table", ex)
           throw ex
       }
     }
@@ -1120,7 +1122,7 @@ object CarbonDataRDDFactory {
         ).collect()
       } catch {
         case ex: Exception =>
-          LOGGER.error(ex, "load data frame failed")
+          LOGGER.error("load data frame failed", ex)
           throw ex
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index c505bbc..756d30c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -27,6 +27,8 @@ import scala.collection.mutable
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
@@ -68,7 +70,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
     } catch {
       case e: Exception =>
-        LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+        LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
         throw e
     }
 
@@ -302,7 +304,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     // true because compaction for all datamaps will be finished at a time to the maximum level
     // possible (level 1, 2 etc). so we need to check for either condition
     if (!statusFileUpdation || !commitComplete) {
-      LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+      Audit.log(LOGGER,
+        s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                    s"${ carbonLoadModel.getTableName }")
       LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                    s"${ carbonLoadModel.getTableName }")
@@ -310,13 +313,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                                   s" ${ carbonLoadModel.getDatabaseName }." +
                                   s"${ carbonLoadModel.getTableName }")
     } else {
-      LOGGER.audit(s"Compaction request completed for table " +
+      Audit.log(LOGGER,
+        s"Compaction request completed for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.info(s"Compaction request completed for table " +
                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     }
   } else {
-    LOGGER.audit(s"Compaction request failed for table " +
+    Audit.log(LOGGER, s"Compaction request failed for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
     )
     LOGGER.error(s"Compaction request failed for table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 320cd78..7edc50f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -37,7 +37,7 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     sqlContext: SQLContext,
     storeLocation: String) {
 
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def executeCompaction(): Unit
 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 23323d4..1b9fb44 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -26,9 +26,11 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSour
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.types.{StructField, StructType}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -158,7 +160,7 @@ object StreamJobManager {
       StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName,
         sinkTable.getDatabaseName, sinkTable.getTableName, query, thread))
 
-    LOGGER.audit(s"STREAM $streamName started with job id '${job.id.toString}', " +
+    Audit.log(LOGGER, s"STREAM $streamName started with job id '${job.id.toString}', " +
                  s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " +
                  s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}")
     job.id.toString
@@ -179,7 +181,8 @@ object StreamJobManager {
       jobDesc.streamingQuery.stop()
       jobDesc.thread.interrupt()
       jobs.remove(streamName)
-      LOGGER.audit(s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
+      Audit.log(LOGGER,
+        s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
                    s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " +
                    s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}")
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 838b28d..7eb6e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -170,7 +170,7 @@ class CarbonSession(@transient val sc: SparkContext,
    */
   private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
     val analyzed = qe.analyzed
-    val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOG = LogServiceFactory.getLogService(this.getClass.getName)
     analyzed match {
       case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
         if s.child.isInstanceOf[LogicalRelation] &&

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 1a76ed7..2d4fe84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -31,12 +32,12 @@ import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
 import org.apache.carbondata.events._
 
 class MergeBloomIndexEventListener extends OperationEventListener with Logging {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case datamapPostEvent: BuildDataMapPostExecutionEvent =>
-        LOGGER.audit("Load post status event-listener called for merge bloom index")
+        Audit.log(LOGGER, "Load post status event-listener called for merge bloom index")
         val carbonTableIdentifier = datamapPostEvent.identifier
         val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
         val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index a0c19e9..639a0e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,11 +23,11 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -40,12 +40,12 @@ import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.util.CommonUtil
 
 class MergeIndexEventListener extends OperationEventListener with Logging {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
-        LOGGER.audit("Load post status event-listener called for merge index")
+        Audit.log(LOGGER, "Load post status event-listener called for merge index")
         val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
         val compactedSegments = loadModel.getMergedSegmentIds
@@ -71,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           }
         }
       case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
-        LOGGER.audit("Merge index for compaction called")
+        Audit.log(LOGGER, "Merge index for compaction called")
         val carbonTable = alterTableCompactionPostEvent.carbonTable
         val mergedLoads = alterTableCompactionPostEvent.compactedLoads
         val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -79,11 +79,10 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
         }
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
-        val alterTableModel = alterTableMergeIndexEvent.alterTableModel
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
         if (!carbonMainTable.isStreamingSink) {
-          LOGGER.audit(s"Compaction request received for table " +
+          Audit.log(LOGGER, s"Compaction request received for table " +
                        s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
           LOGGER.info(s"Merge Index request received for table " +
                       s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
@@ -129,7 +128,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             clearBlockDataMapCache(carbonMainTable, validSegmentIds)
             val requestMessage = "Compaction request completed for table " +
               s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-            LOGGER.audit(requestMessage)
+            Audit.log(LOGGER, requestMessage)
             LOGGER.info(requestMessage)
           } else {
             val lockMessage = "Not able to acquire the compaction lock for table " +
@@ -138,7 +137,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                               .getTableName
                           }"
-            LOGGER.audit(lockMessage)
+            Audit.log(LOGGER, lockMessage)
             LOGGER.error(lockMessage)
             CarbonException.analysisException(
               "Table is already locked for compaction. Please try after some time.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 66f9e47..081482c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
@@ -151,7 +153,7 @@ case class CarbonCreateDataMapCommand(
         systemFolderLocation, tableIdentifier, dmProviderName)
       OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
         operationContext)
-      LOGGER.audit(s"DataMap $dataMapName successfully added")
+      Audit.log(LOGGER, s"DataMap $dataMapName successfully added")
       Seq.empty
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 4607de0..67e2dee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -52,7 +53,7 @@ case class CarbonDropDataMapCommand(
     forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   private var dataMapProvider: DataMapProvider = _
   var mainTable: CarbonTable = _
   var dataMapSchema: DataMapSchema = _
@@ -111,7 +112,7 @@ case class CarbonDropDataMapCommand(
         locksToBeAcquired foreach {
           lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
         }
-        LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
+        Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]")
 
         // drop index,mv datamap on the main table.
         if (mainTable != null &&
@@ -172,7 +173,7 @@ case class CarbonDropDataMapCommand(
       case e: NoSuchDataMapException =>
         throw e
       case ex: Exception =>
-        LOGGER.error(ex, s"Dropping datamap $dataMapName failed")
+        LOGGER.error(s"Dropping datamap $dataMapName failed", ex)
         throwMetadataException(dbName, tableName,
           s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index b699ec1..8e338db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -25,14 +25,15 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CompactionModel}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -63,7 +64,7 @@ case class CarbonAlterTableCompactionCommand(
   var table: CarbonTable = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableModel.tableName.toLowerCase
     val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     table = if (tableInfoOp.isDefined) {
@@ -204,7 +205,7 @@ case class CarbonAlterTableCompactionCommand(
       storeLocation: String,
       compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
@@ -216,7 +217,7 @@ case class CarbonAlterTableCompactionCommand(
       }
     }
 
-    LOGGER.audit(s"Compaction request received for table " +
+    Audit.log(LOGGER, s"Compaction request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
@@ -313,7 +314,7 @@ case class CarbonAlterTableCompactionCommand(
           throw e
       }
     } else {
-      LOGGER.audit("Not able to acquire the compaction lock for table " +
+      Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error(s"Not able to acquire the compaction lock for table" +
                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -328,7 +329,7 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext,
       sparkSession: SparkSession
   ): Unit = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // 1. delete the lock of streaming.lock, forcing the stream to be closed
     val streamingLock = CarbonLockFactory.getCarbonLockObj(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index a477167..ba20773 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -33,7 +33,7 @@ case class CarbonAlterTableFinishStreaming(
   extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val streamingLock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
       LockUsage.STREAMING_LOCK)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index e561a5a..a390191 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -147,7 +147,7 @@ case class CarbonCleanFilesCommand(
         case e: Throwable =>
           // catch all exceptions to avoid failure
           LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-            .error(e, "Failed to clean in progress segments")
+            .error("Failed to clean in progress segments", e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 7cf8c1e..ee0f5ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -36,7 +36,7 @@ case class CarbonInsertIntoCommand(
   var loadCommand: CarbonLoadDataCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     def containsLimit(plan: LogicalPlan): Boolean = {
       plan find {
         case limit: GlobalLimit => true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 43c8b86..22d0bb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
@@ -48,7 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -109,7 +111,7 @@ case class CarbonLoadDataCommand(
   var parentTablePath: String = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     table = if (tableInfoOp.isDefined) {
       CarbonTable.buildFromTableInfo(tableInfoOp.get)
@@ -121,7 +123,7 @@ case class CarbonLoadDataCommand(
       }
       if (null == relation.carbonTable) {
         LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+        Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName")
         throw new NoSuchTableException(dbName, tableName)
       }
       relation.carbonTable
@@ -150,7 +152,7 @@ case class CarbonLoadDataCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     var concurrentLoadLock: Option[ICarbonLock] = None
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -341,7 +343,7 @@ case class CarbonLoadDataCommand(
           if (isUpdateTableStatusRequired) {
             CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
-          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         // In case of event related exception
         case preEventEx: PreEventException =>
@@ -352,7 +354,7 @@ case class CarbonLoadDataCommand(
           if (isUpdateTableStatusRequired) {
             CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
-          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+          Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
       } finally {
         releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
@@ -369,7 +371,7 @@ case class CarbonLoadDataCommand(
         } catch {
           case ex: Exception =>
             LOGGER.error(ex)
-            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+            Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " +
                          "Problem deleting the partition folder")
             throw ex
         }
@@ -377,10 +379,10 @@ case class CarbonLoadDataCommand(
       }
     } catch {
       case dle: DataLoadingException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
         throw dle
       case mce: MalformedCarbonCommandException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
         throw mce
     }
     Seq.empty
@@ -412,7 +414,7 @@ case class CarbonLoadDataCommand(
   }
 
   private def releaseConcurrentLoadLock(concurrentLoadLock: Option[ICarbonLock],
-      LOGGER: LogService): Unit = {
+      LOGGER: Logger): Unit = {
     if (concurrentLoadLock.isDefined) {
       if (concurrentLoadLock.get.unlock()) {
         LOGGER.info("concurrent_load lock for table" + table.getTablePath +
@@ -432,7 +434,7 @@ case class CarbonLoadDataCommand(
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     var rows = Seq.empty[Row]
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -561,7 +563,7 @@ case class CarbonLoadDataCommand(
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     var rows = Seq.empty[Row]
     val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
       val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
@@ -615,9 +617,8 @@ case class CarbonLoadDataCommand(
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame],
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
     var timeStampformatString = carbonLoadModel.getTimestampformat
     if (timeStampformatString.isEmpty) {
       val msg = s"Table registration with Database name [$databaseName] and Table name " +
                 s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
                 s" not copied under database [$databaseName]"
-      LOGGER.audit(msg)
+      Audit.log(LOGGER, msg)
       throwMetadataException(databaseName, tableName, msg)
     }
     // 2.2.1 Register the aggregate tables to hive
@@ -101,14 +101,14 @@ case class RefreshCarbonTableCommand(
           registerAllPartitionsToHive(identifier, sparkSession)
         }
       } else {
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
           s"failed." +
          s"Table [$tableName] either non carbon table or stale carbon table under database " +
          s"[$databaseName]")
       }
     } else {
-      LOGGER.audit(
+      Audit.log(LOGGER,
         s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
         s"failed." +
         s"Table [$tableName] either already exists or registered under database [$databaseName]")
@@ -154,7 +154,7 @@ case class RefreshCarbonTableCommand(
       OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
       CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
         .run(sparkSession)
-      LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
+      Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " +
                    s"[$tableName] is successful.")
     } catch {
       case e: AnalysisException => throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 0127d7e..053937b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -79,7 +81,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     var lockStatus = false
     try {
       lockStatus = metadataLock.lockWithRetries()
-      LOGGER.audit(s" Delete data request has been received " +
+      Audit.log(LOGGER, s" Delete data request has been received " +
                    s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
       if (lockStatus) {
         LOGGER.info("Successfully able to get the table metadata file lock")
@@ -119,7 +121,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable,
             e.compactionTimeStamp.toString)
         case e: Exception =>
-          LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
+          LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
           // ****** start clean up.
          // In case of failure , clean all related delete delta files
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 4e9c1af..31e1779 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -163,7 +163,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable,
             e.compactionTimeStamp.toString)
         case e: Exception =>
-          LOGGER.error(e, "Exception in update operation")
+          LOGGER.error("Exception in update operation", e)
           // ****** start clean up.
          // In case of failure , clean all related delete delta files
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 7e7f671..d118539 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.command.mutation
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
@@ -34,7 +32,9 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +46,12 @@ import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
 
 object DeleteExecution {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   /**
    * generate the delete delta files in each segment as per the RDD.
@@ -167,7 +166,7 @@ object DeleteExecution {
         } else {
           // In case of failure , clean all related delete delta files
           CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-          LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+          Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
           val errorMsg =
             "Delete data operation is failed due to failure in creating delete delta file for " +
             "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
@@ -202,7 +201,7 @@
         listOfSegmentToBeMarkedDeleted)
     ) {
       LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
-      LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
+      Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }")
     } else {
       // In case of failure , clean all related delete delta files
@@ -210,7 +209,7 @@
      val errorMessage = "Delete data operation is failed due to failure " +
                         "in table status updation."
-      LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+      Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
       LOGGER.error("Delete data operation is failed due to failure in table status updation.")
       executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
       executorErrors.errorMsg = errorMessage
@@ -291,12 +290,12 @@
           deleteStatus = SegmentStatus.SUCCESS
         } catch {
           case e : MultipleMatchingException =>
-            LOGGER.audit(e.getMessage)
+            Audit.log(LOGGER, e.getMessage)
             LOGGER.error(e.getMessage)
           // dont throw exception here.
           case e: Exception =>
            val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
-            LOGGER.audit(errorMsg)
+            Audit.log(LOGGER, errorMsg)
             LOGGER.error(errorMsg + e.getMessage)
             throw e
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 35fc3c3..3472d8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -25,21 +25,20 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
 
 object HorizontalCompaction {
-  val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
 
   /**
    * The method does horizontal compaction. After Update and Delete completion
@@ -131,7 +130,7 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
-    LOG.audit(s"Horizontal Update Compaction operation started for [$db.$table].")
+    Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].")
 
     try {
       // Update Compaction.
@@ -155,7 +154,7 @@ object HorizontalCompaction {
         s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
     }
     LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+    Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
   }
 
   /**
@@ -181,7 +180,7 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
-    LOG.audit(s"Horizontal Delete Compaction operation started for [$db.$table].")
+    Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].")
 
    try {
 
@@ -226,7 +225,7 @@ object HorizontalCompaction {
           timestamp.toString,
           segmentUpdateStatusManager)
       if (updateStatus == false) {
-        LOG.audit(s"Delete Compaction data operation is failed for [$db.$table].")
+        Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].")
         LOG.error("Delete Compaction data operation is failed.")
         throw new HorizontalCompactionException(
           s"Horizontal Delete Compaction Failed for [$db.$table] ." +
@@ -234,7 +233,7 @@
       } else {
         LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
-        LOG.audit(s"Horizontal Delete Compaction operation completed for [$db.$table].")
+        Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].")
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b76a485..c230322 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -48,7 +49,7 @@ case class CarbonAlterTableDropPartitionCommand(
     model: AlterTableDropPartitionModel)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -121,7 +122,7 @@ case class CarbonAlterTableDropPartitionCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = model.tableName
     var locks = List.empty[ICarbonLock]
@@ -168,7 +169,7 @@ case class CarbonAlterTableDropPartitionCommand(
       LOGGER.info("Locks released after alter table drop partition action.")
     }
     LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
-    LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+    Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName")
     Seq.empty
   }
 
@@ -177,7 +178,7 @@ case class CarbonAlterTableDropPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       dropWithData: Boolean,
       oldPartitionIds: List[Int]): Unit = {
-    LOGGER.audit(s"Drop partition request received for table " +
+    Audit.log(LOGGER, s"Drop partition request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     try {
       startDropThreads(
@@ -246,7 +247,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
     dropWithData: Boolean,
     oldPartitionIds: List[Int]) extends Thread {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def run(): Unit = {
     try {
@@ -254,8 +255,8 @@ case class dropPartitionThread(sqlContext: SQLContext,
         segmentId, partitionId, dropWithData, oldPartitionIds)
     } catch {
       case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }", e)
     }
   }
 
@@ -274,7 +275,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
       future.get
     } catch {
       case e: Exception =>
-        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        LOGGER.error(s"Exception in partition drop thread ${ e.getMessage }", e)
         throw e
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 753abaf..8b337c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -54,7 +55,7 @@ case class CarbonAlterTableSplitPartitionCommand(
     splitPartitionModel: AlterTableSplitPartitionModel)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -182,11 +183,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     } finally {
       AlterTableUtil.releaseLocks(locks)
       CacheProvider.getInstance().dropAllCache()
-      val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       LOGGER.info("Locks released after alter table add/split partition action.")
       if (success) {
         LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
-        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        Audit.log(LOGGER,
+          s"Alter table add/split partition is successful for table $dbName.$tableName")
       }
     }
     Seq.empty
@@ -198,7 +200,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       oldPartitionIdList: List[Int]
   ): Unit = {
-    LOGGER.audit(s"Add partition request received for table " +
+    Audit.log(LOGGER, s"Add partition request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
    try {
      startSplitThreads(sqlContext,
@@ -264,7 +266,7 @@ case class SplitThread(sqlContext: SQLContext,
     partitionId: String,
     oldPartitionIdList: List[Int]) extends Thread {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def run(): Unit = {
     var triggeredSplitPartitionStatus = false
@@ -275,7 +277,7 @@ case class SplitThread(sqlContext: SQLContext,
       triggeredSplitPartitionStatus = true
     } catch {
       case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
         LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
         exception = e
     }
@@ -301,7 +303,7 @@
       }
     } catch {
       case e: Exception =>
-        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        LOGGER.error(s"Exception in partition split thread ${ e.getMessage }", e)
        throw e
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index b33652f..f606c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -64,7 +64,7 @@ object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
 
 trait CommitHelper {
 
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
       operationContext: OperationContext,
@@ -586,7 +586,7 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelp
       } catch {
         case e: Exception =>
           operationContext.setProperty("commitComplete", false)
-          LOGGER.error(e, "Problem while committing data maps")
+          LOGGER.error("Problem while committing data maps", e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index f26d1cb..d16f570 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -419,11 +419,10 @@ object PreAggregateUtil {
    */
   def updateMainTable(carbonTable: CarbonTable,
       childSchema: DataMapSchema,
       sparkSession: SparkSession): TableInfo = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var numberOfCurrentChild: Int = 0
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     try {
@@ -450,7 +449,7 @@ object PreAggregateUtil {
       thriftTableInfo
     } catch {
       case e: Exception =>
-        LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
+        LOGGER.error("Pre Aggregate Parent table update failed reverting changes", e)
        throw e
     } finally {
       // release lock after command execution completion
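The pattern repeated across all of these files is the move from CarbonData's custom LogService to a plain org.apache.log4j.Logger: LogService.error took the exception first (LOGGER.error(ex, msg)), while log4j's error(Object message, Throwable t) takes the message first, so every call site swaps its arguments, and dropping the explicit LogService type annotations lets the val LOGGER = LogServiceFactory.getLogService(...) declarations compile unchanged against the new return type. A minimal sketch of the new call shape, assuming only log4j 1.x on the classpath (the object and method names below are illustrative, not CarbonData code):

import org.apache.log4j.Logger

object ErrorLoggingSketch {
  // Stand-in logger; CarbonData code obtains it via LogServiceFactory.getLogService,
  // which after this change hands back a plain org.apache.log4j.Logger.
  val LOGGER: Logger = Logger.getLogger(getClass.getName)

  def deleteData(database: String, tableName: String): Unit = {
    try {
      // ... delete-delta work elided ...
    } catch {
      case e: Exception =>
        // log4j's signature is error(Object message, Throwable t):
        // the exception moves from the first argument to the last.
        LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
        throw e
    }
  }
}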
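audit(...) was likewise a LogService-only method, so audit call sites move to the static helper org.apache.carbondata.common.logging.impl.Audit, invoked as Audit.log(LOGGER, message) with the logger passed explicitly. A sketch of the call-site shape; the Audit object below is a hypothetical stand-in so the snippet compiles on its own, not the carbondata-common implementation:

import org.apache.log4j.Logger

// Hypothetical stand-in for org.apache.carbondata.common.logging.impl.Audit;
// the real helper ships in carbondata-common and is imported at each call site.
object Audit {
  def log(logger: Logger, message: String): Unit = logger.info(message)
}

object AuditLoggingSketch {
  private val LOGGER: Logger = Logger.getLogger(getClass.getName)

  def reportDelete(database: String, tableName: String, succeeded: Boolean): Unit = {
    if (succeeded) {
      // Old: LOGGER.audit(s"Delete data operation is successful for $database.$tableName")
      Audit.log(LOGGER, s"Delete data operation is successful for $database.$tableName")
    } else {
      Audit.log(LOGGER, s"Delete data operation is failed for $database.$tableName")
    }
  }
}

Because Audit.log is a free-standing helper rather than a method on the logger, each file gains an import of Audit (and, in DeleteExecution and CarbonProjectForDeleteCommand, of CarbonStore.LOGGER) instead of relying on the removed LogService interface.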
