Repository: carbondata
Updated Branches:
  refs/heads/master 51db049c4 -> 74ea24d14


[CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK

Multi level complex type support for AVRO based SDK

This closes #2276


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ec33c112
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ec33c112
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ec33c112

Branch: refs/heads/master
Commit: ec33c11286ebe8009ac07698bf23ffb3bd3e7711
Parents: 51db049
Author: sounakr <soun...@gmail.com>
Authored: Mon May 7 06:51:54 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue May 8 16:49:08 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  17 +-
 .../TestNonTransactionalCarbonTable.scala       |  10 +-
 .../datasources/SparkCarbonTableFormat.scala    |   2 +-
 .../loading/DataLoadProcessBuilder.java         |  12 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../loading/model/CarbonLoadModel.java          |  14 +-
 .../InputProcessorStepForPartitionImpl.java     | 251 ---------------
 .../InputProcessorStepWithNoConverterImpl.java  | 306 +++++++++++++++++++
 .../util/CarbonDataProcessorUtil.java           |  29 ++
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  60 ++--
 .../sdk/file/CarbonWriterBuilder.java           |   6 +
 11 files changed, 401 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 42bb958..e3c07fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -184,7 +184,8 @@ public class TableSchemaBuilder {
     if (field.getDataType().isComplexType()) {
       String parentFieldName = newColumn.getColumnName();
       if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
-        addColumn(new StructField("val",
+        String colName = getColNameForArray(parentFieldName);
+        addColumn(new StructField(colName,
            ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
       } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
           && ((StructType) field.getDataType()).getFields().size() > 0) {
@@ -198,6 +199,20 @@ public class TableSchemaBuilder {
     return newColumn;
   }
 
+  private String getColNameForArray(String parentFieldName) {
+    if (!parentFieldName.contains(".val")) {
+      return "val";
+    } else {
+      String[] splits = parentFieldName.split("val");
+      if (splits.length == 1) {
+        return "val" + 1;
+      } else {
+        return "val" + (Integer.parseInt(parentFieldName
+            .substring(parentFieldName.lastIndexOf("val") + 3, parentFieldName.length())) + 1);
+      }
+    }
+  }
+
   /**
    * Throw exception if {@param field} name is repeated
    */
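
For reference, the naming scheme above gives each nested array level its own child
column name ("val", then "val1", "val2", ...). A standalone sketch (hypothetical demo
class; the logic mirrors getColNameForArray above) of the sequence it produces:

    public class ArrayColNameDemo {
      static String colNameForArray(String parentFieldName) {
        if (!parentFieldName.contains(".val")) {
          return "val";
        }
        String[] splits = parentFieldName.split("val");
        if (splits.length == 1) {
          return "val" + 1;
        }
        return "val" + (Integer.parseInt(
            parentFieldName.substring(parentFieldName.lastIndexOf("val") + 3)) + 1);
      }

      public static void main(String[] args) {
        System.out.println(colNameForArray("arr"));          // val   (first array level)
        System.out.println(colNameForArray("arr.val"));      // val1  (array of array)
        System.out.println(colNameForArray("arr.val.val1")); // val2  (third level)
      }
    }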

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index fabcd02..6b02d5a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -678,8 +678,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val exception =
       intercept[RuntimeException] {
-      buildTestDataWithBadRecordFail()
-    }
+        buildTestDataWithBadRecordFail()
+      }
     assert(exception.getMessage()
       .contains("Data load failed due to bad record"))
 
@@ -780,7 +780,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                      |      }
                      |      ]
                      |  }
-                     """.stripMargin
+                   """.stripMargin
 
    val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
 
@@ -835,8 +835,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val json =
       """ {"name":"bob", "age":10,
-          |"address" : {"street":"abc", "city":"bang"},
-          |"doorNum" : [1,2,3,4]}""".stripMargin
+        |"address" : {"street":"abc", "city":"bang"},
+        |"doorNum" : [1,2,3,4]}""".stripMargin
 
     val fields = new Array[Field](4)
     fields(0) = new Field("name", DataTypes.STRING)
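
The complex fields for this JSON are declared with the core DataTypes factory methods.
A hedged sketch in Java (the test itself is Scala, and the exact wiring of fields(2)
and fields(3) is outside this hunk, so the declarations below are assumptions):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    public class ComplexFieldsSketch {
      public static Field[] complexFields() {
        Field[] fields = new Field[4];
        fields[0] = new Field("name", DataTypes.STRING);
        fields[1] = new Field("age", DataTypes.INT);
        // struct<street:string, city:string> for the "address" column
        List<StructField> addressChildren = new ArrayList<>();
        addressChildren.add(new StructField("street", DataTypes.STRING));
        addressChildren.add(new StructField("city", DataTypes.STRING));
        fields[2] = new Field("address", DataTypes.createStructType(addressChildren));
        // array<int> for the "doorNum" column
        fields[3] = new Field("doorNum", DataTypes.createArrayType(DataTypes.INT));
        return fields;
      }
    }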

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 1928b38..d6eab1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -111,7 +111,7 @@ with Serializable {
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    model.setPartitionLoad(true)
+    model.setLoadWithoutConverterStep(true)
 
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 028c404..2f904ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -44,8 +44,8 @@ import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcess
 import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepWithNoConverterImpl;
 import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -62,8 +62,8 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    if (loadModel.isPartitionLoad()) {
-      return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
+    if (loadModel.isLoadWithoutConverterStep()) {
+      return buildInternalWithNoConverter(inputIterators, configuration, sortScope);
     } else if (!configuration.isSortTable() ||
         sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);
@@ -106,14 +106,14 @@ public final class DataLoadProcessBuilder {
   }
 
   /**
-   * Build pipe line for partition load
+   * Build pipeline for load without the conversion step.
    */
-  private AbstractDataLoadProcessorStep buildInternalForPartitionLoad(
+  private AbstractDataLoadProcessorStep buildInternalWithNoConverter(
       CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
       SortScopeOptions.SortScope sortScope) {
     // Wraps with dummy processor.
     AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepForPartitionImpl(configuration, inputIterators);
+        new InputProcessorStepWithNoConverterImpl(configuration, inputIterators);
     if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
       AbstractDataLoadProcessorStep sortProcessorStep =
           new SortProcessorStepImpl(configuration, inputProcessorStep);
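
The hunk ends before the writer step, so for orientation this is the full LOCAL_SORT
chain the method builds; the final writer step is an assumption based on the
DataWriterProcessorStepImpl import retained above:

    // Input step parses rows but skips the field-converter stage entirely.
    AbstractDataLoadProcessorStep inputProcessorStep =
        new InputProcessorStepWithNoConverterImpl(configuration, inputIterators);
    // Sort and writer steps are chained on top, exactly as for a converted load.
    AbstractDataLoadProcessorStep sortProcessorStep =
        new SortProcessorStepImpl(configuration, inputProcessorStep);
    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);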

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 567a8b5..dd28dc6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -128,7 +128,7 @@ public class FieldEncoderFactory {
   /**
    * Create parser for the carbon column.
    */
-  private static GenericDataType createComplexDataType(DataField dataField,
+  public static GenericDataType createComplexDataType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
       Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 2a846e2..0cc0da3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -201,8 +201,10 @@ public class CarbonLoadModel implements Serializable {
 
   /**
   * It directly writes data to the nosort processor, bypassing all other processors.
+   * In this mode there is no data conversion step; the data pushed in is
+   * written out as-is.
   */
-  private boolean isPartitionLoad;
+  private boolean isLoadWithoutConverterStep;
 
   /**
   * Folder path to where data should be written for this load.
@@ -435,7 +437,7 @@ public class CarbonLoadModel implements Serializable {
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.isAggLoadRequest = isAggLoadRequest;
     copy.badRecordsLocation = badRecordsLocation;
-    copy.isPartitionLoad = isPartitionLoad;
+    copy.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     return copy;
   }
@@ -814,12 +816,12 @@ public class CarbonLoadModel implements Serializable {
   }
 
 
-  public boolean isPartitionLoad() {
-    return isPartitionLoad;
+  public boolean isLoadWithoutConverterStep() {
+    return isLoadWithoutConverterStep;
   }
 
-  public void setPartitionLoad(boolean partitionLoad) {
-    isPartitionLoad = partitionLoad;
+  public void setLoadWithoutConverterStep(boolean loadWithoutConverterStep) {
+    isLoadWithoutConverterStep = loadWithoutConverterStep;
   }
 
   public String getDataWritePath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
deleted file mode 100644
index 1dc9b27..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep {
-
-  private CarbonIterator<Object[]>[] inputIterators;
-
-  private boolean[] noDictionaryMapping;
-
-  private DataType[] dataTypes;
-
-  private int[] orderOfData;
-
-  public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
-      CarbonIterator<Object[]>[] inputIterators) {
-    super(configuration, null);
-    this.inputIterators = inputIterators;
-  }
-
-  @Override public DataField[] getOutput() {
-    return configuration.getDataFields();
-  }
-
-  @Override public void initialize() throws IOException {
-    super.initialize();
-    // if logger is enabled then raw data will be required.
-    RowConverterImpl rowConverter =
-        new RowConverterImpl(configuration.getDataFields(), configuration, null);
-    rowConverter.initialize();
-    configuration.setCardinalityFinder(rowConverter);
-    noDictionaryMapping =
-        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-    dataTypes = new DataType[configuration.getDataFields().length];
-    for (int i = 0; i < dataTypes.length; i++) {
-      if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
-        dataTypes[i] = DataTypes.INT;
-      } else {
-        dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
-      }
-    }
-    orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
-  }
-
-  private int[] arrangeData(DataField[] dataFields, String[] header) {
-    int[] data = new int[dataFields.length];
-    for (int i = 0; i < dataFields.length; i++) {
-      for (int j = 0; j < header.length; j++) {
-        if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
-          data[i] = j;
-          break;
-        }
-      }
-    }
-    return data;
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] execute() {
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
-    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
-    for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] =
-          new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
-              rowCounter, orderOfData, noDictionaryMapping, dataTypes);
-    }
-    return outIterators;
-  }
-
-  /**
-   * Partition input iterators equally as per the number of threads.
-   *
-   * @return
-   */
-  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
-    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.length; i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators[i]);
-    }
-    return iterators;
-  }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-  @Override public void close() {
-    if (!closed) {
-      super.close();
-      for (CarbonIterator inputIterator : inputIterators) {
-        inputIterator.close();
-      }
-    }
-  }
-
-  @Override protected String getStepName() {
-    return "Input Processor";
-  }
-
-  /**
-   * This iterator wraps the list of iterators and it starts iterating the each
-   * iterator of the list one by one. It also parse the data while iterating it.
-   */
-  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private List<CarbonIterator<Object[]>> inputIterators;
-
-    private CarbonIterator<Object[]> currentIterator;
-
-    private int counter;
-
-    private int batchSize;
-
-    private boolean nextBatch;
-
-    private boolean firstTime;
-
-    private AtomicLong rowCounter;
-
-    private boolean[] noDictionaryMapping;
-
-    private DataType[] dataTypes;
-
-    private int[] orderOfData;
-
-    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
-        boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
-        DataType[] dataTypes) {
-      this.inputIterators = inputIterators;
-      this.batchSize = batchSize;
-      this.counter = 0;
-      // Get the first iterator from the list.
-      currentIterator = inputIterators.get(counter++);
-      this.rowCounter = rowCounter;
-      this.nextBatch = false;
-      this.firstTime = true;
-      this.noDictionaryMapping = noDictionaryMapping;
-      this.dataTypes = dataTypes;
-      this.orderOfData = orderOfData;
-    }
-
-    @Override public boolean hasNext() {
-      return nextBatch || internalHasNext();
-    }
-
-    private boolean internalHasNext() {
-      if (firstTime) {
-        firstTime = false;
-        currentIterator.initialize();
-      }
-      boolean hasNext = currentIterator.hasNext();
-      // If iterator is finished then check for next iterator.
-      if (!hasNext) {
-        currentIterator.close();
-        // Check next iterator is available in the list.
-        if (counter < inputIterators.size()) {
-          // Get the next iterator from the list.
-          currentIterator = inputIterators.get(counter++);
-          // Initialize the new iterator
-          currentIterator.initialize();
-          hasNext = internalHasNext();
-        }
-      }
-      return hasNext;
-    }
-
-    @Override public CarbonRowBatch next() {
-      return getBatch();
-    }
-
-    private CarbonRowBatch getBatch() {
-      // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
-      int count = 0;
-      while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
-        count++;
-      }
-      rowCounter.getAndAdd(carbonRowBatch.getSize());
-      return carbonRowBatch;
-    }
-
-    private Object[] convertToNoDictionaryToBytes(Object[] data) {
-      Object[] newData = new Object[data.length];
-      for (int i = 0; i < noDictionaryMapping.length; i++) {
-        if (noDictionaryMapping[i]) {
-          newData[i] = DataTypeUtil
-              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
-        } else {
-          newData[i] = data[orderOfData[i]];
-        }
-      }
-      if (newData.length > noDictionaryMapping.length) {
-        for (int i = noDictionaryMapping.length; i < newData.length; i++) {
-          newData[i] = data[orderOfData[i]];
-        }
-      }
-      //      System.out.println(Arrays.toString(data));
-      return newData;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
new file mode 100644
index 0000000..8c24b7f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from record reader and sends data to next step.
+ */
+public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProcessorStep {
+
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  private boolean[] noDictionaryMapping;
+
+  private DataType[] dataTypes;
+
+  private int[] orderOfData;
+
+  private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+  public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration,
+      CarbonIterator<Object[]>[] inputIterators) {
+    super(configuration, null);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override public DataField[] getOutput() {
+    return configuration.getDataFields();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    // if logger is enabled then raw data will be required.
+    RowConverterImpl rowConverter =
+        new RowConverterImpl(configuration.getDataFields(), configuration, null);
+    rowConverter.initialize();
+    configuration.setCardinalityFinder(rowConverter);
+    noDictionaryMapping =
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+
+    dataFieldsWithComplexDataType = new HashMap<>();
+    convertComplexDataType(dataFieldsWithComplexDataType);
+
+    dataTypes = new DataType[configuration.getDataFields().length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
+        dataTypes[i] = DataTypes.INT;
+      } else {
+        dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
+      }
+    }
+    orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
+  }
+
+  private void convertComplexDataType(Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+    DataField[] srcDataField = configuration.getDataFields();
+    FieldEncoderFactory fieldConverterFactory = FieldEncoderFactory.getInstance();
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
+    for (int i = 0; i < srcDataField.length; i++) {
+      if (srcDataField[i].getColumn().isComplex()) {
+        // create a ComplexDataType
+        dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(),
+            fieldConverterFactory
+                .createComplexDataType(srcDataField[i], null, configuration.getTableIdentifier(),
+                    null, false, null, i, nullFormat, isEmptyBadRecord));
+      }
+    }
+  }
+
+  private int[] arrangeData(DataField[] dataFields, String[] header) {
+    int[] data = new int[dataFields.length];
+    for (int i = 0; i < dataFields.length; i++) {
+      for (int j = 0; j < header.length; j++) {
+        if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
+          data[i] = j;
+          break;
+        }
+      }
+    }
+    return data;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] =
+          new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
+              rowCounter, orderOfData, noDictionaryMapping, dataTypes,
+              configuration.getDataFields(), dataFieldsWithComplexDataType);
+    }
+    return outIterators;
+  }
+
+  /**
+   * Partition input iterators equally as per the number of threads.
+   *
+   * @return
+   */
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    // Get the number of cores configured in property.
+    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Get the minimum of number of cores and iterators size to get the number of parallel threads
+    // to be launched.
+    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    for (int i = 0; i < parallelThreadNumber; i++) {
+      iterators[i] = new ArrayList<>();
+    }
+    // Equally partition the iterators as per number of threads
+    for (int i = 0; i < inputIterators.length; i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators[i]);
+    }
+    return iterators;
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      for (CarbonIterator inputIterator : inputIterators) {
+        inputIterator.close();
+      }
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Input Processor";
+  }
+
+  /**
+   * This iterator wraps the list of iterators and iterates them one by one.
+   * It also parses the data while iterating it.
+   */
+  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private List<CarbonIterator<Object[]>> inputIterators;
+
+    private CarbonIterator<Object[]> currentIterator;
+
+    private int counter;
+
+    private int batchSize;
+
+    private boolean nextBatch;
+
+    private boolean firstTime;
+
+    private AtomicLong rowCounter;
+
+    private boolean[] noDictionaryMapping;
+
+    private DataType[] dataTypes;
+
+    private DataField[] dataFields;
+
+    private int[] orderOfData;
+
+    private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
+
+    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
+        boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
+        DataType[] dataTypes, DataField[] dataFields,
+        Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
+      this.inputIterators = inputIterators;
+      this.batchSize = batchSize;
+      this.counter = 0;
+      // Get the first iterator from the list.
+      currentIterator = inputIterators.get(counter++);
+      this.rowCounter = rowCounter;
+      this.nextBatch = false;
+      this.firstTime = true;
+      this.noDictionaryMapping = noDictionaryMapping;
+      this.dataTypes = dataTypes;
+      this.dataFields = dataFields;
+      this.orderOfData = orderOfData;
+      this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
+    }
+
+    @Override public boolean hasNext() {
+      return nextBatch || internalHasNext();
+    }
+
+    private boolean internalHasNext() {
+      if (firstTime) {
+        firstTime = false;
+        currentIterator.initialize();
+      }
+      boolean hasNext = currentIterator.hasNext();
+      // If iterator is finished then check for next iterator.
+      if (!hasNext) {
+        currentIterator.close();
+        // Check next iterator is available in the list.
+        if (counter < inputIterators.size()) {
+          // Get the next iterator from the list.
+          currentIterator = inputIterators.get(counter++);
+          // Initialize the new iterator
+          currentIterator.initialize();
+          hasNext = internalHasNext();
+        }
+      }
+      return hasNext;
+    }
+
+    @Override public CarbonRowBatch next() {
+      return getBatch();
+    }
+
+    private CarbonRowBatch getBatch() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(
+            new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
+        count++;
+      }
+      rowCounter.getAndAdd(carbonRowBatch.getSize());
+      return carbonRowBatch;
+    }
+
+    private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
+      Object[] newData = new Object[data.length];
+      for (int i = 0; i < data.length; i++) {
+        if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
+          newData[i] = DataTypeUtil
+              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+        } else {
+          // if this is a complex column then recursively convert the data into a byte array.
+          if (dataTypes[i].isComplexType()) {
+            ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+            DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+            try {
+              GenericDataType complexType =
+                  dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
+
+              complexType.writeByteArray(data[orderOfData[i]], dataOutputStream);
+
+              dataOutputStream.close();
+              newData[i] = byteArray.toByteArray();
+            } catch (Exception e) {
+              throw new CarbonDataLoadingException("Loading Exception", e);
+            }
+          } else {
+            newData[i] = data[orderOfData[i]];
+          }
+        }
+      }
+      return newData;
+    }
+
+  }
+
+}
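
partitionInputReaderIterators() above distributes the input iterators round-robin
across at most min(iterators, configured cores) threads. A minimal standalone
illustration (hypothetical demo class, plain integers standing in for iterators):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinPartitionDemo {
      public static void main(String[] args) {
        int inputIteratorCount = 5;  // e.g. five input splits
        int numberOfCores = 2;       // e.g. two loading cores configured
        int parallelThreadNumber = Math.min(inputIteratorCount, numberOfCores);
        List<Integer>[] iterators = new List[parallelThreadNumber];
        for (int i = 0; i < parallelThreadNumber; i++) {
          iterators[i] = new ArrayList<>();
        }
        // Equally partition the iterators as per number of threads.
        for (int i = 0; i < inputIteratorCount; i++) {
          iterators[i % parallelThreadNumber].add(i);
        }
        System.out.println(Arrays.toString(iterators)); // [[0, 2, 4], [1, 3]]
      }
    }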

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f47853e..f921fd5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -240,6 +240,35 @@ public final class CarbonDataProcessorUtil {
         .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
   }
 
+  public static void getComplexNoDictionaryMapping(DataField[] dataFields,
+      List<Integer> complexNoDictionary) {
+
+    // save the Ordinal Number in the List.
+    for (DataField field : dataFields) {
+      if (field.getColumn().isComplex()) {
+        // get the children.
+        getComplexNoDictionaryMapping(
+            ((CarbonDimension) field.getColumn()).getListOfChildDimensions(), complexNoDictionary);
+      }
+    }
+  }
+
+  public static void getComplexNoDictionaryMapping(List<CarbonDimension> carbonDimensions,
+      List<Integer> complexNoDictionary) {
+    for (CarbonDimension carbonDimension : carbonDimensions) {
+      if (carbonDimension.isComplex()) {
+        getComplexNoDictionaryMapping(carbonDimension.getListOfChildDimensions(),
+            complexNoDictionary);
+      } else {
+        // This is primitive type. Check the encoding for NoDictionary.
+        if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+          complexNoDictionary.add(carbonDimension.getOrdinal());
+        }
+      }
+    }
+  }
+
+
   /**
    * Preparing the boolean [] to map whether the dimension use inverted index 
or not.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index bc2e9db..946040f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.avro.Schema;
@@ -70,15 +72,15 @@ class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object [] csvField = new Object[fields.size()];
+    Object[] csvField = new Object[fields.size()];
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0);
+      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
     }
     return csvField;
   }
 
-  private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) {
-    StringBuilder out = new StringBuilder();
+  private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
+    Object out = new Object();
     Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
@@ -86,55 +88,39 @@ class AvroCarbonWriter extends CarbonWriter {
       case LONG:
       case DOUBLE:
       case STRING:
+        out = fieldValue;
+        break;
       case FLOAT:
-        out.append(fieldValue.toString());
+        Float f = (Float) fieldValue;
+        out = f.doubleValue();
         break;
       case RECORD:
         List<Schema.Field> fields = avroField.schema().getFields();
-        String delimiter = null;
-        delimiterLevel ++;
+
+        Object[] structChildObjects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-          if (delimiterLevel == 1) {
-            delimiter = "$";
-          } else if (delimiterLevel > 1) {
-            delimiter = ":";
-          }
-          if (i != (fields.size() - 1)) {
-            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
-                delimiterLevel)).append(delimiter);
-          } else {
-            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
-                delimiterLevel));
-          }
+          structChildObjects[i] =
+              avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
         }
+        StructObject structObject = new StructObject(structChildObjects);
+        out = structObject;
         break;
       case ARRAY:
         int size = ((ArrayList) fieldValue).size();
-        String delimiterArray = null;
-        delimiterLevel ++;
-        if (delimiterLevel == 1) {
-          delimiterArray = "$";
-        } else if (delimiterLevel > 1) {
-          delimiterArray = ":";
-        }
-
+        Object[] arrayChildObjects = new Object[size];
         for (int i = 0; i < size; i++) {
-          if (i != size - 1) {
-            out.append(avroFieldToObject(
-                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-                ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray);
-          } else {
-            out.append(avroFieldToObject(
-                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-                ((ArrayList) fieldValue).get(i), delimiterLevel));
-          }
+          arrayChildObjects[i] = (avroFieldToObject(
+              new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+              ((ArrayList) fieldValue).get(i)));
         }
+        ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
+        out = arrayObject;
         break;
 
       default:
         throw new UnsupportedOperationException();
     }
-    return out.toString();
+    return out;
   }
 
   /**
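
With the delimiter-string encoding removed, avroFieldToObject maps Avro values onto
the writable complex objects directly. A hedged sketch of the row produced for the
nested JSON used in the tests (constructors as used in this diff; the class is a
hypothetical demo):

    import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
    import org.apache.carbondata.processing.loading.complexobjects.StructObject;

    public class ComplexRowSketch {
      // Row for {"name":"bob", "age":10,
      //          "address":{"street":"abc","city":"bang"}, "doorNum":[1,2,3,4]}
      public static Object[] exampleRow() {
        Object address = new StructObject(new Object[] {"abc", "bang"}); // RECORD
        Object doorNum = new ArrayObject(new Object[] {1, 2, 3, 4});     // ARRAY
        return new Object[] {"bob", 10, address, doorNum};  // primitives pass through
      }
    }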

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 397f151..76a46d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -330,6 +330,12 @@ public class CarbonWriterBuilder {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     CarbonLoadModel loadModel = createLoadModel();
+
+    // Avro records are pushed to Carbon as Objects, not as Strings, in order to
+    // support multi-level complex types. Since no conversion is needed, the converter
+    // step is removed from the load: the loadWithoutConverterStep flag tells the load
+    // builder to skip the conversion step.
+    loadModel.setLoadWithoutConverterStep(true);
     return new AvroCarbonWriter(loadModel);
   }
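
The SDK tests feed this writer GenericData.Record instances decoded from JSON. A
hedged sketch of that decoding using only the standard Avro API (helper class name
is hypothetical):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.JsonDecoder;

    public class JsonToAvroSketch {
      // Builds the nested GenericData.Record that AvroCarbonWriter.write() consumes.
      public static GenericData.Record jsonToAvro(String json, String avroSchema)
          throws IOException {
        Schema schema = new Schema.Parser().parse(avroSchema);
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
        return reader.read(null, decoder);
      }
    }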
 
