[CARBONDATA-2431] Fixed an issue where incremental data added after external table creation was not reflected in select queries.
This closes #2262 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1a6c7cf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1a6c7cf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1a6c7cf Branch: refs/heads/spark-2.3 Commit: f1a6c7cf548cd33ef26bd99f26c7fcf7e367c9c7 Parents: 2881c6b Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Thu May 3 14:11:12 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon May 14 11:27:33 2018 +0530 ---------------------------------------------------------------------- .../core/datamap/DataMapStoreManager.java | 19 +-- .../apache/carbondata/core/datamap/Segment.java | 7 + .../LatestFilesReadCommittedScope.java | 32 ++++- .../ReadCommittedIndexFileSnapShot.java | 10 +- .../core/readcommitter/ReadCommittedScope.java | 5 + .../TableStatusReadCommittedScope.java | 13 ++ .../core/statusmanager/SegmentRefreshInfo.java | 65 +++++++++ .../hadoop/api/CarbonTableInputFormat.java | 10 +- .../TestNonTransactionalCarbonTable.scala | 136 +++++++++++++++++++ 9 files changed, 282 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index a3be26a..072b86e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -43,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorage import 
org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonSessionInfo; @@ -454,7 +455,7 @@ public final class DataMapStoreManager { // This map stores the latest segment refresh time.So in case of update/delete we check the // time against this map. - private Map<String, Long> segmentRefreshTime = new HashMap<>(); + private Map<String, SegmentRefreshInfo> segmentRefreshTime = new HashMap<>(); // This map keeps the manual refresh entries from users. It is mainly used for partition // altering. @@ -465,23 +466,25 @@ public final class DataMapStoreManager { SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails(); for (SegmentUpdateDetails updateDetails : updateStatusDetails) { UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName()); - segmentRefreshTime.put(updateVO.getSegmentId(), updateVO.getCreatedOrUpdatedTimeStamp()); + segmentRefreshTime.put(updateVO.getSegmentId(), + new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0)); } } - public boolean isRefreshNeeded(String segmentId, SegmentUpdateStatusManager statusManager) { - UpdateVO updateVO = statusManager.getInvalidTimestampRange(segmentId); + public boolean isRefreshNeeded(Segment seg, UpdateVO updateVo) throws IOException { + SegmentRefreshInfo segmentRefreshInfo = + seg.getSegmentRefreshInfo(updateVo); + String segmentId = seg.getSegmentNo(); if (segmentRefreshTime.get(segmentId) == null) { - segmentRefreshTime.put(segmentId, updateVO.getCreatedOrUpdatedTimeStamp()); + segmentRefreshTime.put(segmentId, segmentRefreshInfo); return true; } if 
(manualSegmentRefresh.get(segmentId) != null && manualSegmentRefresh.get(segmentId)) { manualSegmentRefresh.put(segmentId, false); return true; } - Long updateTimestamp = updateVO.getLatestUpdateTimestamp(); - boolean isRefresh = - updateTimestamp != null && (updateTimestamp > segmentRefreshTime.get(segmentId)); + + boolean isRefresh = segmentRefreshInfo.compare(segmentRefreshTime.get(segmentId)); if (isRefresh) { segmentRefreshTime.remove(segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 476f9da..85c7176 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -25,8 +25,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -111,6 +113,11 @@ public class Segment implements Serializable { return readCommittedScope.getCommittedIndexFile(this); } + public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo) + throws IOException { + return readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo); + } + public String getSegmentNo() { return segmentNo; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java index de7f8a9..2306330 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java @@ -28,7 +28,9 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -43,7 +45,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; private LoadMetadataDetails[] loadMetadataDetails; - public LatestFilesReadCommittedScope(String path) { + public LatestFilesReadCommittedScope(String path) { this.carbonFilePath = path; try { takeCarbonIndexFileSnapShot(); @@ -104,6 +106,20 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { return indexFileStore; } + @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo( + Segment segment, UpdateVO updateVo) throws IOException { + Map<String, SegmentRefreshInfo> snapShot = + readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap(); + String segName; + if (segment.getSegmentNo() != null) { + segName = segment.getSegmentNo(); + } else { + segName = segment.getSegmentFileName(); + 
} + SegmentRefreshInfo segmentRefreshInfo = snapShot.get(segName); + return segmentRefreshInfo; + } + private String getSegmentID(String carbonIndexFileName, String indexFilePath) { if (indexFilePath.contains("/Fact/Part0/Segment_")) { // This is CarbonFile case where the Index files are present inside the Segment Folder @@ -128,6 +144,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { throw new IOException("No files are present in the table location :" + carbonFilePath); } Map<String, List<String>> indexFileStore = new HashMap<>(); + Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>(); if (file.isDirectory()) { CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); for (int i = 0; i < carbonIndexFiles.length; i++) { @@ -139,18 +156,29 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { getSegmentID(carbonIndexFiles[i].getName(), carbonIndexFiles[i].getAbsolutePath()); // TODO. During Partition table handling, place Segment File Name. List<String> indexList; + SegmentRefreshInfo segmentRefreshInfo; if (indexFileStore.get(segId) == null) { indexList = new ArrayList<>(1); + segmentRefreshInfo = + new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0); + segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo); } else { // Entry is already present. 
indexList = indexFileStore.get(segId); + segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId); } indexList.add(carbonIndexFiles[i].getAbsolutePath()); + if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i] + .getLastModifiedTime()) { + segmentRefreshInfo + .setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime()); + } indexFileStore.put(segId, indexList); + segmentRefreshInfo.setCountOfFileInSegment(indexList.size()); } } ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot = - new ReadCommittedIndexFileSnapShot(indexFileStore); + new ReadCommittedIndexFileSnapShot(indexFileStore, segmentTimestampUpdaterMap); this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot; prepareLoadMetadata(); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java index 3e8e04f..70ca6ec 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; /** * This class is going to save the the Index files which are taken snapshot @@ -36,12 +37,19 @@ public class ReadCommittedIndexFileSnapShot implements Serializable { * Segment Numbers are mapped with list of Index Files. 
*/ private Map<String, List<String>> segmentIndexFileMap; + private Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap; - public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap) { + public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap, + Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap) { this.segmentIndexFileMap = segmentIndexFileMap; + this.segmentTimestampUpdaterMap = segmentTimestampUpdaterMap; } public Map<String, List<String>> getSegmentIndexFileMap() { return segmentIndexFileMap; } + + public Map<String, SegmentRefreshInfo> getSegmentTimestampUpdaterMap() { + return segmentTimestampUpdaterMap; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java index 9ae462b..6ff4b89 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java @@ -23,7 +23,9 @@ import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; /** * ReadCommitted interface that defines a read scope. 
@@ -43,5 +45,8 @@ public interface ReadCommittedScope extends Serializable { */ public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ; + public SegmentRefreshInfo getCommitedSegmentRefreshInfo( + Segment segment, UpdateVO updateVo) throws IOException; + public void takeCarbonIndexFileSnapShot() throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 41ce31c..91ebd41 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -25,7 +25,9 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -77,6 +79,17 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope { return indexFiles; } + public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) + throws IOException { + SegmentRefreshInfo segmentRefreshInfo; + if (updateVo != null) { + segmentRefreshInfo = new 
SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0); + } else { + segmentRefreshInfo = new SegmentRefreshInfo(0L, 0); + } + return segmentRefreshInfo; + } + @Override public void takeCarbonIndexFileSnapShot() throws IOException { // Only Segment Information is updated. // File information will be fetched on the fly according to the fecthed segment info. http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java new file mode 100644 index 0000000..11fb73f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.statusmanager; + +import java.io.Serializable; + +public class SegmentRefreshInfo implements Serializable { + + private Long segmentUpdatedTimestamp; + private Integer countOfFileInSegment; + + public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer countOfFileInSegment) { + this.segmentUpdatedTimestamp = segmentUpdatedTimestamp; + this.countOfFileInSegment = countOfFileInSegment; + } + + public Long getSegmentUpdatedTimestamp() { + return segmentUpdatedTimestamp; + } + + public void setSegmentUpdatedTimestamp(Long segmentUpdatedTimestamp) { + this.segmentUpdatedTimestamp = segmentUpdatedTimestamp; + } + + public Integer getCountOfFileInSegment() { + return countOfFileInSegment; + } + + public void setCountOfFileInSegment(Integer countOfFileInSegment) { + this.countOfFileInSegment = countOfFileInSegment; + } + + public boolean compare(Object o) { + if (!(o instanceof SegmentRefreshInfo)) return false; + + SegmentRefreshInfo that = (SegmentRefreshInfo) o; + + if (segmentUpdatedTimestamp > that.segmentUpdatedTimestamp || !countOfFileInSegment + .equals(that.countOfFileInSegment)) { + return true; + } + return false; + } + + @Override public int hashCode() { + int result = segmentUpdatedTimestamp.hashCode(); + result = 31 * result + countOfFileInSegment.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index a32e17a..1db3138 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -230,13 +230,13 @@ public class 
CarbonTableInputFormat<T> extends CarbonInputFormat<T> { readCommittedScope); // Clean the updated segments from memory if the update happens on segments List<Segment> toBeCleanedSegments = new ArrayList<>(); - for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager - .getUpdateStatusDetails()) { + for (Segment filteredSegment : filteredSegmentToAccess) { boolean refreshNeeded = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable) - .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager); + .isRefreshNeeded(filteredSegment, + updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo())); if (refreshNeeded) { - toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null)); + toBeCleanedSegments.add(filteredSegment); } } // Clean segments if refresh is needed @@ -246,6 +246,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { toBeCleanedSegments.add(segment); } } + + if (toBeCleanedSegments.size() > 0) { DataMapStoreManager.getInstance() .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 86fda21..58ce5fa 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -198,6 
+198,40 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } } + // prepare sdk writer output + def buildTestDataWithSameUUID(rows: Int, + persistSchema: Boolean, + options: util.Map[String, String], + sortColumns: List[String]): Any = { + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath) + .isTransactionalTable(false) + .sortBy(sortColumns.toArray) + .uniqueIdentifier( + 123).withBlockSize(2) + .buildWriterForCSVInput() + var i = 0 + while (i < rows) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => throw new RuntimeException(ex) + + case _ => None + } + } def cleanTestData() = { FileUtils.deleteDirectory(new File(writerPath)) @@ -229,6 +263,44 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS sdkOutputTable") } + test( + "Read two sdk writer outputs before and after deleting the existing files and creating new " + + "files with same schema and UUID") { + FileUtils.deleteDirectory(new File(writerPath)) + buildTestDataWithSameUUID(3, false, null, List("name")) + assert(new File(writerPath).exists()) + + sql("DROP TABLE IF EXISTS sdkOutputTable") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + new File(writerPath).listFiles().map(x => LOGGER.audit(x.getName +" : "+x.lastModified())) + FileUtils.deleteDirectory(new File(writerPath)) + // Thread.sleep is required because it is possible 
sometime deletion + // and creation of new file can happen at same timestamp. + Thread.sleep(1000) + assert(!new File(writerPath).exists()) + buildTestDataWithSameUUID(4, false, null, List("name")) + new File(writerPath).listFiles().map(x => LOGGER.audit(x.getName +" : "+x.lastModified())) + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot3", 3, 1.5))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + test("test create external table with sort columns") { buildTestDataWithSortColumns() assert(new File(writerPath).exists()) @@ -638,9 +710,40 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { Row("robot1", 1, 0.5), Row("robot2", 2, 1.0))) + buildTestDataWithSameUUID(3, false, null, List("name")) + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + buildTestDataWithSameUUID(3, false, null, List("name")) + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + //test filter query checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq( Row("robot1", 1, 0.5), + Row("robot1", 1, 0.5), + Row("robot1", 1, 0.5), Row("robot1", 1, 0.5))) // test the default sort column behavior in Nontransactional table @@ -653,6 +756,39 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() 
} + test( + "Read two sdk writer outputs before and after deleting the existing files and creating new " + + "files with same schema") { + buildTestDataSingleFile() + assert(new File(writerPath).exists()) + + sql("DROP TABLE IF EXISTS sdkOutputTable") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + FileUtils.deleteDirectory(new File(writerPath)) + buildTestData(4, false, null) + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot3", 3, 1.5))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + test("test bad records form sdk writer") { //1. Action = FORCE