[CARBONDATA-2431] Fixed the issue where incremental data added after external
table creation was not reflected in select queries.

This closes #2262


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1a6c7cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1a6c7cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1a6c7cf

Branch: refs/heads/spark-2.3
Commit: f1a6c7cf548cd33ef26bd99f26c7fcf7e367c9c7
Parents: 2881c6b
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Thu May 3 14:11:12 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Mon May 14 11:27:33 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |  19 +--
 .../apache/carbondata/core/datamap/Segment.java |   7 +
 .../LatestFilesReadCommittedScope.java          |  32 ++++-
 .../ReadCommittedIndexFileSnapShot.java         |  10 +-
 .../core/readcommitter/ReadCommittedScope.java  |   5 +
 .../TableStatusReadCommittedScope.java          |  13 ++
 .../core/statusmanager/SegmentRefreshInfo.java  |  65 +++++++++
 .../hadoop/api/CarbonTableInputFormat.java      |  10 +-
 .../TestNonTransactionalCarbonTable.scala       | 136 +++++++++++++++++++
 9 files changed, 282 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a3be26a..072b86e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -43,6 +43,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorage
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
@@ -454,7 +455,7 @@ public final class DataMapStoreManager {
 
     // This map stores the latest segment refresh time.So in case of 
update/delete we check the
     // time against this map.
-    private Map<String, Long> segmentRefreshTime = new HashMap<>();
+    private Map<String, SegmentRefreshInfo> segmentRefreshTime = new 
HashMap<>();
 
     // This map keeps the manual refresh entries from users. It is mainly used 
for partition
     // altering.
@@ -465,23 +466,25 @@ public final class DataMapStoreManager {
       SegmentUpdateDetails[] updateStatusDetails = 
statusManager.getUpdateStatusDetails();
       for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
         UpdateVO updateVO = 
statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
-        segmentRefreshTime.put(updateVO.getSegmentId(), 
updateVO.getCreatedOrUpdatedTimeStamp());
+        segmentRefreshTime.put(updateVO.getSegmentId(),
+            new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 
0));
       }
     }
 
-    public boolean isRefreshNeeded(String segmentId, 
SegmentUpdateStatusManager statusManager) {
-      UpdateVO updateVO = statusManager.getInvalidTimestampRange(segmentId);
+    public boolean isRefreshNeeded(Segment seg, UpdateVO updateVo) throws 
IOException {
+      SegmentRefreshInfo segmentRefreshInfo =
+          seg.getSegmentRefreshInfo(updateVo);
+      String segmentId = seg.getSegmentNo();
       if (segmentRefreshTime.get(segmentId) == null) {
-        segmentRefreshTime.put(segmentId, 
updateVO.getCreatedOrUpdatedTimeStamp());
+        segmentRefreshTime.put(segmentId, segmentRefreshInfo);
         return true;
       }
       if (manualSegmentRefresh.get(segmentId) != null && 
manualSegmentRefresh.get(segmentId)) {
         manualSegmentRefresh.put(segmentId, false);
         return true;
       }
-      Long updateTimestamp = updateVO.getLatestUpdateTimestamp();
-      boolean isRefresh =
-          updateTimestamp != null && (updateTimestamp > 
segmentRefreshTime.get(segmentId));
+
+      boolean isRefresh = 
segmentRefreshInfo.compare(segmentRefreshTime.get(segmentId));
       if (isRefresh) {
         segmentRefreshTime.remove(segmentId);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 476f9da..85c7176 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -25,8 +25,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -111,6 +113,11 @@ public class Segment implements Serializable {
     return readCommittedScope.getCommittedIndexFile(this);
   }
 
+  /**
+   * Asks the read committed scope attached to this segment for the segment's
+   * refresh information.
+   *
+   * @param updateVo update details for this segment, handed to the scope unchanged
+   * @return whatever the scope reports for this segment
+   * @throws IOException propagated from the scope
+   */
+  public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo) throws IOException {
+    SegmentRefreshInfo refreshInfo =
+        readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo);
+    return refreshInfo;
+  }
+
   public String getSegmentNo() {
     return segmentNo;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index de7f8a9..2306330 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -28,7 +28,9 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -43,7 +45,7 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
 
-  public LatestFilesReadCommittedScope(String path) {
+  public LatestFilesReadCommittedScope(String path)  {
     this.carbonFilePath = path;
     try {
       takeCarbonIndexFileSnapShot();
@@ -104,6 +106,20 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
     return indexFileStore;
   }
 
+  /**
+   * Returns the refresh information held for the given segment in the current
+   * index file snapshot.
+   *
+   * The snapshot is looked up by segment number when the segment has one, otherwise
+   * by its segment file name. The update details are not consulted here.
+   *
+   * @param segment segment whose refresh information is requested
+   * @param updateVo not used by this scope
+   * @return the snapshot entry, or null when the snapshot has no entry for the key
+   */
+  @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+      Segment segment, UpdateVO updateVo) throws IOException {
+    Map<String, SegmentRefreshInfo> refreshInfoBySegment =
+        readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
+    String snapshotKey =
+        segment.getSegmentNo() == null ? segment.getSegmentFileName() : segment.getSegmentNo();
+    return refreshInfoBySegment.get(snapshotKey);
+  }
+
   private String getSegmentID(String carbonIndexFileName, String 
indexFilePath) {
     if (indexFilePath.contains("/Fact/Part0/Segment_")) {
       // This is CarbonFile case where the Index files are present inside the 
Segment Folder
@@ -128,6 +144,7 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
       throw new IOException("No files are present in the table location :" + 
carbonFilePath);
     }
     Map<String, List<String>> indexFileStore = new HashMap<>();
+    Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new 
HashMap<>();
     if (file.isDirectory()) {
       CarbonFile[] carbonIndexFiles = 
SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
       for (int i = 0; i < carbonIndexFiles.length; i++) {
@@ -139,18 +156,29 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
               getSegmentID(carbonIndexFiles[i].getName(), 
carbonIndexFiles[i].getAbsolutePath());
           // TODO. During Partition table handling, place Segment File Name.
           List<String> indexList;
+          SegmentRefreshInfo segmentRefreshInfo;
           if (indexFileStore.get(segId) == null) {
             indexList = new ArrayList<>(1);
+            segmentRefreshInfo =
+                new 
SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
+            segmentTimestampUpdaterMap.put(segId, segmentRefreshInfo);
           } else {
             // Entry is already present.
             indexList = indexFileStore.get(segId);
+            segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
           }
           indexList.add(carbonIndexFiles[i].getAbsolutePath());
+          if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < 
carbonIndexFiles[i]
+              .getLastModifiedTime()) {
+            segmentRefreshInfo
+                
.setSegmentUpdatedTimestamp(carbonIndexFiles[i].getLastModifiedTime());
+          }
           indexFileStore.put(segId, indexList);
+          segmentRefreshInfo.setCountOfFileInSegment(indexList.size());
         }
       }
       ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
-          new ReadCommittedIndexFileSnapShot(indexFileStore);
+          new ReadCommittedIndexFileSnapShot(indexFileStore, 
segmentTimestampUpdaterMap);
       this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
       prepareLoadMetadata();
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
index 3e8e04f..70ca6ec 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 
 /**
  * This class is going to save the the Index files which are taken snapshot
@@ -36,12 +37,19 @@ public class ReadCommittedIndexFileSnapShot implements 
Serializable {
    * Segment Numbers are mapped with list of Index Files.
    */
   private Map<String, List<String>> segmentIndexFileMap;
+  private Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap;
 
-  public ReadCommittedIndexFileSnapShot(Map<String, List<String>> 
segmentIndexFileMap) {
+  public ReadCommittedIndexFileSnapShot(Map<String, List<String>> 
segmentIndexFileMap,
+      Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap) {
     this.segmentIndexFileMap = segmentIndexFileMap;
+    this.segmentTimestampUpdaterMap = segmentTimestampUpdaterMap;
   }
 
   public Map<String, List<String>> getSegmentIndexFileMap() {
     return segmentIndexFileMap;
   }
+
+  /**
+   * Returns the map of SegmentRefreshInfo entries that was passed to the constructor.
+   */
+  public Map<String, SegmentRefreshInfo> getSegmentTimestampUpdaterMap() {
+    return segmentTimestampUpdaterMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index 9ae462b..6ff4b89 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -23,7 +23,9 @@ import java.util.Map;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 
 /**
  * ReadCommitted interface that defines a read scope.
@@ -43,5 +45,8 @@ public interface ReadCommittedScope extends Serializable {
    */
   public Map<String, String> getCommittedIndexFile(Segment segment) throws 
IOException ;
 
+  /**
+   * Returns the refresh information of the given segment as seen by this read scope.
+   *
+   * @param segment segment to look up
+   * @param updateVo update details of the segment; an implementation may ignore it
+   * @return refresh information for the segment
+   * @throws IOException declared so that implementations may perform I/O
+   */
+  public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+      Segment segment, UpdateVO updateVo) throws IOException;
+
   public void takeCarbonIndexFileSnapShot() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 41ce31c..91ebd41 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -25,7 +25,9 @@ import org.apache.carbondata.core.datamap.Segment;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -77,6 +79,17 @@ public class TableStatusReadCommittedScope implements 
ReadCommittedScope {
     return indexFiles;
   }
 
+  /**
+   * Builds the refresh information for a segment from its update details.
+   *
+   * The file count is always reported as 0 by this scope, so only the timestamp
+   * carries information.
+   *
+   * @param segment not used by this scope
+   * @param updateVo update details of the segment, may be null
+   * @return refresh information carrying the created-or-updated timestamp of
+   *         {@code updateVo}, or a timestamp of 0 when {@code updateVo} is null
+   */
+  @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment,
+      UpdateVO updateVo) throws IOException {
+    if (updateVo == null) {
+      // No update details available: fall back to a zero timestamp.
+      return new SegmentRefreshInfo(0L, 0);
+    }
+    return new SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0);
+  }
+
   @Override public void takeCarbonIndexFileSnapShot() throws IOException {
     // Only Segment Information is updated.
     // File information will be fetched on the fly according to the fecthed 
segment info.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
new file mode 100644
index 0000000..11fb73f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.io.Serializable;
+
+/**
+ * Refresh marker for a single segment: the most recent update timestamp recorded for
+ * it and the number of files counted in it.
+ *
+ * Both values are boxed and may be null; every method of this class tolerates null
+ * fields.
+ */
+public class SegmentRefreshInfo implements Serializable {
+
+  private Long segmentUpdatedTimestamp;
+  private Integer countOfFileInSegment;
+
+  /**
+   * @param segmentUpdatedTimestamp latest update timestamp of the segment, may be null
+   * @param countOfFileInSegment number of files counted in the segment, may be null
+   */
+  public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer countOfFileInSegment) {
+    this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+    this.countOfFileInSegment = countOfFileInSegment;
+  }
+
+  public Long getSegmentUpdatedTimestamp() {
+    return segmentUpdatedTimestamp;
+  }
+
+  public void setSegmentUpdatedTimestamp(Long segmentUpdatedTimestamp) {
+    this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+  }
+
+  public Integer getCountOfFileInSegment() {
+    return countOfFileInSegment;
+  }
+
+  public void setCountOfFileInSegment(Integer countOfFileInSegment) {
+    this.countOfFileInSegment = countOfFileInSegment;
+  }
+
+  /**
+   * Tells whether this marker differs from another one in a way that calls for a
+   * refresh.
+   *
+   * @param o the marker to compare against
+   * @return true when this timestamp is strictly greater than the other one, or when
+   *         the two file counts differ; false otherwise, and always false when
+   *         {@code o} is not a SegmentRefreshInfo. A null timestamp on this side is
+   *         never treated as newer; a null timestamp on the other side is treated as
+   *         older than any non-null one.
+   */
+  public boolean compare(Object o) {
+    if (!(o instanceof SegmentRefreshInfo)) {
+      return false;
+    }
+    SegmentRefreshInfo that = (SegmentRefreshInfo) o;
+    return isNewerThan(that.segmentUpdatedTimestamp)
+        || !sameValue(countOfFileInSegment, that.countOfFileInSegment);
+  }
+
+  /** Null-safe "strictly greater than" on the update timestamp. */
+  private boolean isNewerThan(Long otherTimestamp) {
+    if (segmentUpdatedTimestamp == null) {
+      return false;
+    }
+    return otherTimestamp == null || segmentUpdatedTimestamp > otherTimestamp;
+  }
+
+  /** Null-safe equality of two boxed values. */
+  private static boolean sameValue(Object left, Object right) {
+    return left == null ? right == null : left.equals(right);
+  }
+
+  /** Two markers are equal when both their timestamps and their file counts are equal. */
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SegmentRefreshInfo that = (SegmentRefreshInfo) o;
+    return sameValue(segmentUpdatedTimestamp, that.segmentUpdatedTimestamp)
+        && sameValue(countOfFileInSegment, that.countOfFileInSegment);
+  }
+
+  /** Consistent with {@link #equals(Object)}; a null field contributes 0. */
+  @Override public int hashCode() {
+    int result = segmentUpdatedTimestamp == null ? 0 : segmentUpdatedTimestamp.hashCode();
+    result = 31 * result + (countOfFileInSegment == null ? 0 : countOfFileInSegment.hashCode());
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a32e17a..1db3138 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -230,13 +230,13 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
             readCommittedScope);
     // Clean the updated segments from memory if the update happens on segments
     List<Segment> toBeCleanedSegments = new ArrayList<>();
-    for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
-        .getUpdateStatusDetails()) {
+    for (Segment filteredSegment : filteredSegmentToAccess) {
       boolean refreshNeeded =
           
DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-              .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), 
updateStatusManager);
+              .isRefreshNeeded(filteredSegment,
+                  
updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
       if (refreshNeeded) {
-        toBeCleanedSegments.add(new 
Segment(segmentUpdateDetail.getSegmentName(), null));
+        toBeCleanedSegments.add(filteredSegment);
       }
     }
     // Clean segments if refresh is needed
@@ -246,6 +246,8 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
         toBeCleanedSegments.add(segment);
       }
     }
+
+
     if (toBeCleanedSegments.size() > 0) {
       DataMapStoreManager.getInstance()
           .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1a6c7cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 86fda21..58ce5fa 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -198,6 +198,40 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     }
   }
 
+  // prepare sdk writer output
+  /**
+   * Writes `rows` rows (name, age, height) to `writerPath` through the SDK CarbonWriter
+   * as a non-transactional table, always using the fixed unique identifier 123.
+   *
+   * @param rows          number of rows to write
+   * @param persistSchema not used by this helper
+   * @param options       not used by this helper
+   * @param sortColumns   columns handed to the writer builder as sort columns
+   */
+  def buildTestDataWithSameUUID(rows: Int,
+      persistSchema: Boolean,
+      options: util.Map[String, String],
+      sortColumns: List[String]): Any = {
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+          .isTransactionalTable(false)
+          .sortBy(sortColumns.toArray)
+          .uniqueIdentifier(123)
+          .withBlockSize(2)
+          .buildWriterForCSVInput()
+      for (i <- 0 until rows) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+      }
+      writer.close()
+    } catch {
+      // Only exceptions are wrapped; any other Throwable propagates unchanged instead of
+      // being silently swallowed, so a fatal error during data preparation stays visible.
+      case ex: Exception => throw new RuntimeException(ex)
+    }
+  }
 
   def cleanTestData() = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -229,6 +263,44 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS sdkOutputTable")
   }
 
+  // Checks that an external table sees SDK output that was deleted and written again
+  // (same schema, same writer UUID) after the table had been created and queried.
+  test(
+    "Read two sdk writer outputs before and after deleting the existing files and creating new " +
+    "files with same schema and UUID") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+    assert(new File(writerPath).exists())
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    // First read: the three rows written before the table was created.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+    // Logging only: foreach avoids building a throwaway collection of Unit values.
+    new File(writerPath).listFiles()
+      .foreach(x => LOGGER.audit(x.getName +" : "+x.lastModified()))
+    FileUtils.deleteDirectory(new File(writerPath))
+    // Thread.sleep is required because it is possible sometime deletion
+    // and creation of new file can happen at same timestamp.
+    Thread.sleep(1000)
+    assert(!new File(writerPath).exists())
+    buildTestDataWithSameUUID(4, false, null, List("name"))
+    new File(writerPath).listFiles()
+      .foreach(x => LOGGER.audit(x.getName +" : "+x.lastModified()))
+    // Second read: must reflect the four rows of the rewritten files.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot3", 3, 1.5)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
   test("test create external table with sort columns") {
     buildTestDataWithSortColumns()
     assert(new File(writerPath).exists())
@@ -638,9 +710,40 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
       Row("robot1", 1, 0.5),
       Row("robot2", 2, 1.0)))
 
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    buildTestDataWithSameUUID(3, false, null, List("name"))
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
     //test filter query
     checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq(
       Row("robot1", 1, 0.5),
+      Row("robot1", 1, 0.5),
+      Row("robot1", 1, 0.5),
       Row("robot1", 1, 0.5)))
 
     // test the default sort column behavior in Nontransactional table
@@ -653,6 +756,39 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     cleanTestData()
   }
 
+  // Checks that an external table sees SDK output that was deleted and written again
+  // with the same schema after the table had been created and queried once.
+  // NOTE(review): unlike the "same schema and UUID" test in this suite, no pause
+  // separates the delete from the rewrite here - confirm the refresh check cannot miss
+  // the new files when both happen within the same timestamp.
+  test(
+    "Read two sdk writer outputs before and after deleting the existing files 
and creating new " +
+    "files with same schema") {
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+
+    // First read: the three rows present when the table was created.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestData(4, false, null)
+
+    // Second read: expects the four rows written after the delete.
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot3", 3, 1.5)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
   test("test bad records form sdk writer") {
 
     //1. Action = FORCE

Reply via email to