This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1d98f  [CARBONDATA-3246]Fix sdk reader issue if batch size is given 
as zero and vectorRead False.
0f1d98f is described below

commit 0f1d98f022019e408c93b86c847c2ae9f8d030fc
Author: shardul-cr7 <shardulsing...@gmail.com>
AuthorDate: Wed Jan 23 11:31:04 2019 +0530

    [CARBONDATA-3246]Fix sdk reader issue if batch size is given as zero and 
vectorRead False.
    
    Problem: SDK reader is failing if vectorRead is false and detail query 
batch size is given as 0.
    The read fails with a stack overflow error after getting stuck in 
ChunkRowIterator.hasNext recursion.
    
    Solution: Since 0 is an invalid batch size, we should take 
DETAIL_QUERY_BATCH_SIZE_DEFAULT as the batch size.
    
    This closes #3097
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++++
 .../AbstractDetailQueryResultIterator.java         |  3 --
 .../carbondata/core/util/CarbonProperties.java     | 39 ++++++++++++++++++++-
 docs/configuration-parameters.md                   |  2 +-
 .../carbondata/sdk/file/CarbonReaderTest.java      | 23 +++++++++++++
 .../sdk/file/ConcurrentSdkReaderTest.java          | 40 ----------------------
 6 files changed, 72 insertions(+), 45 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccc8b99..b7d9761 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1228,6 +1228,16 @@ public final class CarbonCommonConstants {
   public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
 
   /**
+   * Maximum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MAX = 1000;
+
+  /**
+   * Minimum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MIN = 100;
+
+  /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
   @CarbonProperty
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 30f5183..9282d44 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -94,9 +94,6 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
     if (null != batchSizeString) {
       try {
         batchSize = Integer.parseInt(batchSizeString);
-        if (0 == batchSize) {
-          batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-        }
       } catch (NumberFormatException ne) {
         LOGGER.error("Invalid inmemory records size. Using default value");
         batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f9131f5..49388b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -56,6 +56,10 @@ import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE;
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MAX;
+import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MIN;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -192,6 +196,9 @@ public final class CarbonProperties {
       case CARBON_MINMAX_ALLOWED_BYTE_COUNT:
         validateStringCharacterLimit();
         break;
+      case DETAIL_QUERY_BATCH_SIZE:
+        validateDetailQueryBatchSize();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for 
addProperty flow
       default:
         // none
@@ -256,6 +263,7 @@ public final class CarbonProperties {
     validateEnableQueryStatistics();
     validateSortMemorySpillPercentage();
     validateStringCharacterLimit();
+    validateDetailQueryBatchSize();
   }
 
   /**
@@ -1547,5 +1555,34 @@ public final class CarbonProperties {
     }
   }
 
-
+  /**
+   * This method validates the DETAIL_QUERY_BATCH_SIZE. If some invalid input 
is set, we use the
+   * default value for this property
+   */
+  private void validateDetailQueryBatchSize() {
+    String batchSizeString =
+        carbonProperties.getProperty(DETAIL_QUERY_BATCH_SIZE);
+    if (batchSizeString == null) {
+      carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+          Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      LOGGER.info(
+          "Using default value for carbon.detail.batch.size " + 
DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+    } else {
+      int batchSize;
+      try {
+        batchSize = Integer.parseInt(batchSizeString);
+        if (batchSize < DETAIL_QUERY_BATCH_SIZE_MIN || batchSize > 
DETAIL_QUERY_BATCH_SIZE_MAX) {
+          LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
+              + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+          carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+              Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+        }
+      } catch (NumberFormatException ne) {
+        LOGGER.info("Invalid carbon.detail.batch.size.Using default value "
+            + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+        carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+            Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      }
+    }
+  }
 }
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index c7d8152..d28ad61 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -129,7 +129,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.search.master.port | 10020 | Port on which the search master listens 
for incoming query requests |
 | carbon.search.worker.port | 10021 | Port on which search master communicates 
with the workers. |
 | carbon.search.worker.workload.limit | 10 * *carbon.search.scan.thread* | 
Maximum number of active requests that can be sent to a worker. Beyond which 
the request needs to be rescheduled for later time or to a different worker. |
-| carbon.detail.batch.size | 100 | The buffer size to store records, returned 
from the block scan. In limit scenario this parameter is very important. For 
example your query limit is 1000. But if we set this value to 3000 that means 
we get 3000 records from scan but spark will only take 1000 rows. So the 2000 
remaining are useless. In one Finance test case after we set it to 100, in the 
limit 1000 scenario the performance increase about 2 times in comparison to if 
we set this value to 12000. |
+| carbon.detail.batch.size | 100 | The buffer size to store records, returned 
from the block scan. In limit scenario this parameter is very important. For 
example your query limit is 1000. But if we set this value to 3000 that means 
we get 3000 records from scan but spark will only take 1000 rows. So the 2000 
remaining are useless. In one Finance test case after we set it to 100, in the 
limit 1000 scenario the performance increase about 2 times in comparison to if 
we set this value to 12 [...]
 | carbon.enable.vector.reader | true | Spark added vector processing to 
optimize cpu cache miss and there by increase the query performance. This 
configuration enables to fetch data as columnar batch of size 4*1024 rows 
instead of fetching data row by row and provide it to spark so that there is 
improvement in  select queries performance. |
 | carbon.task.distribution | block | CarbonData has its own scheduling 
algorithm to suggest to Spark on how many tasks needs to be launched and how 
much work each task need to do in a Spark cluster for any query on CarbonData. 
Each of these task distribution suggestions has its own advantages and 
disadvantages. Based on the customer use case, appropriate task distribution 
can be configured.**block**: Setting this value will launch one task per block. 
This setting is suggested in case of  [...]
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling 
algorithm to suggest to Spark on how many tasks needs to be launched and how 
much work each task need to do in a Spark cluster for any query on CarbonData. 
When this configuration is true, CarbonData would distribute the available 
blocks to be scanned among the available number of cores. For Example:If there 
are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores 
available in the cluster) [...]
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index acd9e5a..28944da 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -104,6 +104,29 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
   }
 
+  @Test public void testReadWithZeroBatchSize() throws IOException, 
InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    
DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonReader reader;
+    reader = 
CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertEquals(("robot" + (i % 10)), row[0]);
+      Assert.assertEquals(i, row[1]);
+      i++;
+    }
+    Assert.assertEquals(i, 10);
+    FileUtils.deleteDirectory(new File(path));
+  }
+
   @Test
   public void testReadWithFilterOfNonTransactionalSimple() throws IOException, 
InterruptedException {
     String path = "./testWriteFiles";
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
index 31342b9..c75b70f 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
@@ -131,46 +131,6 @@ public class ConcurrentSdkReaderTest {
       executorService.awaitTermination(10, TimeUnit.MINUTES);
     }
   }
-
-  @Test public void testReadWithZeroBatchSize() throws InterruptedException {
-    int numFiles = 5;
-    int numRowsPerFile = 5;
-    short numThreads = 4;
-    writeDataMultipleFiles(numFiles, numRowsPerFile);
-
-    // Concurrent Reading
-    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-    try {
-      long count;
-      CarbonReader reader =
-          
CarbonReader.builder(dataDir).withRowRecordReader().withBatch(0).build();
-      List<CarbonReader> multipleReaders = reader.split(numThreads);
-      try {
-        List<ReadLogic> tasks = new ArrayList<>();
-        List<Future<Long>> results;
-        count = 0;
-
-        for (CarbonReader reader_i : multipleReaders) {
-          tasks.add(new ReadLogic(reader_i));
-        }
-        results = executorService.invokeAll(tasks);
-        for (Future result_i : results) {
-          count += (long) result_i.get();
-        }
-        Assert.assertEquals(numFiles * numRowsPerFile, count);
-      } catch (Exception e) {
-        e.printStackTrace();
-        Assert.fail(e.getMessage());
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } finally {
-      executorService.shutdown();
-      executorService.awaitTermination(10, TimeUnit.MINUTES);
-    }
-  }
-
   class ReadLogic implements Callable<Long> {
     CarbonReader reader;
 

Reply via email to