[CARBONDATA-2165] Remove Spark dependency from carbon-hadoop module

1. The streaming-related RecordReader is moved to the carbon-streaming module.
2. The RDD-related classes are moved to the carbon-spark2 module.

This closes #2074
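The dependency break rests on a small inversion visible in the first two hunks below: core's DataTypeConverter gains a convertCarbonSchemaToSparkSchema hook that returns plain Object[] (so core compiles without Spark on its classpath), core's DataTypeConverterImpl rejects the call with UnsupportedOperationException, and the Spark-side SparkDataTypeConverterImpl added under integration/spark-common supplies the real mapping. What follows is a minimal, self-contained sketch of that pattern, not CarbonData's actual API: CarbonColumn here is a stand-in type and the Spark StructField mapping is simulated with strings.

// ConverterSketch.java - illustrative only; the stand-in types keep the
// example compilable without CarbonData or Spark on the classpath.
public class ConverterSketch {

  // Stand-in for org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn.
  static class CarbonColumn {
    final String name;
    CarbonColumn(String name) { this.name = name; }
  }

  // Shape of the interface in carbondata-core after this commit: the return
  // type is Object[], so the contract mentions no Spark type.
  interface DataTypeConverter {
    Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns);
  }

  // Core's default implementation: Spark schema conversion is unsupported here.
  static class DataTypeConverterImpl implements DataTypeConverter {
    public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
      throw new UnsupportedOperationException();
    }
  }

  // Spark-module implementation (the real one is SparkDataTypeConverterImpl);
  // it would build org.apache.spark.sql.types.StructField instances, faked
  // here with strings to keep the sketch dependency-free.
  static class SparkDataTypeConverterImpl implements DataTypeConverter {
    public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
      Object[] fields = new Object[carbonColumns.length];
      for (int i = 0; i < carbonColumns.length; i++) {
        fields[i] = "StructField(" + carbonColumns[i].name + ", ...)";
      }
      return fields;
    }
  }

  public static void main(String[] args) {
    CarbonColumn[] schema = { new CarbonColumn("id"), new CarbonColumn("name") };
    DataTypeConverter converter = new SparkDataTypeConverterImpl();
    // Prints the simulated Spark schema: [StructField(id, ...), StructField(name, ...)]
    System.out.println(java.util.Arrays.toString(
        converter.convertCarbonSchemaToSparkSchema(schema)));
  }
}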
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c723947a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c723947a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c723947a

Branch: refs/heads/master
Commit: c723947a79332c66175f5a33cf57f08fe70fe1a9
Parents: 2e1ddb5
Author: Jacky Li <[email protected]>
Authored: Sat Mar 17 18:13:08 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Mar 28 11:19:23 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/DataTypeConverter.java |   3 +
 .../core/util/DataTypeConverterImpl.java        |   5 +
 hadoop/CARBON_HADOOPLogResource.properties      |  18 -
 hadoop/pom.xml                                  |   7 -
 .../readsupport/impl/RawDataReadSupport.java    |  42 -
 .../streaming/CarbonStreamInputFormat.java      | 115 ---
 .../streaming/CarbonStreamOutputFormat.java     |  87 ---
 .../streaming/CarbonStreamRecordReader.java     | 759 ------------------
 .../streaming/CarbonStreamRecordWriter.java     | 325 --------
 .../hadoop/streaming/StreamBlockletReader.java  | 259 -------
 .../hadoop/streaming/StreamBlockletWriter.java  | 152 ----
 .../hadoop/testutil/StoreCreator.java           | 495 ++++++++++++
 .../carbondata/hadoop/util/CarbonTypeUtil.java  | 101 ---
 .../hadoop/ft/CarbonTableInputFormatTest.java   |   6 +-
 .../hadoop/ft/CarbonTableOutputFormatTest.java  |   2 +-
 .../streaming/CarbonStreamInputFormatTest.java  |  99 ---
 .../streaming/CarbonStreamOutputFormatTest.java | 121 ---
 .../hadoop/test/util/StoreCreator.java          | 492 ------------
 integration/spark-common/pom.xml                |   2 +-
 .../spark/util/SparkDataTypeConverterImpl.java  |  81 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 435 +++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala |  52 +-
 .../CarbonSparkStreamingListener.scala          |  30 +
 .../streaming/CarbonStreamSparkStreaming.scala  | 184 +++++
 .../CarbonStreamingQueryListener.scala          |  77 ++
 .../streaming/StreamSinkFactory.scala           | 236 ++++++
 .../streaming/CarbonAppendableStreamSink.scala  | 362 +++++++++
 .../spark/sql/test/TestQueryExecutor.scala      |   4 +-
 integration/spark2/pom.xml                      |   2 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 streaming/pom.xml                               |   9 +-
 .../streaming/CarbonStreamInputFormat.java      | 115 +++
 .../streaming/CarbonStreamOutputFormat.java     |  87 +++
 .../streaming/CarbonStreamRecordReader.java     | 761 +++++++++++++++++++
 .../streaming/CarbonStreamRecordWriter.java     | 325 ++++++++
 .../streaming/StreamBlockletReader.java         | 259 +++++++
 .../streaming/StreamBlockletWriter.java         | 152 ++++
 .../streaming/segment/StreamSegment.java        |   2 +-
 .../CarbonSparkStreamingListener.scala          |  31 -
 .../streaming/CarbonStreamSparkStreaming.scala  | 187 -----
 .../carbondata/streaming/StreamHandoffRDD.scala | 436 -----------
 .../streaming/StreamSinkFactory.scala           | 236 ------
 .../streaming/parser/FieldConverter.scala       |  95 +++
 .../streaming/parser/RowStreamParserImp.scala   |   7 +-
 .../streaming/CarbonAppendableStreamSink.scala  | 362 ---------
 .../CarbonStreamingQueryListener.scala          |  77 --
 .../streaming/CarbonStreamInputFormatTest.java  |  99 +++
 .../streaming/CarbonStreamOutputFormatTest.java | 121 +++
 50 files changed, 3948 insertions(+), 3974 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
index 7c63860..474493a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+
 public interface DataTypeConverter {
 
   Object convertFromStringToDecimal(Object data);
@@ -31,4 +33,5 @@ public interface DataTypeConverter {
   Object wrapWithGenericArrayData(Object data);
 
   Object wrapWithGenericRow(Object[] fields);
+  Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index ea5740d..a4f571e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -21,6 +21,7 @@
 import java.io.Serializable;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 
 public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
@@ -91,4 +92,8 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
     return fields;
   }
 
+  @Override
+  public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/CARBON_HADOOPLogResource.properties
----------------------------------------------------------------------
diff --git a/hadoop/CARBON_HADOOPLogResource.properties b/hadoop/CARBON_HADOOPLogResource.properties
deleted file mode 100644
index 135a578..0000000
--- a/hadoop/CARBON_HADOOPLogResource.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-carbon.hadoop = {0}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index 916b9db..41e2822 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -40,10 +40,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -55,9 +51,6 @@
     <resources>
       <resource>
         <directory>.</directory>
-        <includes>
-          <include>CARBON_HADOOPLogResource.properties</include>
-        </includes>
       </resource>
     </resources>
     <plugins>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
deleted file mode 100644
index b2cd450..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-
-public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
-
-  @Override
-  public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) { }
-
-  /**
-   * return column data as InternalRow
-   *
-   * @param data column data
-   */
-  @Override
-  public InternalRow readRow(Object[] data) {
-    return new GenericInternalRow(data);
-  }
-
-  @Override public void close() { }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
deleted file mode 100644
index a6e9563..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.IOException; - -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.scan.complextypes.ArrayQueryType; -import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType; -import org.apache.carbondata.core.scan.complextypes.StructQueryType; -import org.apache.carbondata.core.scan.filter.GenericQueryType; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** - * Stream input format - */ -public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { - - public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size"; - public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; - - @Override public RecordReader<Void, Object> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new CarbonStreamRecordReader(); - } - - public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable, - CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) - throws IOException { - GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length]; - for (int i = 0; i < carbonColumns.length; i++) { - if (carbonColumns[i].isComplex()) { - if (DataTypes.isArrayType(carbonColumns[i].getDataType())) { - queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(), - carbonColumns[i].getColName(), i); - } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) { - queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(), - carbonColumns[i].getColName(), i); - } else { - throw new UnsupportedOperationException( - carbonColumns[i].getDataType().getName() + " is not supported"); - } - - fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache); - } - } - - return queryTypes; - } - - private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType, - CarbonDimension dimension, int parentBlockIndex, - Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException { - for (int i = 0; i < dimension.getNumberOfChild(); i++) { - CarbonDimension child = dimension.getListOfChildDimensions().get(i); - DataType dataType = child.getDataType(); - GenericQueryType queryType = null; - if (DataTypes.isArrayType(dataType)) { - queryType = - new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); - - } else if (DataTypes.isStructType(dataType)) { - queryType = - new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); - parentQueryType.addChildren(queryType); - } else { - boolean 
isDirectDictionary = - CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY); - String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties() - .get(CarbonCommonConstants.DICTIONARY_PATH); - DictionaryColumnUniqueIdentifier dictionarIdentifier = - new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(), - child.getColumnIdentifier(), child.getDataType(), dictionaryPath); - - queryType = - new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex, - child.getDataType(), 4, cache.get(dictionarIdentifier), - isDirectDictionary); - } - parentQueryType.addChildren(queryType); - if (child.getNumberOfChild() > 0) { - fillChildren(carbontable, queryType, child, parentBlockIndex, cache); - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java deleted file mode 100644 index 2599fa7..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.IOException; -import java.nio.charset.Charset; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** - * Stream output format - */ -public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> { - - static final byte[] CARBON_SYNC_MARKER = - "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); - - public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size"; - - public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024; - - public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums"; - - public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000; - - public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size"; - - public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024; - - private static final String LOAD_Model = "mapreduce.output.carbon.load.model"; - - private static final String SEGMENT_ID = "carbon.segment.id"; - - @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job) - throws IOException, InterruptedException { - return new CarbonStreamRecordWriter(job); - } - - public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel) - throws IOException { - if (carbonLoadModel != null) { - hadoopConf.set(LOAD_Model, ObjectSerializationUtil.convertObjectToString(carbonLoadModel)); - } - } - - public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException { - String value = hadoopConf.get(LOAD_Model); - if (value == null) { - return null; - } else { - return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value); - } - } - - public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException { - if (segmentId != null) { - hadoopConf.set(SEGMENT_ID, segmentId); - } - } - - public static String getSegmentId(Configuration hadoopConf) throws IOException { - return hadoopConf.get(SEGMENT_ID); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java deleted file mode 100644 index 1e227c4..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java +++ /dev/null @@ -1,759 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.CacheProvider; -import org.apache.carbondata.core.cache.CacheType; -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.reader.CarbonHeaderReader; -import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.GenericQueryType; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.intf.RowImpl; -import org.apache.carbondata.core.scan.filter.intf.RowIntf; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.format.BlockletHeader; -import org.apache.carbondata.format.FileHeader; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.InputMetricsStats; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.hadoop.util.CarbonTypeUtil; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.spark.memory.MemoryMode; -import 
org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.execution.vectorized.ColumnVector; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.CalendarIntervalType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.CalendarInterval; -import org.apache.spark.unsafe.types.UTF8String; - -/** - * Stream record reader - */ -public class CarbonStreamRecordReader extends RecordReader<Void, Object> { - // vector reader - private boolean isVectorReader; - - // metadata - private CarbonTable carbonTable; - private CarbonColumn[] storageColumns; - private boolean[] isRequired; - private DataType[] measureDataTypes; - private int dimensionCount; - private int measureCount; - - // input - private FileSplit fileSplit; - private Configuration hadoopConf; - private StreamBlockletReader input; - private boolean isFirstRow = true; - private QueryModel model; - - // decode data - private BitSet allNonNull; - private boolean[] isNoDictColumn; - private DirectDictionaryGenerator[] directDictionaryGenerators; - private CacheProvider cacheProvider; - private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; - private GenericQueryType[] queryTypes; - - // vectorized reader - private StructType outputSchema; - private ColumnarBatch columnarBatch; - private boolean isFinished = false; - - // filter - private FilterExecuter filter; - private boolean[] isFilterRequired; - private Object[] filterValues; - private RowIntf filterRow; - private int[] filterMap; - - // output - private CarbonColumn[] projection; - private boolean[] isProjectionRequired; - private int[] projectionMap; - private Object[] outputValues; - private InternalRow outputRow; - - // empty project, null filter - private boolean skipScanData; - - // return raw row for handoff - private boolean useRawRow = false; - - // InputMetricsStats - private InputMetricsStats inputMetricsStats; - - @Override public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - // input - if (split instanceof CarbonInputSplit) { - fileSplit = (CarbonInputSplit) split; - } else if (split instanceof CarbonMultiBlockSplit) { - fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); - } else { - fileSplit = (FileSplit) split; - } - - // metadata - hadoopConf = context.getConfiguration(); - if (model == null) { - CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); - model = format.createQueryModel(split, context); - } - carbonTable = model.getTable(); - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - dimensionCount = dimensions.size(); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - measureCount = measures.size(); - List<CarbonColumn> carbonColumnList = - carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName()); - storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); - isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); - directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length]; - for (int i = 0; i < storageColumns.length; i++) { - if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { - directDictionaryGenerators[i] = 
DirectDictionaryKeyGeneratorFactory - .getDirectDictionaryGenerator(storageColumns[i].getDataType()); - } - } - measureDataTypes = new DataType[measureCount]; - for (int i = 0; i < measureCount; i++) { - measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType(); - } - - // decode data - allNonNull = new BitSet(storageColumns.length); - projection = model.getProjectionColumns(); - - isRequired = new boolean[storageColumns.length]; - boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); - boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); - isFilterRequired = new boolean[storageColumns.length]; - filterMap = new int[storageColumns.length]; - for (int i = 0; i < storageColumns.length; i++) { - if (storageColumns[i].isDimension()) { - if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { - isRequired[i] = true; - isFilterRequired[i] = true; - filterMap[i] = storageColumns[i].getOrdinal(); - } - } else { - if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { - isRequired[i] = true; - isFilterRequired[i] = true; - filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal(); - } - } - } - - isProjectionRequired = new boolean[storageColumns.length]; - projectionMap = new int[storageColumns.length]; - for (int i = 0; i < storageColumns.length; i++) { - for (int j = 0; j < projection.length; j++) { - if (storageColumns[i].getColName().equals(projection[j].getColName())) { - isRequired[i] = true; - isProjectionRequired[i] = true; - projectionMap[i] = j; - break; - } - } - } - - // initialize filter - if (null != model.getFilterExpressionResolverTree()) { - initializeFilter(); - } else if (projection.length == 0) { - skipScanData = true; - } - - } - - private void initializeFilter() { - - List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), - carbonTable.getMeasureByTableName(carbonTable.getTableName())); - int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; - for (int i = 0; i < dimLensWithComplex.length; i++) { - dimLensWithComplex[i] = Integer.MAX_VALUE; - } - - int[] dictionaryColumnCardinality = - CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); - SegmentProperties segmentProperties = - new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); - Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); - - FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree(); - filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, - complexDimensionInfoMap); - // for row filter, we need update column index - FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), - carbonTable.getDimensionOrdinalMax()); - - } - - public void setQueryModel(QueryModel model) { - this.model = model; - } - - private byte[] getSyncMarker(String filePath) throws IOException { - CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); - FileHeader header = headerReader.readHeader(); - return header.getSync_marker(); - } - - public void setUseRawRow(boolean useRawRow) { - this.useRawRow = useRawRow; - } - - private void initializeAtFirstRow() throws IOException { - filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; - filterRow = new RowImpl(); - filterRow.setValues(filterValues); - - outputValues = new Object[projection.length]; - outputRow = new GenericInternalRow(outputValues); - - Path file = 
fileSplit.getPath(); - - byte[] syncMarker = getSyncMarker(file.toString()); - - FileSystem fs = file.getFileSystem(hadoopConf); - - int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE, - CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT)); - - FSDataInputStream fileIn = fs.open(file, bufferSize); - fileIn.seek(fileSplit.getStart()); - input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), - fileSplit.getStart() == 0); - - cacheProvider = CacheProvider.getInstance(); - cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); - queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache); - - outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection)); - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (isFirstRow) { - isFirstRow = false; - initializeAtFirstRow(); - } - if (isFinished) { - return false; - } - - if (isVectorReader) { - return nextColumnarBatch(); - } - - return nextRow(); - } - - /** - * for vector reader, check next columnar batch - */ - private boolean nextColumnarBatch() throws IOException { - boolean hasNext; - boolean scanMore = false; - do { - // move to the next blocklet - hasNext = input.nextBlocklet(); - if (hasNext) { - // read blocklet header - BlockletHeader header = input.readBlockletHeader(); - if (isScanRequired(header)) { - scanMore = !scanBlockletAndFillVector(header); - } else { - input.skipBlockletData(true); - scanMore = true; - } - } else { - isFinished = true; - scanMore = false; - } - } while (scanMore); - return hasNext; - } - - /** - * check next Row - */ - private boolean nextRow() throws IOException { - // read row one by one - try { - boolean hasNext; - boolean scanMore = false; - do { - hasNext = input.hasNext(); - if (hasNext) { - if (skipScanData) { - input.nextRow(); - scanMore = false; - } else { - if (useRawRow) { - // read raw row for streaming handoff which does not require decode raw row - readRawRowFromStream(); - } else { - readRowFromStream(); - } - if (null != filter) { - scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax()); - } else { - scanMore = false; - } - } - } else { - if (input.nextBlocklet()) { - BlockletHeader header = input.readBlockletHeader(); - if (isScanRequired(header)) { - if (skipScanData) { - input.skipBlockletData(false); - } else { - input.readBlockletData(header); - } - } else { - input.skipBlockletData(true); - } - scanMore = true; - } else { - isFinished = true; - scanMore = false; - } - } - } while (scanMore); - return hasNext; - } catch (FilterUnsupportedException e) { - throw new IOException("Failed to filter row in detail reader", e); - } - } - - @Override public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - @Override public Object getCurrentValue() throws IOException, InterruptedException { - if (isVectorReader) { - int value = columnarBatch.numValidRows(); - if (inputMetricsStats != null) { - inputMetricsStats.incrementRecordRead((long) value); - } - - return columnarBatch; - } - - if (inputMetricsStats != null) { - inputMetricsStats.incrementRecordRead(1L); - } - - return outputRow; - } - - private boolean isScanRequired(BlockletHeader header) { - // TODO require to implement min-max index - if (null == filter) { - return true; - } - return true; - } - - private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { - // if filter is null and 
output projection is empty, use the row number of blocklet header - if (skipScanData) { - int rowNums = header.getBlocklet_info().getNum_rows(); - columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums); - columnarBatch.setNumRows(rowNums); - input.skipBlockletData(true); - return rowNums > 0; - } - - input.readBlockletData(header); - columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums()); - int rowNum = 0; - if (null == filter) { - while (input.hasNext()) { - readRowFromStream(); - putRowToColumnBatch(rowNum++); - } - } else { - try { - while (input.hasNext()) { - readRowFromStream(); - if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { - putRowToColumnBatch(rowNum++); - } - } - } catch (FilterUnsupportedException e) { - throw new IOException("Failed to filter row in vector reader", e); - } - } - columnarBatch.setNumRows(rowNum); - return rowNum > 0; - } - - private void readRowFromStream() { - input.nextRow(); - short nullLen = input.readShort(); - BitSet nullBitSet = allNonNull; - if (nullLen > 0) { - nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); - } - int colCount = 0; - // primitive type dimension - for (; colCount < isNoDictColumn.length; colCount++) { - if (nullBitSet.get(colCount)) { - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = null; - } - } else { - if (isNoDictColumn[colCount]) { - int v = input.readShort(); - if (isRequired[colCount]) { - byte[] b = input.readBytes(v); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = b; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = - DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b, - storageColumns[colCount].getDataType()); - } - } else { - input.skipBytes(v); - } - } else if (null != directDictionaryGenerators[colCount]) { - if (isRequired[colCount]) { - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = input.copy(4); - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = - directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt()); - } else { - input.skipBytes(4); - } - } else { - input.skipBytes(4); - } - } else { - if (isRequired[colCount]) { - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = input.copy(4); - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = input.readInt(); - } else { - input.skipBytes(4); - } - } else { - input.skipBytes(4); - } - } - } - } - // complex type dimension - for (; colCount < dimensionCount; colCount++) { - if (nullBitSet.get(colCount)) { - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = null; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = null; - } - } else { - short v = input.readShort(); - if (isRequired[colCount]) { - byte[] b = input.readBytes(v); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = b; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = queryTypes[colCount] - .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b)); - } - } else { - input.skipBytes(v); - } - } - } - // measure - DataType dataType; - for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { - if (nullBitSet.get(colCount)) { - if 
(isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = null; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = null; - } - } else { - dataType = measureDataTypes[msrCount]; - if (dataType == DataTypes.BOOLEAN) { - if (isRequired[colCount]) { - boolean v = input.readBoolean(); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = v; - } - } else { - input.skipBytes(1); - } - } else if (dataType == DataTypes.SHORT) { - if (isRequired[colCount]) { - short v = input.readShort(); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = v; - } - } else { - input.skipBytes(2); - } - } else if (dataType == DataTypes.INT) { - if (isRequired[colCount]) { - int v = input.readInt(); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = v; - } - } else { - input.skipBytes(4); - } - } else if (dataType == DataTypes.LONG) { - if (isRequired[colCount]) { - long v = input.readLong(); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = v; - } - } else { - input.skipBytes(8); - } - } else if (dataType == DataTypes.DOUBLE) { - if (isRequired[colCount]) { - double v = input.readDouble(); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = v; - } - } else { - input.skipBytes(8); - } - } else if (DataTypes.isDecimal(dataType)) { - int len = input.readShort(); - if (isRequired[colCount]) { - BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); - if (isFilterRequired[colCount]) { - filterValues[filterMap[colCount]] = v; - } - if (isProjectionRequired[colCount]) { - outputValues[projectionMap[colCount]] = Decimal.apply(v); - } - } else { - input.skipBytes(len); - } - } - } - } - } - - private void readRawRowFromStream() { - input.nextRow(); - short nullLen = input.readShort(); - BitSet nullBitSet = allNonNull; - if (nullLen > 0) { - nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); - } - int colCount = 0; - // primitive type dimension - for (; colCount < isNoDictColumn.length; colCount++) { - if (nullBitSet.get(colCount)) { - outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; - } else { - if (isNoDictColumn[colCount]) { - int v = input.readShort(); - outputValues[colCount] = input.readBytes(v); - } else { - outputValues[colCount] = input.readInt(); - } - } - } - // complex type dimension - for (; colCount < dimensionCount; colCount++) { - if (nullBitSet.get(colCount)) { - outputValues[colCount] = null; - } else { - short v = input.readShort(); - outputValues[colCount] = input.readBytes(v); - } - } - // measure - DataType dataType; - for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { - if (nullBitSet.get(colCount)) { - outputValues[colCount] = null; - } else { - dataType = measureDataTypes[msrCount]; - if (dataType == DataTypes.BOOLEAN) { - outputValues[colCount] = input.readBoolean(); - } else if (dataType == DataTypes.SHORT) { - outputValues[colCount] = input.readShort(); - } else if (dataType == DataTypes.INT) { - outputValues[colCount] = input.readInt(); - } else 
if (dataType == DataTypes.LONG) { - outputValues[colCount] = input.readLong(); - } else if (dataType == DataTypes.DOUBLE) { - outputValues[colCount] = input.readDouble(); - } else if (DataTypes.isDecimal(dataType)) { - int len = input.readShort(); - outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); - } - } - } - } - - private void putRowToColumnBatch(int rowId) { - for (int i = 0; i < projection.length; i++) { - Object value = outputValues[i]; - ColumnVector col = columnarBatch.column(i); - org.apache.spark.sql.types.DataType t = col.dataType(); - if (null == value) { - col.putNull(rowId); - } else { - if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { - col.putBoolean(rowId, (boolean)value); - } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { - col.putByte(rowId, (byte) value); - } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { - col.putShort(rowId, (short) value); - } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { - col.putInt(rowId, (int) value); - } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { - col.putLong(rowId, (long) value); - } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { - col.putFloat(rowId, (float) value); - } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { - col.putDouble(rowId, (double) value); - } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { - UTF8String v = (UTF8String) value; - col.putByteArray(rowId, v.getBytes()); - } else if (t instanceof org.apache.spark.sql.types.DecimalType) { - DecimalType dt = (DecimalType)t; - Decimal d = Decimal.fromDecimal(value); - if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { - col.putInt(rowId, (int)d.toUnscaledLong()); - } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { - col.putLong(rowId, d.toUnscaledLong()); - } else { - final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); - byte[] bytes = integer.toByteArray(); - col.putByteArray(rowId, bytes, 0, bytes.length); - } - } else if (t instanceof CalendarIntervalType) { - CalendarInterval c = (CalendarInterval) value; - col.getChildColumn(0).putInt(rowId, c.months); - col.getChildColumn(1).putLong(rowId, c.microseconds); - } else if (t instanceof org.apache.spark.sql.types.DateType) { - col.putInt(rowId, (int) value); - } else if (t instanceof org.apache.spark.sql.types.TimestampType) { - col.putLong(rowId, (long) value); - } - } - } - } - - @Override public float getProgress() throws IOException, InterruptedException { - return 0; - } - - public void setVectorReader(boolean isVectorReader) { - this.isVectorReader = isVectorReader; - } - - public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { - this.inputMetricsStats = inputMetricsStats; - } - - @Override public void close() throws IOException { - if (null != input) { - input.close(); - } - if (null != columnarBatch) { - columnarBatch.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java deleted file mode 100644 index a4b3be8..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java +++ /dev/null @@ -1,325 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.util.CarbonMetadataUtil; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.format.FileHeader; -import org.apache.carbondata.processing.loading.BadRecordsLogger; -import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; -import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.DataLoadProcessBuilder; -import org.apache.carbondata.processing.loading.converter.RowConverter; -import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.processing.loading.parser.RowParser; -import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl; -import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskID; - -/** - * Stream record writer - */ -public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName()); - - // basic info - private Configuration hadoopConf; - private CarbonLoadModel carbonLoadModel; - private CarbonDataLoadConfiguration configuration; - private CarbonTable carbonTable; - private int maxRowNums; - private int maxCacheSize; - - // parser and converter - private RowParser rowParser; - private BadRecordsLogger 
badRecordLogger; - private RowConverter converter; - private CarbonRow currentRow = new CarbonRow(null); - - // encoder - private DataField[] dataFields; - private BitSet nullBitSet; - private boolean[] isNoDictionaryDimensionColumn; - private int dimensionWithComplexCount; - private int measureCount; - private DataType[] measureDataTypes; - private StreamBlockletWriter output = null; - - // data write - private String segmentDir; - private String fileName; - private DataOutputStream outputStream; - private boolean isFirstRow = true; - private boolean hasException = false; - - CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException { - initialize(job); - } - - public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel) - throws IOException { - this.carbonLoadModel = carbonLoadModel; - initialize(job); - } - - private void initialize(TaskAttemptContext job) throws IOException { - // set basic information - hadoopConf = job.getConfiguration(); - if (carbonLoadModel == null) { - carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf); - if (carbonLoadModel == null) { - throw new IOException( - "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model"); - } - } - String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf); - carbonLoadModel.setSegmentId(segmentId); - carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); - long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId(); - carbonLoadModel.setTaskNo("" + taskNo); - configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel); - maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS, - CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1; - maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE, - CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT); - - segmentDir = CarbonTablePath.getSegmentPath( - carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId); - fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); - } - - private void initializeAtFirstRow() throws IOException, InterruptedException { - - // initialize metadata - isNoDictionaryDimensionColumn = - CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); - dimensionWithComplexCount = configuration.getDimensionCount(); - measureCount = configuration.getMeasureCount(); - dataFields = configuration.getDataFields(); - measureDataTypes = new DataType[measureCount]; - for (int i = 0; i < measureCount; i++) { - measureDataTypes[i] = - dataFields[dimensionWithComplexCount + i].getColumn().getDataType(); - } - - // initialize parser and converter - rowParser = new RowParserImpl(dataFields, configuration); - badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); - converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); - configuration.setCardinalityFinder(converter); - converter.initialize(); - - // initialize encoder - nullBitSet = new BitSet(dataFields.length); - int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, - CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT); - output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize); - - // initialize data writer - String filePath = segmentDir + File.separator + fileName; - FileFactory.FileType fileType = 
FileFactory.getFileType(filePath); - CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType); - if (carbonFile.exists()) { - // if the file is existed, use the append api - outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType); - } else { - // IF the file is not existed, use the create api - outputStream = FileFactory.getDataOutputStream(filePath, fileType); - writeFileHeader(); - } - - isFirstRow = false; - } - - @Override public void write(Void key, Object value) throws IOException, InterruptedException { - if (isFirstRow) { - initializeAtFirstRow(); - } - - // parse and convert row - currentRow.setData(rowParser.parseRow((Object[]) value)); - converter.convert(currentRow); - - // null bit set - nullBitSet.clear(); - for (int i = 0; i < dataFields.length; i++) { - if (null == currentRow.getObject(i)) { - nullBitSet.set(i); - } - } - output.nextRow(); - byte[] b = nullBitSet.toByteArray(); - output.writeShort(b.length); - if (b.length > 0) { - output.writeBytes(b); - } - int dimCount = 0; - Object columnValue; - - // primitive type dimension - for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { - columnValue = currentRow.getObject(dimCount); - if (null != columnValue) { - if (isNoDictionaryDimensionColumn[dimCount]) { - byte[] col = (byte[]) columnValue; - output.writeShort(col.length); - output.writeBytes(col); - } else { - output.writeInt((int) columnValue); - } - } - } - // complex type dimension - for (; dimCount < dimensionWithComplexCount; dimCount++) { - columnValue = currentRow.getObject(dimCount); - if (null != columnValue) { - byte[] col = (byte[]) columnValue; - output.writeShort(col.length); - output.writeBytes(col); - } - } - // measure - DataType dataType; - for (int msrCount = 0; msrCount < measureCount; msrCount++) { - columnValue = currentRow.getObject(dimCount + msrCount); - if (null != columnValue) { - dataType = measureDataTypes[msrCount]; - if (dataType == DataTypes.BOOLEAN) { - output.writeBoolean((boolean) columnValue); - } else if (dataType == DataTypes.SHORT) { - output.writeShort((short) columnValue); - } else if (dataType == DataTypes.INT) { - output.writeInt((int) columnValue); - } else if (dataType == DataTypes.LONG) { - output.writeLong((long) columnValue); - } else if (dataType == DataTypes.DOUBLE) { - output.writeDouble((double) columnValue); - } else if (DataTypes.isDecimal(dataType)) { - BigDecimal val = (BigDecimal) columnValue; - byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); - output.writeShort(bigDecimalInBytes.length); - output.writeBytes(bigDecimalInBytes); - } else { - String msg = - "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType() - .getName(); - LOGGER.error(msg); - throw new IOException(msg); - } - } - } - - if (output.isFull()) { - appendBlockletToDataFile(); - } - } - - private void writeFileHeader() throws IOException { - List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), - carbonTable.getMeasureByTableName(carbonTable.getTableName())); - int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; - for (int i = 0; i < dimLensWithComplex.length; i++) { - dimLensWithComplex[i] = Integer.MAX_VALUE; - } - int[] dictionaryColumnCardinality = - CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); - List<Integer> cardinality = new ArrayList<>(); - List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = 
AbstractFactDataWriter - .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality, - wrapperColumnSchemaList); - FileHeader fileHeader = - CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis()); - fileHeader.setIs_footer_present(false); - fileHeader.setIs_splitable(true); - fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); - outputStream.write(CarbonUtil.getByteArray(fileHeader)); - } - - /** - * write a blocklet to file - */ - private void appendBlockletToDataFile() throws IOException { - if (output.getRowIndex() == -1) { - return; - } - output.apppendBlocklet(outputStream); - outputStream.flush(); - // reset data - output.reset(); - } - - @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - try { - // append remain buffer data - if (!hasException && !isFirstRow) { - appendBlockletToDataFile(); - converter.finish(); - } - } finally { - // close resource - CarbonUtil.closeStreams(outputStream); - if (output != null) { - output.close(); - } - if (badRecordLogger != null) { - badRecordLogger.closeStreams(); - } - } - } - - public String getSegmentDir() { - return segmentDir; - } - - public String getFileName() { - return fileName; - } - - public void setHasException(boolean hasException) { - this.hasException = hasException; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java deleted file mode 100644 index 1989198..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
deleted file mode 100644
index 1989198..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-
-/**
- * stream blocklet reader
- */
-public class StreamBlockletReader {
-
-  private byte[] buffer;
-  private int offset;
-  private final byte[] syncMarker;
-  private final byte[] syncBuffer;
-  private final int syncLen;
-  private long pos = 0;
-  private final InputStream in;
-  private final long limitStart;
-  private final long limitEnd;
-  private boolean isAlreadySync = false;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
-  private int rowNums = 0;
-  private int rowIndex = 0;
-  private boolean isHeaderPresent;
-
-  StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
-    this.syncMarker = syncMarker;
-    syncLen = syncMarker.length;
-    syncBuffer = new byte[syncLen];
-    this.in = in;
-    limitStart = limit;
-    limitEnd = limitStart + syncLen;
-    this.isHeaderPresent = isHeaderPresent;
-  }
-
-  private void ensureCapacity(int capacity) {
-    if (buffer == null || capacity > buffer.length) {
-      buffer = new byte[capacity];
-    }
-  }
-
-  /**
-   * find the first position of sync_marker in input stream
-   */
-  private boolean sync() throws IOException {
-    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-      return false;
-    }
-    boolean skipHeader = false;
-    for (int i = 0; i < limitStart; i++) {
-      int j = 0;
-      for (; j < syncLen; j++) {
-        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
-      }
-      if (syncLen == j) {
-        if (isHeaderPresent) {
-          if (skipHeader) {
-            return true;
-          } else {
-            skipHeader = true;
-          }
-        } else {
-          return true;
-        }
-      }
-      int value = in.read();
-      if (-1 == value) {
-        return false;
-      }
-      syncBuffer[i % syncLen] = (byte) value;
-      pos++;
-    }
-    return false;
-  }
-
-  BlockletHeader readBlockletHeader() throws IOException {
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet header");
-    }
-    BlockletHeader header = CarbonUtil.readBlockletHeader(b);
-    rowNums = header.getBlocklet_info().getNum_rows();
-    rowIndex = 0;
-    return header;
-  }
-
-  void readBlockletData(BlockletHeader header) throws IOException {
-    ensureCapacity(header.getBlocklet_length());
-    offset = 0;
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet data");
-    }
-    compressor.rawUncompress(b, buffer);
-  }
-
-  void skipBlockletData(boolean reset) throws IOException {
-    int len = readIntFromStream();
-    skip(len);
-    pos += len;
-    if (reset) {
-      this.rowNums = 0;
-      this.rowIndex = 0;
-    }
-  }
-
-  private void skip(int len) throws IOException {
-    long remaining = len;
-    do {
-      long skipLen = in.skip(remaining);
-      remaining -= skipLen;
-    } while (remaining > 0);
-  }
-
-  /**
-   * find the next blocklet
-   */
-  boolean nextBlocklet() throws IOException {
-    if (pos >= limitStart) {
-      return false;
-    }
-    if (isAlreadySync) {
-      if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-        return false;
-      }
-    } else {
-      isAlreadySync = true;
-      if (!sync()) {
-        return false;
-      }
-    }
-
-    return pos < limitEnd;
-  }
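
sync() above is the classic Hadoop-style resynchronization scan: a split may start mid-record, so the reader slides a circular buffer of marker length over the stream until the sync marker matches, then reads whole blocklets from there. A minimal sketch of that scan under the same idea (hypothetical names, simplified; assumes the priming read fills the buffer, where production code would loop):

    import java.io.IOException;
    import java.io.InputStream;

    // Sketch: scan an input stream byte-by-byte through a circular buffer
    // until `marker` matches, as the deleted sync() does above.
    public final class SyncScanSketch {
      public static boolean seekToMarker(InputStream in, byte[] marker) throws IOException {
        byte[] ring = new byte[marker.length];
        int filled = in.read(ring, 0, ring.length);      // prime the ring buffer
        if (filled < ring.length) return false;
        for (long i = 0; ; i++) {
          int j = 0;
          while (j < marker.length && marker[j] == ring[(int) ((i + j) % ring.length)]) j++;
          if (j == marker.length) return true;           // marker found
          int next = in.read();
          if (next == -1) return false;                  // EOF before a marker
          ring[(int) (i % ring.length)] = (byte) next;   // slide window one byte
        }
      }
    }
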
-
-  boolean hasNext() throws IOException {
-    return rowIndex < rowNums;
-  }
-
-  void nextRow() {
-    rowIndex++;
-  }
-
-  int readIntFromStream() throws IOException {
-    int ch1 = in.read();
-    int ch2 = in.read();
-    int ch3 = in.read();
-    int ch4 = in.read();
-    if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
-    pos += 4;
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-  }
-
-  /**
-   * Reads <code>len</code> bytes of data from the input stream into
-   * an array of bytes.
-   * @return <code>true</code> if the data was read successfully, or
-   * <code>false</code> if there is no more data because the end of the stream has been reached.
-   */
-  boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
-    int readLen = in.read(b, offset, len);
-    if (readLen < 0) {
-      return false;
-    }
-    pos += readLen;
-    if (readLen < len) {
-      return readBytesFromStream(b, offset + readLen, len - readLen);
-    } else {
-      return true;
-    }
-  }
-
-  boolean readBoolean() {
-    return (buffer[offset++]) != 0;
-  }
-
-  short readShort() {
-    short v = (short) ((buffer[offset + 1] & 255) +
-        ((buffer[offset]) << 8));
-    offset += 2;
-    return v;
-  }
-
-  byte[] copy(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    return b;
-  }
-
-  int readInt() {
-    int v = ((buffer[offset + 3] & 255) +
-        ((buffer[offset + 2] & 255) << 8) +
-        ((buffer[offset + 1] & 255) << 16) +
-        ((buffer[offset]) << 24));
-    offset += 4;
-    return v;
-  }
-
-  long readLong() {
-    long v = ((long) (buffer[offset + 7] & 255)) +
-        ((long) (buffer[offset + 6] & 255) << 8) +
-        ((long) (buffer[offset + 5] & 255) << 16) +
-        ((long) (buffer[offset + 4] & 255) << 24) +
-        ((long) (buffer[offset + 3] & 255) << 32) +
-        ((long) (buffer[offset + 2] & 255) << 40) +
-        ((long) (buffer[offset + 1] & 255) << 48) +
-        ((long) (buffer[offset]) << 56);
-    offset += 8;
-    return v;
-  }
-
-  double readDouble() {
-    return Double.longBitsToDouble(readLong());
-  }
-
-  byte[] readBytes(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    offset += len;
-    return b;
-  }
-
-  void skipBytes(int len) {
-    offset += len;
-  }
-
-  int getRowNums() {
-    return rowNums;
-  }
-
-  void close() {
-    CarbonUtil.closeStreams(in);
-  }
-}
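
The readShort/readInt/readLong accessors above decode big-endian values from the uncompressed blocklet buffer with manual shift-and-mask arithmetic. The same decoding can be expressed with java.nio.ByteBuffer, whose default byte order is big-endian; a small sketch showing the equivalence (illustration only, not the CarbonData code):

    import java.nio.ByteBuffer;

    // Sketch: manual big-endian decoding vs. ByteBuffer (big-endian by default).
    public final class BigEndianSketch {
      public static void main(String[] args) {
        byte[] raw = {0, 0, 0, 42, 0, 7};           // [int 42][short 7], big-endian
        ByteBuffer buf = ByteBuffer.wrap(raw);
        int i = buf.getInt();                       // 42
        short s = buf.getShort();                   // 7
        System.out.println(i + " " + s);
        // Manual form, matching readInt() in the deleted reader:
        int manual = (raw[3] & 255) + ((raw[2] & 255) << 8)
            + ((raw[1] & 255) << 16) + (raw[0] << 24);
        assert manual == i;
      }
    }
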

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
deleted file mode 100644
index a0328b3..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.MutationType;
-
-/**
- * stream blocklet writer
- */
-public class StreamBlockletWriter {
-  private byte[] buffer;
-  private int maxSize;
-  private int maxRowNum;
-  private int rowSize;
-  private int count = 0;
-  private int rowIndex = -1;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
-
-  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
-    buffer = new byte[maxSize];
-    this.maxSize = maxSize;
-    this.maxRowNum = maxRowNum;
-    this.rowSize = rowSize;
-  }
-
-  private void ensureCapacity(int space) {
-    int newcount = space + count;
-    if (newcount > buffer.length) {
-      byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
-      System.arraycopy(buffer, 0, newbuf, 0, count);
-      buffer = newbuf;
-    }
-  }
-
-  void reset() {
-    count = 0;
-    rowIndex = -1;
-  }
-
-  byte[] getBytes() {
-    return buffer;
-  }
-
-  int getCount() {
-    return count;
-  }
-
-  int getRowIndex() {
-    return rowIndex;
-  }
-
-  void nextRow() {
-    rowIndex++;
-  }
-
-  boolean isFull() {
-    return rowIndex == maxRowNum || count >= maxSize;
-  }
-
-  void writeBoolean(boolean val) {
-    ensureCapacity(1);
-    buffer[count] = (byte) (val ? 1 : 0);
-    count += 1;
-  }
-
-  void writeShort(int val) {
-    ensureCapacity(2);
-    buffer[count + 1] = (byte) (val);
-    buffer[count] = (byte) (val >>> 8);
-    count += 2;
-  }
-
-  void writeInt(int val) {
-    ensureCapacity(4);
-    buffer[count + 3] = (byte) (val);
-    buffer[count + 2] = (byte) (val >>> 8);
-    buffer[count + 1] = (byte) (val >>> 16);
-    buffer[count] = (byte) (val >>> 24);
-    count += 4;
-  }
-
-  void writeLong(long val) {
-    ensureCapacity(8);
-    buffer[count + 7] = (byte) (val);
-    buffer[count + 6] = (byte) (val >>> 8);
-    buffer[count + 5] = (byte) (val >>> 16);
-    buffer[count + 4] = (byte) (val >>> 24);
-    buffer[count + 3] = (byte) (val >>> 32);
-    buffer[count + 2] = (byte) (val >>> 40);
-    buffer[count + 1] = (byte) (val >>> 48);
-    buffer[count] = (byte) (val >>> 56);
-    count += 8;
-  }
-
-  void writeDouble(double val) {
-    writeLong(Double.doubleToLongBits(val));
-  }
-
-  void writeBytes(byte[] b) {
-    writeBytes(b, 0, b.length);
-  }
-
-  void writeBytes(byte[] b, int off, int len) {
-    ensureCapacity(len);
-    System.arraycopy(b, off, buffer, count, len);
-    count += len;
-  }
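
Note that ensureCapacity above grows the buffer by at least one rowSize per expansion, which suits the bounded, row-at-a-time appends here. A common alternative (sketched below; not the CarbonData code) is doubling, which keeps appends amortized O(1) at the cost of more slack space:

    import java.util.Arrays;

    // Sketch of an alternative growth policy: double the buffer on overflow
    // instead of extending by a fixed rowSize increment.
    final class GrowableBufferSketch {
      private byte[] buf = new byte[64];
      private int count;

      void write(byte[] src) {
        if (count + src.length > buf.length) {
          buf = Arrays.copyOf(buf, Math.max(count + src.length, buf.length * 2));
        }
        System.arraycopy(src, 0, buf, count, src.length);
        count += src.length;
      }
    }
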
-
-  void apppendBlocklet(DataOutputStream outputStream) throws IOException {
-    outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
-
-    BlockletInfo blockletInfo = new BlockletInfo();
-    blockletInfo.setNum_rows(getRowIndex() + 1);
-    BlockletHeader blockletHeader = new BlockletHeader();
-    blockletHeader.setBlocklet_length(getCount());
-    blockletHeader.setMutation(MutationType.INSERT);
-    blockletHeader.setBlocklet_info(blockletInfo);
-    byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
-    outputStream.writeInt(headerBytes.length);
-    outputStream.write(headerBytes);
-
-    byte[] compressed = compressor.compressByte(getBytes(), getCount());
-    outputStream.writeInt(compressed.length);
-    outputStream.write(compressed);
-  }
-
-  void close() {
-  }
-}
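
apppendBlocklet (the extra "p" is in the original identifier) frames each blocklet on disk as [sync marker][header length][Thrift header][payload length][compressed payload]; the matching read side is readBlockletHeader/readBlockletData in StreamBlockletReader. A minimal round-trip of that framing (hypothetical sketch; Thrift serialization and compression elided, sync value is a stand-in):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical round-trip of the blocklet framing used above:
    // [SYNC][hdrLen][hdr][dataLen][data], lengths as big-endian ints.
    public final class BlockletFramingSketch {
      static final byte[] SYNC = "carbonstreamsync".getBytes(); // 16-byte stand-in

      public static void main(String[] args) throws IOException {
        byte[] header = {1, 2, 3};
        byte[] data = {9, 8, 7, 6};

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.write(SYNC);
        out.writeInt(header.length);
        out.write(header);
        out.writeInt(data.length);
        out.write(data);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        in.skipBytes(SYNC.length);                 // a real reader scans for SYNC
        byte[] hdr = new byte[in.readInt()];
        in.readFully(hdr);
        byte[] body = new byte[in.readInt()];
        in.readFully(body);
        System.out.println(hdr.length + " header bytes, " + body.length + " data bytes");
      }
    }
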
