This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bdd69d  [CARBONDATA-3955] Fix load failures due to daylight saving 
time changes
4bdd69d is described below

commit 4bdd69d766e244df7625117550453f0195c84750
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Aug 18 18:31:12 2020 +0530

    [CARBONDATA-3955] Fix load failures due to daylight saving time changes
    
    Why is this PR needed?
    1. Fix load failures due to daylight saving time changes.
    2. During load, date/timestamp year values with more than 4 digits should
    fail or be null according to the bad records action property.
    
    What changes were proposed in this PR?
    New property added to set lenient as true and parse the timestamp format.
    Added validation for timestamp range values.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3896
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++
 .../carbondata/core/util/CarbonProperties.java     |  7 +++
 .../apache/carbondata/core/util/DataTypeUtil.java  | 39 ++++++++++++++--
 .../apache/carbondata/core/util/SessionParams.java |  2 +
 .../spark/rdd/CarbonDataRDDFactory.scala           |  5 +-
 .../badrecordloger/BadRecordActionTest.scala       | 21 +++++++++
 .../TestLoadDataWithDiffTimestampFormat.scala      | 53 ++++++++++++++++++++++
 .../carbondata/spark/util/BadRecordUtil.scala      | 17 ++++++-
 8 files changed, 149 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a271da6..1a19b86 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1592,6 +1592,16 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT = "false";
 
+  // Property to enable parsing the timestamp/date data with setLenient = true 
in load
+  // flow if it fails with parse invalid timestamp data. (example: 1941-03-15 
00:00:00
+  // is valid time in Asia/Calcutta zone and is invalid and will fail to parse 
in Asia/Shanghai
+  // zone as DST is observed and clocks were turned forward 1 hour to 
1941-03-15 01:00:00)
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String
+      CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE = 
"carbon.load.dateformat.setlenient.enable";
+
+  public static final String CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE_DEFAULT 
= "false";
+
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Constant value start here
   
//////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 1cd181d..cf339d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2076,4 +2076,11 @@ public final class CarbonProperties {
   public static void setAuditEnabled(boolean enabled) {
     getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_AUDIT, 
String.valueOf(enabled));
   }
+
+  public boolean isSetLenientEnabled() {
+    String configuredValue =
+        
getProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE,
+            
CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE_DEFAULT);
+    return Boolean.parseBoolean(configuredValue);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index dd362d6..fca2639 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -435,7 +436,8 @@ public final class DataTypeUtil {
 
   private static Object parseTimestamp(String dimensionValue, String 
dateFormat) {
     Date dateToStr;
-    DateFormat dateFormatter;
+    DateFormat dateFormatter = null;
+    long timeValue;
     try {
       if (null != dateFormat && !dateFormat.trim().isEmpty()) {
         dateFormatter = new SimpleDateFormat(dateFormat);
@@ -444,9 +446,40 @@ public final class DataTypeUtil {
         dateFormatter = timestampFormatter.get();
       }
       dateToStr = dateFormatter.parse(dimensionValue);
-      return dateToStr.getTime();
+      timeValue = dateToStr.getTime();
+      validateTimeStampRange(timeValue);
+      return timeValue;
     } catch (ParseException e) {
-      throw new NumberFormatException(e.getMessage());
+      // If the parsing fails, try to parse again with setLenient to true if 
the property is set
+      // (example: 1941-03-15 00:00:00 is invalid data and will fail to parse 
in Asia/Shanghai zone
+      // as DST is observed and clocks were turned forward 1 hour to 
1941-03-15 01:00:00)
+      if (CarbonProperties.getInstance().isSetLenientEnabled()) {
+        try {
+          dateFormatter.setLenient(true);
+          dateToStr = dateFormatter.parse(dimensionValue);
+          timeValue = dateToStr.getTime();
+          validateTimeStampRange(timeValue);
+          LOGGER.info("Parsed data with lenience as true, setting back to 
default mode");
+          return timeValue;
+        } catch (ParseException ex) {
+          LOGGER.info("Failed to parse data with lenience as true, setting 
back to default mode");
+          throw new NumberFormatException(ex.getMessage());
+        } finally {
+          dateFormatter.setLenient(false);
+        }
+      } else {
+        throw new NumberFormatException(e.getMessage());
+      }
+    }
+  }
+
+  private static void validateTimeStampRange(Long timeValue) {
+    if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE
+        || timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
+      throw new NumberFormatException(
+          "timestamp column data value: " + timeValue + "is not in valid range 
of: "
+              + DateDirectDictionaryGenerator.MIN_VALUE + " and "
+              + DateDirectDictionaryGenerator.MAX_VALUE);
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java 
b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 6848626..7a9d547 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -32,6 +32,7 @@ import 
org.apache.carbondata.core.exception.InvalidConfigurationException;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_MV;
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT;
@@ -153,6 +154,7 @@ public class SessionParams implements Serializable, 
Cloneable {
       case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
       case ENABLE_AUTO_LOAD_MERGE:
       case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR:
+      case CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE:
       case CARBON_ENABLE_INDEX_SERVER:
       case CARBON_QUERY_STAGE_INPUT:
       case CARBON_ENABLE_MV:
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8074119..95c6941 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -55,7 +55,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, 
CarbonUtil, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
@@ -816,10 +816,13 @@ object CarbonDataRDDFactory {
       val partitionByRdd = keyRDD.partitionBy(
         new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))
 
+      val carbonSessionInfoBroadcast = sqlContext.sparkSession.sparkContext
+        .broadcast(ThreadLocalSessionInfo.getCarbonSessionInfo)
       // because partitionId=segmentIdIndex*parallelism+RandomPart and 
RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
       val conf = 
SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, 
hadoopConf)
       partitionByRdd.map(_._2).mapPartitions { partition =>
+        
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfoBroadcast.value)
         
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()
         val segIdIndex = partitionId / segmentUpdateParallelism
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index 27abdc6..72cf525 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.testsuite.badrecordloger
 
 import java.io.File
 
+import scala.collection.mutable.ListBuffer
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
@@ -26,6 +28,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.BadRecordUtil
 
 class BadRecordActionTest extends QueryTest {
 
@@ -270,6 +273,24 @@ class BadRecordActionTest extends QueryTest {
     }
   }
 
+  test("test bad record FAIL with invalid timestamp range") {
+    val csvPath = s"$resourcesPath/badrecords/invalidTimeStampRange.csv"
+    val rows = new ListBuffer[Array[String]]
+    rows += Array("ID", "date", "time")
+    rows += Array("1", "2016-7-24", "342016-7-24 01:02:30")
+    BadRecordUtil.createCSV(rows, csvPath)
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time 
Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 
'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    val exception = intercept[Exception] {
+      sql(s" LOAD DATA LOCAL INPATH 
'$resourcesPath/badrecords/invalidTimeStampRange.csv' " +
+          s"into table test_time options ('bad_records_action'='fail')")
+    }
+    assert(exception.getMessage.contains("Data load failed due to bad record: 
The value with column name time and column data" +
+        " type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad 
record logger to know the detail reason"))
+    sql("DROP TABLE IF EXISTS test_time")
+    FileUtils.forceDelete(new File(csvPath))
+  }
 
   private def currentPath: String = {
     new File(this.getClass.getResource("/").getPath + "../../")
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index 57c8f4b..319ad4e 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -17,9 +17,13 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import java.io.File
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.util.TimeZone
 
+import scala.collection.mutable.ListBuffer
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
@@ -29,10 +33,15 @@ import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.common.constants.LoggerAction
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.BadRecordUtil
 
 class TestLoadDataWithDiffTimestampFormat extends QueryTest with 
BeforeAndAfterAll {
 
+  val defaultTimeZone = TimeZone.getDefault
+  val csvPath = s"$resourcesPath/differentZoneTimeStamp.csv"
+
   override def beforeAll {
+    generateCSVFile()
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, 
LoggerAction.FORCE.name())
 
@@ -306,7 +315,51 @@ class TestLoadDataWithDiffTimestampFormat extends 
QueryTest with BeforeAndAfterA
     }
   }
 
+  test("test load, update data with setlenient carbon property for daylight " +
+       "saving time from different timezone") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE,
 "true")
+    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time 
Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 
'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    sql(s" LOAD DATA LOCAL INPATH '$resourcesPath/differentZoneTimeStamp.csv' 
into table test_time")
+    sql(s"insert into test_time select 11, '2016-7-24', '1941-3-15 00:00:00' ")
+    sql("update test_time set (time) = ('1941-3-15 00:00:00') where ID='2'")
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 1"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 11"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 2"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    sql("DROP TABLE test_time")
+    
CarbonProperties.getInstance().removeProperty(CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE)
+  }
+
+  test("test load, update data with setlenient session level property for 
daylight " +
+       "saving time from different timezone") {
+    sql("set carbon.load.dateformat.setlenient.enable = true")
+    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
+    sql("DROP TABLE IF EXISTS test_time")
+    sql("CREATE TABLE IF NOT EXISTS test_time (ID Int, date Date, time 
Timestamp) STORED AS carbondata " +
+        "TBLPROPERTIES('dateformat'='yyyy-MM-dd', 
'timestampformat'='yyyy-MM-dd HH:mm:ss') ")
+    sql(s" LOAD DATA LOCAL INPATH '$resourcesPath/differentZoneTimeStamp.csv' 
into table test_time")
+    sql(s"insert into test_time select 11, '2016-7-24', '1941-3-15 00:00:00' ")
+    sql("update test_time set (time) = ('1941-3-15 00:00:00') where ID='2'")
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 1"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 11"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    checkAnswer(sql("SELECT time FROM test_time WHERE ID = 2"), 
Seq(Row(Timestamp.valueOf("1941-3-15 01:00:00"))))
+    sql("DROP TABLE test_time")
+    defaultConfig()
+  }
+
+  def generateCSVFile(): Unit = {
+    val rows = new ListBuffer[Array[String]]
+    rows += Array("ID", "date", "time")
+    rows += Array("1", "1941-3-15", "1941-3-15 00:00:00")
+    rows += Array("2", "2016-7-24", "2016-7-24 01:02:30")
+    BadRecordUtil.createCSV(rows, csvPath)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
+    FileUtils.forceDelete(new File(csvPath))
+    TimeZone.setDefault(defaultTimeZone)
   }
 }
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
index cc7b152..3f4fa60 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
@@ -17,7 +17,9 @@
 
 package org.apache.carbondata.spark.util
 
-import java.io.{File, FileFilter}
+import java.io.{BufferedWriter, File, FileFilter, FileWriter}
+import scala.collection.mutable.ListBuffer
+import au.com.bytecode.opencsv.CSVWriter
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -68,4 +70,17 @@ object BadRecordUtil {
     badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
     
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
   }
+
+  def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
+    val out = new BufferedWriter(new FileWriter(csvPath))
+    val writer: CSVWriter = new CSVWriter(out)
+    try {
+      for (row <- rows) {
+        writer.writeNext(row)
+      }
+    } finally {
+      out.close()
+      writer.close()
+    }
+  }
 }

Reply via email to