This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 44db434  [CARBONDATA-3987] Handled filter and IUD operations for pagination reader in SDK
44db434 is described below

commit 44db4341a39e063c6b3d274310dc27b99261ebaf
Author: Nihal ojha <[email protected]>
AuthorDate: Wed Dec 30 14:04:51 2020 +0530

    [CARBONDATA-3987] Handled filter and IUD operations for pagination reader in SDK
    
    Why is this PR needed?
    Currently, the SDK pagination reader does not support filter expressions, and it returns wrong results after IUD operations performed through the SDK.
    
    What changes were proposed in this PR?
    If a filter is present, or an update/delete operation has been performed, get the total number of rows in the splits after building the carbon reader; otherwise, get the row count from the detail info of each split.
    Handled ArrayIndexOutOfBoundsException by returning zero when rowCountInSplits.size() == 0.
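    
    For illustration, a minimal sketch of those two pieces. The helper names (isIUDTable, prefixRowCounts) are illustrative, not methods in this patch; the calls themselves mirror the diff below.
    
        // Sketch: detect whether update/delete happened on the table.
        // CarbonIUD drops an "emptyMetadataFolder" marker directory under the
        // table path after an update or delete commit.
        private static boolean isIUDTable(String tablePath, Configuration conf) {
          CarbonFile marker = FileFactory.getCarbonFile(tablePath
              + CarbonCommonConstants.FILE_SEPARATOR
              + CarbonCommonConstants.CARBON_SDK_EMPTY_METADATA_PATH, conf);
          return marker.exists() && marker.isDirectory();
        }
    
        // Sketch: the fast path (no filter, no IUD) builds a prefix-sum list.
        // Entry i holds the cumulative row count of splits 0..i, so the total
        // row count is the last entry, or 0 when the list is empty.
        private static List<Long> prefixRowCounts(List<InputSplit> splits) {
          List<Long> rowCountInSplit = new ArrayList<>(splits.size());
          long sum = 0;
          for (InputSplit split : splits) {
            sum += ((CarbonInputSplit) split).getDetailInfo().getRowCount();
            rowCountInSplit.add(sum);
          }
          return rowCountInSplit;
        }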
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4068
---
 .../core/constants/CarbonCommonConstants.java      |   2 +
 .../org/apache/carbondata/sdk/file/CarbonIUD.java  |  16 +++
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  65 +++++++++-
 .../sdk/file/PaginationCarbonReader.java           |  27 ++--
 .../sdk/file/PaginationCarbonReaderTest.java       | 138 +++++++++++++++++++++
 5 files changed, 231 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 991b0f6..85d6b7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2610,4 +2610,6 @@ public final class CarbonCommonConstants {
   @CarbonProperty(dynamicConfigurable = true)
  public static final String CARBON_MAP_ORDER_PUSHDOWN = "carbon.mapOrderPushDown";
 
+  public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder";
+
 }
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
index ed526b9..1a35f14 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
@@ -33,6 +33,8 @@ import java.util.stream.Stream;
 
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.Field;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -44,6 +46,7 @@ import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -121,6 +124,18 @@ public class CarbonIUD {
     for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForDelete
         .entrySet()) {
       deleteExecution(path.getKey());
+      createEmptyMetadataFile(path.getKey());
+    }
+  }
+
+  private void createEmptyMetadataFile(String path) throws IOException {
+    if (!StringUtils.isEmpty(path)) {
+      path = path + CarbonCommonConstants.FILE_SEPARATOR +
+          CarbonCommonConstants.CARBON_SDK_EMPTY_METADATA_PATH;
+      CarbonFile emptySDKDirectory = FileFactory.getCarbonFile(path);
+      if (!emptySDKDirectory.exists()) {
+        emptySDKDirectory.mkdirs();
+      }
     }
   }
 
@@ -199,6 +214,7 @@ public class CarbonIUD {
         .entrySet()) {
       if (this.updateColumnToValueMapping.containsKey(path.getKey())) {
         updateExecution(path.getKey());
+        createEmptyMetadataFile(path.getKey());
       }
     }
   }
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index b2ceb0c..c746405 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -26,7 +26,9 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
 import org.apache.carbondata.core.index.IndexStoreManager;
@@ -43,6 +45,7 @@ import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
+import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +57,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
 
 @InterfaceAudience.User
 @InterfaceStability.Evolving
@@ -71,6 +75,9 @@ public class CarbonReaderBuilder {
   private List fileLists;
   private Class<? extends CarbonReadSupport> readSupportClass;
 
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonOutputIteratorWrapper.class.getName());
+
   /**
    * Construct a CarbonReaderBuilder with table path and table name
    *
@@ -414,7 +421,9 @@ public class CarbonReaderBuilder {
         format = prepareFileInputFormat(job, true, false);
         List<InputSplit> splits =
            format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-        return new PaginationCarbonReader(splits, this);
+        List<Long> rowCountInSplit = new ArrayList<>(splits.size());
+        totalRowCountInSplits(job, splits, rowCountInSplit);
+        return new PaginationCarbonReader(splits, this, rowCountInSplit);
       }
     } catch (Exception ex) {
       if (format != null) {
@@ -427,6 +436,60 @@ public class CarbonReaderBuilder {
     }
   }
 
+  private <T> void totalRowCountInSplits(Job job, List<InputSplit> splits,
+      List<Long> rowCountInSplit)
+      throws IOException, InterruptedException {
+    CarbonFileInputFormat format = this.prepareFileInputFormat(job, false, true);
+    long sum = 0;
+    boolean isIUDTable = false;
+    if (!StringUtils.isEmpty(this.tablePath)) {
+      // Check if update or delete happened on the table.
+      CarbonFile emptyMetadataFile = FileFactory.getCarbonFile(this.tablePath +
+          CarbonCommonConstants.FILE_SEPARATOR +
+          CarbonCommonConstants.CARBON_SDK_EMPTY_METADATA_PATH, this.hadoopConf);
+      if (emptyMetadataFile.exists() && emptyMetadataFile.isDirectory()) {
+        isIUDTable = true;
+      }
+    }
+    // If a filter exists or IUD happened, then read the total number of rows after
+    // building the carbon reader; else get the row count from the details info of each split.
+    if (this.filterExpression != null || isIUDTable) {
+      RecordReader reader = null;
+      CarbonReader carbonReader = null;
+      for (InputSplit split : splits) {
+        List<RecordReader<Void, T>> readers = new ArrayList<>();
+        try {
+          reader = this.getRecordReader(job, format, readers, split);
+          readers.add(reader);
+          carbonReader = new CarbonReader<>(readers);
+          while (carbonReader.hasNext()) {
+            try {
+              sum += carbonReader.readNextBatchRow().length;
+            } catch (Exception ex) {
              LOGGER.error("Exception occurred while reading the batch row " + ex.getMessage());
+            }
+          }
+          rowCountInSplit.add(sum);
+        } finally {
+          if (reader != null) {
+            reader.close();
+          }
+          if (carbonReader != null) {
+            carbonReader.close();
+          }
+        }
+      }
+    } else {
+      for (InputSplit split : splits) {
+        // prepare a summation array of row counts in each blocklet,
+        // this is used for pruning with pagination values.
+        // At current index, it contains sum of rows of all the blocklets from previous + current.
+        sum += ((CarbonInputSplit) split).getDetailInfo().getRowCount();
+        rowCountInSplit.add(sum);
+      }
+    }
+  }
+
   private  <T> CarbonReader<T> buildWithSplits(InputSplit inputSplit)
       throws IOException, InterruptedException {
     if (hadoopConf == null) {
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
index 83f7acd..3ad6357 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/PaginationCarbonReader.java
@@ -60,22 +60,15 @@ public class PaginationCarbonReader<T> extends CarbonReader<T> {
    * Call {@link #builder(String)} to construct an instance
    */
 
-  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder) {
+  PaginationCarbonReader(List<InputSplit> splits, CarbonReaderBuilder readerBuilder,
+      List<Long> rowsInSplits) {
     // Initialize super class with no readers.
    // Based on the splits identified for pagination query, readers will be built for the query.
     super(null);
     this.allBlockletSplits = splits;
     this.readerBuilder = readerBuilder;
     // prepare the mapping.
-    rowCountInSplits = new ArrayList<>(splits.size());
-    long sum = 0;
-    for (InputSplit split : splits) {
-      // prepare a summation array of row counts in each blocklet,
-      // this is used for pruning with pagination vales.
-      // At current index, it contains sum of rows of all the blocklet from previous + current.
-      sum += ((CarbonInputSplit) split).getDetailInfo().getRowCount();
-      rowCountInSplits.add(sum);
-    }
+    rowCountInSplits = rowsInSplits;
   }
 
   /**
@@ -118,6 +111,9 @@ public class PaginationCarbonReader<T> extends CarbonReader<T> {
     if (isClosed) {
      throw new RuntimeException("Pagination Reader is closed. please build again");
     }
+    if (rowCountInSplits.size() == 0) {
+      return 0;
+    }
     return rowCountInSplits.get(rowCountInSplits.size() - 1);
   }
 
@@ -173,15 +169,13 @@ public class PaginationCarbonReader<T> extends CarbonReader<T> {
       } else {
         BlockletDetailInfo detailInfo =
             ((CarbonInputSplit) allBlockletSplits.get(i)).getDetailInfo();
-        int rowCountInBlocklet = detailInfo.getRowCount();
-        Object[] rowsInBlocklet = new Object[rowCountInBlocklet];
+        List<Object> rowsInBlocklet = new ArrayList<>();
         // read the rows from the blocklet
        // TODO: read blocklets in multi-thread if there is a performance requirement.
         readerBuilder.setInputSplit(allBlockletSplits.get(i));
         CarbonReader<Object> carbonReader = readerBuilder.build();
-        int count = 0;
         while (carbonReader.hasNext()) {
-          rowsInBlocklet[count++] = carbonReader.readNextRow();
+          rowsInBlocklet.add(carbonReader.readNextRow());
         }
         carbonReader.close();
         long fromRowId;
@@ -191,7 +185,8 @@ public class PaginationCarbonReader<T> extends CarbonReader<T> {
          // previous index will contain the sum of rows till previous blocklet.
           fromRowId = rowCountInSplits.get(i - 1) + 1;
         }
-        blockletRows = new BlockletRows(fromRowId, detailInfo.getBlockSize(), rowsInBlocklet);
+        blockletRows = new BlockletRows(fromRowId, detailInfo.getBlockSize(),
+            rowsInBlocklet.toArray());
         // add entry to cache with no expiry time
         // key: unique blocklet id
         // value: BlockletRows
@@ -200,7 +195,7 @@ public class PaginationCarbonReader<T> extends CarbonReader<T> {
       long fromBlockletRow = blockletRows.getRowIdStartIndex();
       long toBlockletRow = fromBlockletRow + blockletRows.getRowsCount();
       Object[] rowsInBlocklet = blockletRows.getRows();
-      if (toRowNumber > toBlockletRow) {
+      if (toRowNumber >= toBlockletRow) {
         if (fromRowNumber >= fromBlockletRow) {
           // only fromRowNumber lies in this blocklet,
           // read from fromRowNumber to end of the blocklet.
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java
index 654d770..d0d60bd 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/PaginationCarbonReaderTest.java
@@ -18,11 +18,19 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.commons.io.FileUtils;
@@ -218,4 +226,134 @@ public class PaginationCarbonReaderTest {
     FileUtils.deleteDirectory(new File(path));
   }
 
+  @Test
+  public void testSDKPaginationFilter() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("data", DataTypes.VARCHAR);
+    fields[2] = new Field("id", DataTypes.LONG);
+
+
+    String data = RandomStringUtils.randomAlphabetic(1024);
+    // create more than one blocklet
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .outputPath(path).withBlockletSize(1).withBlockSize(2).withTableProperty("local_dictionary_enable", "false");
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("TestUtil").build();
+      for (int i = 1; i <= 100000; i++) {
+        writer.write(new String[]{"robot" + i, data, String.valueOf(i)});
+      }
+      writer.close();
+    } catch (Exception ex) {
+      assert (false);
+    }
+
+    // configure cache size = 4 blocklet
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB, "4");
+
+    // filter expression
+    EqualToExpression equalExpression =
+        new EqualToExpression(new ColumnExpression("name", DataTypes.STRING),
+            new LiteralExpression("robot1", DataTypes.STRING));
+
+    CarbonReaderBuilder carbonReaderBuilder = CarbonReader.builder(path, "_temp")
+        .withPaginationSupport().projection(new String[]{"name", "id"}).filter(equalExpression);
+
+    PaginationCarbonReader<Object> paginationCarbonReader =
+        (PaginationCarbonReader<Object>) carbonReaderBuilder.build();
+    assert (paginationCarbonReader.getTotalRows() == 1);
+    paginationCarbonReader.close();
+
+    // Not Equals expression
+    NotEqualsExpression notEqualsExpression =
+        new NotEqualsExpression(new ColumnExpression("name", DataTypes.STRING),
+            new LiteralExpression("robot1", DataTypes.STRING));
+    paginationCarbonReader =
+        (PaginationCarbonReader<Object>) carbonReaderBuilder.filter(notEqualsExpression).build();
+    assert (paginationCarbonReader.getTotalRows() == 99999);
+    paginationCarbonReader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSDKPaginationInsertData() throws IOException, InvalidLoadOptionException, InterruptedException {
+    List<String[]> data1 = new ArrayList<String[]>();
+    String[] row1 = {"1", "AAA", "3", "3444345.66", "true", "1979-12-09", "2011-2-10 1:00:20", "Pune", "IT"};
+    String[] row2 = {"2", "BBB", "2", "543124.66", "false", "1987-2-19", "2017-1-1 12:00:20", "Bangalore", "DATA"};
+    String[] row3 = {"3", "CCC", "1", "787878.888", "false", "1982-05-12", "2015-12-1 2:20:20", "Pune", "DATA"};
+    String[] row4 = {"4", "DDD", "1", "99999.24", "true", "1981-04-09", "2000-1-15 7:00:20", "Delhi", "MAINS"};
+    String[] row5 = {"5", "EEE", "3", "545656.99", "true", "1987-12-09", "2017-11-25 04:00:20", "Delhi", "IT"};
+
+    data1.add(row1);
+    data1.add(row2);
+    data1.add(row3);
+    data1.add(row4);
+    data1.add(row5);
+
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[9];
+    fields[0] = new Field("id", DataTypes.INT);
+    fields[1] = new Field("name", DataTypes.STRING);
+    fields[2] = new Field("rank", DataTypes.SHORT);
+    fields[3] = new Field("salary", DataTypes.DOUBLE);
+    fields[4] = new Field("active", DataTypes.BOOLEAN);
+    fields[5] = new Field("dob", DataTypes.DATE);
+    fields[6] = new Field("doj", DataTypes.TIMESTAMP);
+    fields[7] = new Field("city", DataTypes.STRING);
+    fields[8] = new Field("dept", DataTypes.STRING);
+
+    CarbonWriterBuilder builder = CarbonWriter.builder()
+        .outputPath(path).withBlockletSize(1).withBlockSize(2);
+    CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("TestUtil").build();
+    for (int i = 0; i < 5; i++) {
+      writer.write(data1.get(i));
+    }
+    writer.close();
+
+    String[] row = {"222", "Daisy", "3", "334.456", "true", "1956-11-08", "2013-12-10 12:00:20", "Pune", "IT"};
+    writer = CarbonWriter.builder()
+        .outputPath(path).withBlockletSize(1).withBlockSize(2)
+        .withCsvInput(new Schema(fields)).writtenBy("TestUtil").build();
+    writer.write(row);
+    writer.close();
+
+    // configure cache size = 4 blocklet
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MAX_PAGINATION_LRU_CACHE_SIZE_IN_MB, "4");
+
+    PaginationCarbonReader<Object> paginationCarbonReader =
+        (PaginationCarbonReader<Object>) CarbonReader.builder(path, "_temp")
+            .withPaginationSupport().projection(new String[]
+                {"id", "name", "rank", "salary", "active", "dob", "doj", 
"city", "dept"}).build();
+    assert (paginationCarbonReader.getTotalRows() == 6);
+    Object[] rows = paginationCarbonReader.read(1, 6);
+    assert (rows.length == 6);
+
+    CarbonIUD.getInstance().delete(path, "name", "AAA").commit();
+
+    CarbonReaderBuilder carbonReaderBuilder = CarbonReader.builder(path, "_temp")
+        .withPaginationSupport().projection(new String[]{"id", "name", "rank", "salary", "active", "dob", "doj", "city", "dept"});
+    paginationCarbonReader = (PaginationCarbonReader<Object>) carbonReaderBuilder.build();
+
+    assert (paginationCarbonReader.getTotalRows() == 5);
+    rows = paginationCarbonReader.read(1, 5);
+    assert (rows.length == 5);
+    paginationCarbonReader.close();
+
+    CarbonIUD.getInstance().update(path, "name", "AAA", "name", "nihal").commit();
+    paginationCarbonReader =
+        (PaginationCarbonReader<Object>) CarbonReader.builder(path, "_temp")
+            .withPaginationSupport().projection(new String[]
+                {"id", "name", "rank", "salary", "active", "dob", "doj", 
"city", "dept"}).build();
+    assert (paginationCarbonReader.getTotalRows() == 5);
+    rows = paginationCarbonReader.read(1, 5);
+    assert (rows.length == 5);
+    paginationCarbonReader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
 }
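
For reference, a minimal usage sketch of what this commit enables, distilled from the tests above (the path, schema, and filter literal are the test's own; the wrapper method and its imports are illustrative, not new API surface):

    // Build a pagination reader with a filter; getTotalRows() now reflects
    // the filtered row count instead of the raw blocklet metadata count,
    // and counts stay correct after update/delete through CarbonIUD.
    static void paginateFiltered(String path) throws IOException, InterruptedException {
      EqualToExpression filter = new EqualToExpression(
          new ColumnExpression("name", DataTypes.STRING),
          new LiteralExpression("robot1", DataTypes.STRING));
      PaginationCarbonReader<Object> reader = (PaginationCarbonReader<Object>)
          CarbonReader.builder(path, "_temp")
              .withPaginationSupport()
              .projection(new String[]{"name", "id"})
              .filter(filter)
              .build();
      // 0 when nothing matches; no longer throws ArrayIndexOutOfBoundsException.
      long total = reader.getTotalRows();
      if (total > 0) {
        Object[] rows = reader.read(1, total);  // row ids are 1-based, range inclusive
      }
      reader.close();
    }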
