Repository: carbondata Updated Branches: refs/heads/master 4df335f20 -> 6d40d3a98
[CARBONDATA-2777] Fixed: NonTransactional tables, Select count(*) is not giving latest results for incremental load with same segment ID (UUID) Problem: [CARBONDATA-2777] For NonTransactional tables, select count(*) does not return the latest results after an incremental load that reuses the same segment ID (UUID). Solution: For NonTransactional tables, a segment must also be refreshed in the count(*) flow when its latest timestamp has been changed by an incremental load. This refresh was already present in the select * flow but was missing in the count(*) flow. This closes #2547 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d40d3a9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d40d3a9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d40d3a9 Branch: refs/heads/master Commit: 6d40d3a98d4130f93982f62532b013e74d349fc3 Parents: 4df335f Author: ajantha-bhat <[email protected]> Authored: Tue Jul 24 15:49:02 2018 +0530 Committer: ravipesala <[email protected]> Committed: Wed Jul 25 12:55:39 2018 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonTableInputFormat.java | 21 +++++++++++++++++++- .../TestNonTransactionalCarbonTable.scala | 18 +++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d40d3a9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index bd6b775..f53a1d7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ 
-590,7 +590,26 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { // TODO: currently only batch segment is supported, add support for streaming table List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope); - + /* In the select * flow, getSplits() method was clearing the segmentMap if, + segment needs refreshing. same thing need for select count(*) flow also. + For NonTransactional table, one of the reason for a segment refresh is below scenario. + SDK is written one set of files with UUID, with same UUID it can write again. + So, latest files content should reflect the new count by refreshing the segment */ + List<Segment> toBeCleanedSegments = new ArrayList<>(); + for (Segment eachSegment : filteredSegment) { + boolean refreshNeeded = DataMapStoreManager.getInstance() + .getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration())) + .isRefreshNeeded(eachSegment, + updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo())); + if (refreshNeeded) { + toBeCleanedSegments.add(eachSegment); + } + } + if (toBeCleanedSegments.size() > 0) { + DataMapStoreManager.getInstance() + .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), + toBeCleanedSegments); + } List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions); for (ExtendedBlocklet blocklet : blocklets) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d40d3a9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 
a96c258..c7d9caa 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -350,6 +350,24 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + test("test count star with multiple loads files with same schema and UUID") { + FileUtils.deleteDirectory(new File(writerPath)) + buildTestDataWithSameUUID(3, false, null, List("name")) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(3))) + buildTestDataWithSameUUID(3, false, null, List("name")) + // should reflect new count + checkAnswer(sql(s"""select count(*) from sdkOutputTable """), Seq(Row(6))) + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + test("test create external table with sort columns") { buildTestDataWithSortColumns(List("age","name")) assert(new File(writerPath).exists())
