[ 
https://issues.apache.org/jira/browse/CARBONDATA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554261#comment-15554261
 ] 

ASF GitHub Bot commented on CARBONDATA-279:
-------------------------------------------

Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82334120
  
    --- Diff: 
processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
 ---
    @@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, 
StepDataInterface sdi) throws K
         } catch (NumberFormatException exc) {
           numberOfNodes = NUM_CORES_DEFAULT_VAL;
         }
    +    if ( rddIteratorKey == null ) {
    +      BlockDetails[] blocksInfo = 
GraphGenerator.blockInfo.get(meta.getBlocksID());
    +      if (blocksInfo.length == 0) {
    +        //if isDirectLoad = true, and partition number > file num
    +        //then blocksInfo will get empty in some partition processing, so 
just return
    +        setOutputDone();
    +        return false;
    +      }
     
    -    BlockDetails[] blocksInfo = 
GraphGenerator.blockInfo.get(meta.getBlocksID());
    -    if (blocksInfo.length == 0) {
    -      //if isDirectLoad = true, and partition number > file num
    -      //then blocksInfo will get empty in some partition processing, so 
just return
    -      setOutputDone();
    -      return false;
    -    }
    -
    -    if (numberOfNodes > blocksInfo.length) {
    -      numberOfNodes = blocksInfo.length;
    -    }
    +      if (numberOfNodes > blocksInfo.length) {
    +        numberOfNodes = blocksInfo.length;
    +      }
     
    -    //new the empty lists
    -    for (int pos = 0; pos < numberOfNodes; pos++) {
    -      threadBlockList.add(new ArrayList<BlockDetails>());
    -    }
    +      //new the empty lists
    +      for (int pos = 0; pos < numberOfNodes; pos++) {
    +        threadBlockList.add(new ArrayList<BlockDetails>());
    +      }
     
    -    //block balance to every thread
    -    for (int pos = 0; pos < blocksInfo.length; ) {
    -      for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    -        if (pos < blocksInfo.length) {
    -          threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +      //block balance to every thread
    +      for (int pos = 0; pos < blocksInfo.length; ) {
    +        for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    +          if (pos < blocksInfo.length) {
    +            threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +          }
             }
           }
    +      LOGGER.info("*****************Started all csv reading***********");
    +      startProcess(numberOfNodes);
    +      LOGGER.info("*****************Completed all csv reading***********");
    +      
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    +              meta.getPartitionID(), System.currentTimeMillis());
    +    } else {
    +      scanRddIterator();
         }
    -    LOGGER.info("*****************Started all csv reading***********");
    -    startProcess(numberOfNodes);
    -    LOGGER.info("*****************Completed all csv reading***********");
    -    
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    -        meta.getPartitionID(), System.currentTimeMillis());
         setOutputDone();
         return false;
       }
     
    +  private void scanRddIterator() throws RuntimeException {
    +    Iterator<String[]> iterator = 
RddInputUtils.getAndRemove(rddIteratorKey);
    +    if (iterator != null) {
    +      try{
    +        while(iterator.hasNext()){
    +          putRow(data.outputRowMeta, iterator.next());
    --- End diff --
    
    I agree, we can merge kettle steps.


> [DataLoading]Save a DataFrame to CarbonData file without writing CSV file
> -------------------------------------------------------------------------
>
>                 Key: CARBONDATA-279
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-279
>             Project: CarbonData
>          Issue Type: Improvement
>    Affects Versions: 0.1.0-incubating
>            Reporter: QiangCai
>            Assignee: QiangCai
>            Priority: Minor
>             Fix For: 0.2.0-incubating
>
>
> Directly save a DataFrame to CarbonData file without writing CSV file



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to