This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new afbf531  [CARBONDATA-4111] Filter query having invalid results after 
add segment to table having SI with Indexserver
afbf531 is described below

commit afbf531b6b1237424d1f8408258a4202568824ac
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Jan 25 16:09:32 2021 +0530

    [CARBONDATA-4111] Filter query having invalid results after add segment to 
table having SI with Indexserver
    
    Why is this PR needed?
    When the index server is enabled, a filter query on an SI column after an
    alter table add sdk segment on the main table throws NoSuchMethodException,
    and the rows added by the sdk segment are not returned in the result.
    
    What changes were proposed in this PR?
    Added the segment path in the index server flow, as it is used to identify
    an external segment in the filter resolver step. There is no need to load to
    SI if it is an add load command.
    Declared a default constructor for SegmentWrapperContainer.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4080
---
 .../java/org/apache/carbondata/core/index/Segment.java  | 12 +++++++++++-
 .../carbondata/core/indexstore/ExtendedBlocklet.java    | 15 ++++++++++++++-
 .../core/indexstore/SegmentWrapperContainer.java        |  3 +++
 .../testsuite/secondaryindex/TestSIWithAddSegment.scala | 17 ++++++++++++-----
 .../secondaryindex/load/CarbonInternalLoaderUtil.java   |  3 ++-
 .../events/SILoadEventListenerForFailedSegments.scala   |  5 +++++
 6 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 202a7d4..6129373 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -79,7 +79,7 @@ public class Segment implements Serializable, Writable {
   /**
    * Path of segment where it exists
    */
-  private transient String segmentPath;
+  private String segmentPath;
 
   /**
    * Properties of the segment.
@@ -162,6 +162,7 @@ public class Segment implements Serializable, Writable {
     this.segmentFileName = segmentFileName;
     this.readCommittedScope = readCommittedScope;
     this.loadMetadataDetails = loadMetadataDetails;
+    this.segmentPath = loadMetadataDetails.getPath();
     if (loadMetadataDetails.getIndexSize() != null) {
       this.indexSize = Long.parseLong(loadMetadataDetails.getIndexSize());
     }
@@ -377,6 +378,12 @@ public class Segment implements Serializable, Writable {
       out.writeUTF(segmentString);
     }
     out.writeLong(indexSize);
+    if (segmentPath == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeUTF(segmentPath);
+    }
   }
 
   @Override
@@ -394,6 +401,9 @@ public class Segment implements Serializable, Writable {
       this.segmentString = in.readUTF();
     }
     this.indexSize = in.readLong();
+    if (in.readBoolean()) {
+      this.segmentPath = in.readUTF();
+    }
   }
 
   public SegmentMetaDataInfo getSegmentMetaDataInfo() {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index e0b21d5..0d2ba79 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes;
 import org.apache.carbondata.core.indexstore.row.IndexRow;
@@ -221,7 +223,18 @@ public class ExtendedBlocklet extends Blocklet {
       indexUniqueId = in.readUTF();
     }
     String filePath = getPath();
-    if (filePath.startsWith(File.separator)) {
+    boolean isLocalFile = FileFactory.getCarbonFile(filePath) instanceof 
LocalCarbonFile;
+
+    // For an external segment, the table path need not be appended to filePath, as it already contains the full path
+    // Example filepath for ext segment:
+    //  1. /home/user/carbondata/integration/spark/newsegmentpath
+    //  2. hdfs://hacluster/opt/newsegmentpath/
+    // Example filepath for loaded segment: /Fact/Part/Segment0
+    // To identify a filePath as ext segment path,
+    // for other storage systems (hdfs, s3): filePath doesn't start with the File 
separator.
+    // for local storage: it starts with the File separator, so check whether the given 
path exists or not.
+    if ((!isLocalFile && filePath.startsWith(File.separator)) || (isLocalFile 
&& !FileFactory
+        .isFileExist(filePath))) {
       setFilePath(tablePath + filePath);
     } else {
       setFilePath(filePath);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentWrapperContainer.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentWrapperContainer.java
index 0f78ff5..7378bb7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentWrapperContainer.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentWrapperContainer.java
@@ -31,6 +31,9 @@ public class SegmentWrapperContainer implements Writable {
 
   private SegmentWrapper[] segmentWrappers;
 
+  public SegmentWrapperContainer() {
+  }
+
   public SegmentWrapperContainer(SegmentWrapper[] segmentWrappers) {
     this.segmentWrappers = segmentWrappers;
   }
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
index e2bc7a1..38ecc5f 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -61,13 +61,18 @@ class TestSIWithAddSegment extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test if the query hits SI after adding a segment to the main table") {
-    val d = sql("select * from maintable where c = 'm'")
-    
assert(d.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+    val extSegmentQuery = sql("select * from maintable where c = 'm'")
+    val loadedSegmentQuery = sql("select * from maintable where c = 'k'")
+    checkAnswer(extSegmentQuery, Row("m", 3, "m"))
+    checkAnswer(loadedSegmentQuery, Row("k", 1, "k"))
+    
assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
+    
assert(loadedSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
   }
 
   test("compare results of SI and NI after adding segments") {
     val siResult = sql("select * from maintable where c = 'm'")
     val niResult = sql("select * from maintable where ni(c = 'm')")
+    
assert(siResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     
assert(!niResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     checkAnswer(siResult, niResult)
   }
@@ -86,10 +91,11 @@ class TestSIWithAddSegment extends QueryTest with 
BeforeAndAfterAll {
     sql(s"alter table maintable1 add segment options('path'='${ newSegmentPath 
}', " +
         s"'format'='carbon')")
     sql("CREATE INDEX maintable1_si  on table maintable1 (c) as 'carbondata'")
-    assert(sql("show segments for table maintable1_si").collect().length ==
-           sql("show segments for table maintable1").collect().length)
+    assert(sql("show segments for table maintable1_si").collect().length == 2)
+    assert(sql("show segments for table maintable1").collect().length == 3)
     val siResult = sql("select * from maintable1 where c = 'm'")
     val niResult = sql("select * from maintable1 where ni(c = 'm')")
+    
assert(siResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     
assert(!niResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     checkAnswer(siResult, niResult)
   }
@@ -110,6 +116,7 @@ class TestSIWithAddSegment extends QueryTest with 
BeforeAndAfterAll {
         s"'format'='carbon')")
     val siResult = sql("select * from maintable1 where c = 'm'")
     val niResult = sql("select * from maintable1 where ni(c = 'm')")
+    
assert(siResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     checkAnswer(siResult, niResult)
   }
 }
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index f0dc758..dd8ea6e 100644
--- 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -51,7 +51,8 @@ public class CarbonInternalLoaderUtil {
   public static List<String> getListOfValidSlices(LoadMetadataDetails[] 
details) {
     List<String> activeSlices = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (LoadMetadataDetails oneLoad : details) {
-      if (SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
+      // External added segments are not loaded to SI
+      if (oneLoad.getPath() == null && 
SegmentStatus.SUCCESS.equals(oneLoad.getSegmentStatus())
           || 
SegmentStatus.LOAD_PARTIAL_SUCCESS.equals(oneLoad.getSegmentStatus())
           || 
SegmentStatus.MARKED_FOR_UPDATE.equals(oneLoad.getSegmentStatus())) {
         activeSlices.add(oneLoad.getLoadName());
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index a7677ea..b371947 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -48,6 +48,11 @@ class SILoadEventListenerForFailedSegments extends 
OperationEventListener with L
           val loadTablePostStatusUpdateEvent = 
event.asInstanceOf[LoadTablePostStatusUpdateEvent]
           val carbonLoadModel = 
loadTablePostStatusUpdateEvent.getCarbonLoadModel
           val sparkSession = SparkSession.getActiveSession.get
+          // Avoid loading segment to SI for add load command
+          if (operationContext.getProperty("isAddLoad") != null &&
+            operationContext.getProperty("isAddLoad").toString.toBoolean) {
+            return
+          }
           if 
(CarbonProperties.getInstance().isSIRepairEnabled(carbonLoadModel.getDatabaseName,
             carbonLoadModel.getTableName)) {
           // when Si creation and load to main table are parallel, get the 
carbonTable from the

Reply via email to