Repository: carbondata Updated Branches: refs/heads/master 51db049c4 -> 74ea24d14
[CARBONDATA-2443][SDK]Multi level complex type support for AVRO based SDK Multi level complex type support for AVRO based SDK This closes #2276 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ec33c112 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ec33c112 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ec33c112 Branch: refs/heads/master Commit: ec33c11286ebe8009ac07698bf23ffb3bd3e7711 Parents: 51db049 Author: sounakr <soun...@gmail.com> Authored: Mon May 7 06:51:54 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue May 8 16:49:08 2018 +0530 ---------------------------------------------------------------------- .../schema/table/TableSchemaBuilder.java | 17 +- .../TestNonTransactionalCarbonTable.scala | 10 +- .../datasources/SparkCarbonTableFormat.scala | 2 +- .../loading/DataLoadProcessBuilder.java | 12 +- .../converter/impl/FieldEncoderFactory.java | 2 +- .../loading/model/CarbonLoadModel.java | 14 +- .../InputProcessorStepForPartitionImpl.java | 251 --------------- .../InputProcessorStepWithNoConverterImpl.java | 306 +++++++++++++++++++ .../util/CarbonDataProcessorUtil.java | 29 ++ .../carbondata/sdk/file/AvroCarbonWriter.java | 60 ++-- .../sdk/file/CarbonWriterBuilder.java | 6 + 11 files changed, 401 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 42bb958..e3c07fa 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -184,7 +184,8 @@ public class TableSchemaBuilder { if (field.getDataType().isComplexType()) { String parentFieldName = newColumn.getColumnName(); if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { - addColumn(new StructField("val", + String colName = getColNameForArray(parentFieldName); + addColumn(new StructField(colName, ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true); } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT") && ((StructType) field.getDataType()).getFields().size() > 0) { @@ -198,6 +199,20 @@ public class TableSchemaBuilder { return newColumn; } + private String getColNameForArray(String parentFieldName) { + if (!parentFieldName.contains(".val")) { + return "val"; + } else { + String[] splits = parentFieldName.split("val"); + if (splits.length == 1) { + return "val" + 1; + } else { + return "val" + (Integer.parseInt(parentFieldName + .substring(parentFieldName.lastIndexOf("val") + 3, parentFieldName.length())) + 1); + } + } + } + /** * Throw exception if {@param field} name is repeated */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index fabcd02..6b02d5a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala 
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -678,8 +678,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val exception = intercept[RuntimeException] { - buildTestDataWithBadRecordFail() - } + buildTestDataWithBadRecordFail() + } assert(exception.getMessage() .contains("Data load failed due to bad record")) @@ -780,7 +780,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | } | ] | } - """.stripMargin + """.stripMargin val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """ @@ -835,8 +835,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val json = """ {"name":"bob", "age":10, - |"address" : {"street":"abc", "city":"bang"}, - |"doorNum" : [1,2,3,4]}""".stripMargin + |"address" : {"street":"abc", "city":"bang"}, + |"doorNum" : [1,2,3,4]}""".stripMargin val fields = new Array[Field](4) fields(0) = new Field("name", DataTypes.STRING) http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 1928b38..d6eab1d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -111,7 +111,7 @@ with Serializable { model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) 
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) - model.setPartitionLoad(true) + model.setLoadWithoutCoverterStep(true) val staticPartition = options.getOrElse("staticpartition", null) if (staticPartition != null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 028c404..2f904ed 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -44,8 +44,8 @@ import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcess import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl; import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl; import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl; -import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl; import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl; +import org.apache.carbondata.processing.loading.steps.InputProcessorStepWithNoConverterImpl; import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -62,8 +62,8 @@ public final class DataLoadProcessBuilder { CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); - if 
(loadModel.isPartitionLoad()) { - return buildInternalForPartitionLoad(inputIterators, configuration, sortScope); + if (loadModel.isLoadWithoutCoverterStep()) { + return buildInternalWithNoConverter(inputIterators, configuration, sortScope); } else if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) { return buildInternalForNoSort(inputIterators, configuration); @@ -106,14 +106,14 @@ public final class DataLoadProcessBuilder { } /** - * Build pipe line for partition load + * Build pipe line for Load without Conversion Step. */ - private AbstractDataLoadProcessorStep buildInternalForPartitionLoad( + private AbstractDataLoadProcessorStep buildInternalWithNoConverter( CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration, SortScopeOptions.SortScope sortScope) { // Wraps with dummy processor. AbstractDataLoadProcessorStep inputProcessorStep = - new InputProcessorStepForPartitionImpl(configuration, inputIterators); + new InputProcessorStepWithNoConverterImpl(configuration, inputIterators); if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) { AbstractDataLoadProcessorStep sortProcessorStep = new SortProcessorStepImpl(configuration, inputProcessorStep); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index 567a8b5..dd28dc6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -128,7 +128,7 @@ public class 
FieldEncoderFactory { /** * Create parser for the carbon column. */ - private static GenericDataType createComplexDataType(DataField dataField, + public static GenericDataType createComplexDataType(DataField dataField, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 2a846e2..0cc0da3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -201,8 +201,10 @@ public class CarbonLoadModel implements Serializable { /** * It directly writes data directly to nosort processor bypassing all other processors. + * For this method there will be no data conversion step. It writes data which is directly + * pushed into. */ - private boolean isPartitionLoad; + private boolean isLoadWithoutCoverterStep; /** * Flder path to where data should be written for this load. 
@@ -435,7 +437,7 @@ public class CarbonLoadModel implements Serializable { copy.batchSortSizeInMb = batchSortSizeInMb; copy.isAggLoadRequest = isAggLoadRequest; copy.badRecordsLocation = badRecordsLocation; - copy.isPartitionLoad = isPartitionLoad; + copy.isLoadWithoutCoverterStep = isLoadWithoutCoverterStep; copy.sortColumnsBoundsStr = sortColumnsBoundsStr; return copy; } @@ -814,12 +816,12 @@ public class CarbonLoadModel implements Serializable { } - public boolean isPartitionLoad() { - return isPartitionLoad; + public boolean isLoadWithoutCoverterStep() { + return isLoadWithoutCoverterStep; } - public void setPartitionLoad(boolean partitionLoad) { - isPartitionLoad = partitionLoad; + public void setLoadWithoutCoverterStep(boolean loadWithoutCoverterStep) { + isLoadWithoutCoverterStep = loadWithoutCoverterStep; } public String getDataWritePath() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java deleted file mode 100644 index 1dc9b27..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.processing.loading.steps; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; -import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; -import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -/** - * It reads data from record reader and sends data to next step. 
- */ -public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep { - - private CarbonIterator<Object[]>[] inputIterators; - - private boolean[] noDictionaryMapping; - - private DataType[] dataTypes; - - private int[] orderOfData; - - public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration, - CarbonIterator<Object[]>[] inputIterators) { - super(configuration, null); - this.inputIterators = inputIterators; - } - - @Override public DataField[] getOutput() { - return configuration.getDataFields(); - } - - @Override public void initialize() throws IOException { - super.initialize(); - // if logger is enabled then raw data will be required. - RowConverterImpl rowConverter = - new RowConverterImpl(configuration.getDataFields(), configuration, null); - rowConverter.initialize(); - configuration.setCardinalityFinder(rowConverter); - noDictionaryMapping = - CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); - dataTypes = new DataType[configuration.getDataFields().length]; - for (int i = 0; i < dataTypes.length; i++) { - if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) { - dataTypes[i] = DataTypes.INT; - } else { - dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType(); - } - } - orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader()); - } - - private int[] arrangeData(DataField[] dataFields, String[] header) { - int[] data = new int[dataFields.length]; - for (int i = 0; i < dataFields.length; i++) { - for (int j = 0; j < header.length; j++) { - if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) { - data[i] = j; - break; - } - } - } - return data; - } - - @Override public Iterator<CarbonRowBatch>[] execute() { - int batchSize = CarbonProperties.getInstance().getBatchSize(); - List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); - Iterator<CarbonRowBatch>[] 
outIterators = new Iterator[readerIterators.length]; - for (int i = 0; i < outIterators.length; i++) { - outIterators[i] = - new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(), - rowCounter, orderOfData, noDictionaryMapping, dataTypes); - } - return outIterators; - } - - /** - * Partition input iterators equally as per the number of threads. - * - * @return - */ - private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() { - // Get the number of cores configured in property. - int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - // Get the minimum of number of cores and iterators size to get the number of parallel threads - // to be launched. - int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); - - List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber]; - for (int i = 0; i < parallelThreadNumber; i++) { - iterators[i] = new ArrayList<>(); - } - // Equally partition the iterators as per number of threads - for (int i = 0; i < inputIterators.length; i++) { - iterators[i % parallelThreadNumber].add(inputIterators[i]); - } - return iterators; - } - - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } - - @Override public void close() { - if (!closed) { - super.close(); - for (CarbonIterator inputIterator : inputIterators) { - inputIterator.close(); - } - } - } - - @Override protected String getStepName() { - return "Input Processor"; - } - - /** - * This iterator wraps the list of iterators and it starts iterating the each - * iterator of the list one by one. It also parse the data while iterating it. 
- */ - private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { - - private List<CarbonIterator<Object[]>> inputIterators; - - private CarbonIterator<Object[]> currentIterator; - - private int counter; - - private int batchSize; - - private boolean nextBatch; - - private boolean firstTime; - - private AtomicLong rowCounter; - - private boolean[] noDictionaryMapping; - - private DataType[] dataTypes; - - private int[] orderOfData; - - public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, - boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, - DataType[] dataTypes) { - this.inputIterators = inputIterators; - this.batchSize = batchSize; - this.counter = 0; - // Get the first iterator from the list. - currentIterator = inputIterators.get(counter++); - this.rowCounter = rowCounter; - this.nextBatch = false; - this.firstTime = true; - this.noDictionaryMapping = noDictionaryMapping; - this.dataTypes = dataTypes; - this.orderOfData = orderOfData; - } - - @Override public boolean hasNext() { - return nextBatch || internalHasNext(); - } - - private boolean internalHasNext() { - if (firstTime) { - firstTime = false; - currentIterator.initialize(); - } - boolean hasNext = currentIterator.hasNext(); - // If iterator is finished then check for next iterator. - if (!hasNext) { - currentIterator.close(); - // Check next iterator is available in the list. - if (counter < inputIterators.size()) { - // Get the next iterator from the list. - currentIterator = inputIterators.get(counter++); - // Initialize the new iterator - currentIterator.initialize(); - hasNext = internalHasNext(); - } - } - return hasNext; - } - - @Override public CarbonRowBatch next() { - return getBatch(); - } - - private CarbonRowBatch getBatch() { - // Create batch and fill it. 
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize); - int count = 0; - while (internalHasNext() && count < batchSize) { - carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next()))); - count++; - } - rowCounter.getAndAdd(carbonRowBatch.getSize()); - return carbonRowBatch; - } - - private Object[] convertToNoDictionaryToBytes(Object[] data) { - Object[] newData = new Object[data.length]; - for (int i = 0; i < noDictionaryMapping.length; i++) { - if (noDictionaryMapping[i]) { - newData[i] = DataTypeUtil - .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); - } else { - newData[i] = data[orderOfData[i]]; - } - } - if (newData.length > noDictionaryMapping.length) { - for (int i = noDictionaryMapping.length; i < newData.length; i++) { - newData[i] = data[orderOfData[i]]; - } - } - // System.out.println(Arrays.toString(data)); - return newData; - } - - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java new file mode 100644 index 0000000..8c24b7f --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.steps; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.datatypes.GenericDataType; +import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory; +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; +import 
org.apache.carbondata.processing.loading.row.CarbonRowBatch; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It reads data from record reader and sends data to next step. + */ +public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProcessorStep { + + private CarbonIterator<Object[]>[] inputIterators; + + private boolean[] noDictionaryMapping; + + private DataType[] dataTypes; + + private int[] orderOfData; + + private Map<Integer, GenericDataType> dataFieldsWithComplexDataType; + + public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration, + CarbonIterator<Object[]>[] inputIterators) { + super(configuration, null); + this.inputIterators = inputIterators; + } + + @Override public DataField[] getOutput() { + return configuration.getDataFields(); + } + + @Override public void initialize() throws IOException { + super.initialize(); + // if logger is enabled then raw data will be required. + RowConverterImpl rowConverter = + new RowConverterImpl(configuration.getDataFields(), configuration, null); + rowConverter.initialize(); + configuration.setCardinalityFinder(rowConverter); + noDictionaryMapping = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + + dataFieldsWithComplexDataType = new HashMap<>(); + convertComplexDataType(dataFieldsWithComplexDataType); + + dataTypes = new DataType[configuration.getDataFields().length]; + for (int i = 0; i < dataTypes.length; i++) { + if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) { + dataTypes[i] = DataTypes.INT; + } else { + dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType(); + } + } + orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader()); + } + + private void convertComplexDataType(Map<Integer, GenericDataType> dataFieldsWithComplexDataType) { + DataField[] srcDataField = configuration.getDataFields(); + FieldEncoderFactory 
fieldConverterFactory = FieldEncoderFactory.getInstance(); + String nullFormat = + configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT) + .toString(); + boolean isEmptyBadRecord = Boolean.parseBoolean( + configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD) + .toString()); + for (int i = 0; i < srcDataField.length; i++) { + if (srcDataField[i].getColumn().isComplex()) { + // create a ComplexDataType + dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(), + fieldConverterFactory + .createComplexDataType(srcDataField[i], null, configuration.getTableIdentifier(), + null, false, null, i, nullFormat, isEmptyBadRecord)); + } + } + } + + private int[] arrangeData(DataField[] dataFields, String[] header) { + int[] data = new int[dataFields.length]; + for (int i = 0; i < dataFields.length; i++) { + for (int j = 0; j < header.length; j++) { + if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) { + data[i] = j; + break; + } + } + } + return data; + } + + @Override public Iterator<CarbonRowBatch>[] execute() { + int batchSize = CarbonProperties.getInstance().getBatchSize(); + List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); + Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; + for (int i = 0; i < outIterators.length; i++) { + outIterators[i] = + new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(), + rowCounter, orderOfData, noDictionaryMapping, dataTypes, + configuration.getDataFields(), dataFieldsWithComplexDataType); + } + return outIterators; + } + + /** + * Partition input iterators equally as per the number of threads. + * + * @return + */ + private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() { + // Get the number of cores configured in property. 
+ int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + // Get the minimum of number of cores and iterators size to get the number of parallel threads + // to be launched. + int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); + + List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber]; + for (int i = 0; i < parallelThreadNumber; i++) { + iterators[i] = new ArrayList<>(); + } + // Equally partition the iterators as per number of threads + for (int i = 0; i < inputIterators.length; i++) { + iterators[i % parallelThreadNumber].add(inputIterators[i]); + } + return iterators; + } + + @Override protected CarbonRow processRow(CarbonRow row) { + return null; + } + + @Override public void close() { + if (!closed) { + super.close(); + for (CarbonIterator inputIterator : inputIterators) { + inputIterator.close(); + } + } + } + + @Override protected String getStepName() { + return "Input Processor"; + } + + /** + * This iterator wraps the list of iterators and it starts iterating the each + * iterator of the list one by one. It also parse the data while iterating it. 
+ */ + private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { + + private List<CarbonIterator<Object[]>> inputIterators; + + private CarbonIterator<Object[]> currentIterator; + + private int counter; + + private int batchSize; + + private boolean nextBatch; + + private boolean firstTime; + + private AtomicLong rowCounter; + + private boolean[] noDictionaryMapping; + + private DataType[] dataTypes; + + private DataField[] dataFields; + + private int[] orderOfData; + + private Map<Integer, GenericDataType> dataFieldsWithComplexDataType; + + public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, + boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, + DataType[] dataTypes, DataField[] dataFields, + Map<Integer, GenericDataType> dataFieldsWithComplexDataType) { + this.inputIterators = inputIterators; + this.batchSize = batchSize; + this.counter = 0; + // Get the first iterator from the list. + currentIterator = inputIterators.get(counter++); + this.rowCounter = rowCounter; + this.nextBatch = false; + this.firstTime = true; + this.noDictionaryMapping = noDictionaryMapping; + this.dataTypes = dataTypes; + this.dataFields = dataFields; + this.orderOfData = orderOfData; + this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType; + } + + @Override public boolean hasNext() { + return nextBatch || internalHasNext(); + } + + private boolean internalHasNext() { + if (firstTime) { + firstTime = false; + currentIterator.initialize(); + } + boolean hasNext = currentIterator.hasNext(); + // If iterator is finished then check for next iterator. + if (!hasNext) { + currentIterator.close(); + // Check next iterator is available in the list. + if (counter < inputIterators.size()) { + // Get the next iterator from the list. 
+ currentIterator = inputIterators.get(counter++); + // Initialize the new iterator + currentIterator.initialize(); + hasNext = internalHasNext(); + } + } + return hasNext; + } + + @Override public CarbonRowBatch next() { + return getBatch(); + } + + private CarbonRowBatch getBatch() { + // Create batch and fill it. + CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize); + int count = 0; + while (internalHasNext() && count < batchSize) { + carbonRowBatch.addRow( + new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields))); + count++; + } + rowCounter.getAndAdd(carbonRowBatch.getSize()); + return carbonRowBatch; + } + + private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) { + Object[] newData = new Object[data.length]; + for (int i = 0; i < data.length; i++) { + if (i < noDictionaryMapping.length && noDictionaryMapping[i]) { + newData[i] = DataTypeUtil + .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]); + } else { + // if this is a complex column then recursively comver the data into Byte Array. 
+ if (dataTypes[i].isComplexType()) { + ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArray); + try { + GenericDataType complextType = + dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal()); + + complextType.writeByteArray(data[orderOfData[i]], dataOutputStream); + + dataOutputStream.close(); + newData[i] = byteArray.toByteArray(); + } catch (Exception e) { + throw new CarbonDataLoadingException("Loading Exception", e); + } + } else { + newData[i] = data[orderOfData[i]]; + } + } + } + // System.out.println(Arrays.toString(data)); + return newData; + } + + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index f47853e..f921fd5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -240,6 +240,35 @@ public final class CarbonDataProcessorUtil { .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()])); } + public static void getComplexNoDictionaryMapping(DataField[] dataFields, + List<Integer> complexNoDictionary) { + + // save the Ordinal Number in the List. + for (DataField field : dataFields) { + if (field.getColumn().isComplex()) { + // get the children. 
+ getComplexNoDictionaryMapping( + ((CarbonDimension) field.getColumn()).getListOfChildDimensions(), complexNoDictionary); + } + } + } + + public static void getComplexNoDictionaryMapping(List<CarbonDimension> carbonDimensions, + List<Integer> complexNoDictionary) { + for (CarbonDimension carbonDimension : carbonDimensions) { + if (carbonDimension.isComplex()) { + getComplexNoDictionaryMapping(carbonDimension.getListOfChildDimensions(), + complexNoDictionary); + } else { + // This is primitive type. Check the encoding for NoDictionary. + if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) { + complexNoDictionary.add(carbonDimension.getOrdinal()); + } + } + } + } + + /** * Preparing the boolean [] to map whether the dimension use inverted index or not. */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index bc2e9db..946040f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -26,6 +26,8 @@ import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; +import org.apache.carbondata.processing.loading.complexobjects.StructObject; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.avro.Schema; @@ -70,15 +72,15 @@ class AvroCarbonWriter extends CarbonWriter { avroSchema = avroRecord.getSchema(); } List<Schema.Field> fields = avroSchema.getFields(); - 
Object [] csvField = new Object[fields.size()]; + Object[] csvField = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { - csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0); + csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i)); } return csvField; } - private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) { - StringBuilder out = new StringBuilder(); + private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { + Object out = new Object(); Schema.Type type = avroField.schema().getType(); switch (type) { case BOOLEAN: @@ -86,55 +88,39 @@ class AvroCarbonWriter extends CarbonWriter { case LONG: case DOUBLE: case STRING: + out = fieldValue; + break; case FLOAT: - out.append(fieldValue.toString()); + Float f = (Float) fieldValue; + out = f.doubleValue(); break; case RECORD: List<Schema.Field> fields = avroField.schema().getFields(); - String delimiter = null; - delimiterLevel ++; + + Object[] structChildObjects = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { - if (delimiterLevel == 1) { - delimiter = "$"; - } else if (delimiterLevel > 1) { - delimiter = ":"; - } - if (i != (fields.size() - 1)) { - out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i), - delimiterLevel)).append(delimiter); - } else { - out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i), - delimiterLevel)); - } + structChildObjects[i] = + avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i)); } + StructObject structObject = new StructObject(structChildObjects); + out = structObject; break; case ARRAY: int size = ((ArrayList) fieldValue).size(); - String delimiterArray = null; - delimiterLevel ++; - if (delimiterLevel == 1) { - delimiterArray = "$"; - } else if (delimiterLevel > 1) { - delimiterArray = ":"; - } - + Object[] arrayChildObjects = new Object[size]; for (int i = 0; i < size; i++) 
{ - if (i != size - 1) { - out.append(avroFieldToObject( - new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true), - ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray); - } else { - out.append(avroFieldToObject( - new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true), - ((ArrayList) fieldValue).get(i), delimiterLevel)); - } + arrayChildObjects[i] = (avroFieldToObject( + new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true), + ((ArrayList) fieldValue).get(i))); + } + ArrayObject arrayObject = new ArrayObject(arrayChildObjects); + out = arrayObject; + break; default: throw new UnsupportedOperationException(); } - return out.toString(); + return out; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ec33c112/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 397f151..76a46d0 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -330,6 +330,12 @@ public class CarbonWriterBuilder { Objects.requireNonNull(schema, "schema should not be null"); Objects.requireNonNull(path, "path should not be null"); CarbonLoadModel loadModel = createLoadModel(); + + // AVRO records are pushed to Carbon as Objects, not as Strings. This was done in order to + // support multi-level complex types. As no conversion is needed, the converter step is + // removed from the load. The LoadWithoutConverter flag tells the loader builder + // to skip the conversion step. + loadModel.setLoadWithoutCoverterStep(true); return new AvroCarbonWriter(loadModel); }