Repository: carbondata
Updated Branches:
  refs/heads/master 4df335f20 -> 6d40d3a98


[CARBONDATA-2777] Fixed: NonTransactional tables, Select count(*) is not giving 
latest results for incremental load with same segment ID (UUID)

problem:
[CARBONDATA-2777] NonTransactional tables, Select count(*) is not giving latest 
results for incremental load with same segment ID (UUID)

solution:
For NonTransactional tables, segments need to be refreshed if the segment 
latest timestamp got changed by the incremental load for count(*) flow also. 
This change is already present in select * flow and missed here.

This closes #2547


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d40d3a9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d40d3a9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d40d3a9

Branch: refs/heads/master
Commit: 6d40d3a98d4130f93982f62532b013e74d349fc3
Parents: 4df335f
Author: ajantha-bhat <[email protected]>
Authored: Tue Jul 24 15:49:02 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Wed Jul 25 12:55:39 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      | 21 +++++++++++++++++++-
 .../TestNonTransactionalCarbonTable.scala       | 18 +++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d40d3a9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bd6b775..f53a1d7 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -590,7 +590,26 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     // TODO: currently only batch segment is supported, add support for 
streaming table
     List<Segment> filteredSegment =
         getFilteredSegment(job, allSegments.getValidSegments(), false, 
readCommittedScope);
-
+    /* In the select * flow, getSplits() method was clearing the segmentMap if,
+    segment needs refreshing. same thing need for select count(*) flow also.
+    For NonTransactional table, one of the reason for a segment refresh is 
below scenario.
+    SDK is written one set of files with UUID, with same UUID it can write 
again.
+    So, latest files content should reflect the new count by refreshing the 
segment */
+    List<Segment> toBeCleanedSegments = new ArrayList<>();
+    for (Segment eachSegment : filteredSegment) {
+      boolean refreshNeeded = DataMapStoreManager.getInstance()
+          
.getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
+          .isRefreshNeeded(eachSegment,
+              
updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo()));
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(eachSegment);
+      }
+    }
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
     List<ExtendedBlocklet> blocklets =
         blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d40d3a9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a96c258..c7d9caa 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -350,6 +350,24 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     cleanTestData()
   }
 
+  test("test count star with multiple loads files with same schema and UUID") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(3)))
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+    // should reflect new count
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(6)))
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
   test("test create external table with sort columns") {
     buildTestDataWithSortColumns(List("age","name"))
     assert(new File(writerPath).exists())

Reply via email to