This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 9929f3a [CARBONDATA-3446] Support read schema of complex data type from carbon file folder path 9929f3a is described below commit 9929f3a30e6f544b8696e780fb448179abcdc6b0 Author: xubo245 <xub...@huawei.com> AuthorDate: Thu Jun 20 12:08:11 2019 +0800 [CARBONDATA-3446] Support read schema of complex data type from carbon file folder path Background: SDK can't read schema of complex data type from carbon file folder path Support: Support read schema of complex data type from carbon file folder path This closes #3301 --- .../core/metadata/datatype/ArrayType.java | 12 ++ .../core/metadata/datatype/DataTypes.java | 7 + .../carbondata/core/metadata/datatype/Field.java | 3 +- .../metadata/schema/table/TableSchemaBuilder.java | 13 +- .../carbondata/sdk/file/CarbonSchemaReader.java | 100 +++++++++-- .../carbondata/sdk/file/CarbonWriterBuilder.java | 5 +- .../apache/carbondata/sdk/file/utils/SDKUtil.java | 4 + .../carbondata/sdk/file/CSVCarbonWriterTest.java | 194 +++++++++++++++++++++ 8 files changed, 322 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java index 9dea241..9ff0a42 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java @@ -21,11 +21,19 @@ public class ArrayType extends DataType { private DataType elementType; + private String elementName; + ArrayType(DataType elementType) { super(DataTypes.ARRAY_TYPE_ID, 9, "ARRAY", -1); this.elementType = elementType; } + public ArrayType(DataType elementType, String elementName) { + super(DataTypes.ARRAY_TYPE_ID, 9, "ARRAY", -1); + this.elementType = elementType; + this.elementName = elementName; + } + @Override public boolean isComplexType() { return true; @@ -65,4 +73,8 @@ public 
class ArrayType extends DataType { return elementType; } + public String getElementName() { + return elementName; + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java index e79d0dc..bfce7ae 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java @@ -142,6 +142,13 @@ public class DataTypes { } /** + * create array type with specified element type and name + */ + public static ArrayType createArrayType(DataType elementType, String elementName) { + return new ArrayType(elementType, elementName); + } + + /** * create a array type object with no child */ public static ArrayType createDefaultArrayType() { diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/Field.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/Field.java index 6b85a04..7eea083 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/Field.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/Field.java @@ -133,7 +133,8 @@ public class Field { } else if (type.equalsIgnoreCase("binary")) { this.type = DataTypes.BINARY; } else if (type.equalsIgnoreCase("array")) { - this.type = DataTypes.createArrayType(fields.get(0).getDataType()); + this.type = DataTypes.createArrayType(fields.get(0).getDataType(), + fields.get(0).getFieldName()); } else if (type.equalsIgnoreCase("struct")) { this.type = DataTypes.createStructType(fields); } else { diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 14fc67b..85c7674 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.ArrayType; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalType; @@ -240,14 +241,20 @@ public class TableSchemaBuilder { String parentFieldName = newColumn.getColumnName(); if (DataTypes.isArrayType(field.getDataType())) { for (StructField structField : field.getChildren()) { - structField.setFieldName(getColNameForArray(valIndex)); + String colName = getColNameForArray(valIndex); + if (null != ((ArrayType) field.getDataType()).getElementName()) { + colName = ((ArrayType) field.getDataType()).getElementName(); + } + structField.setFieldName(colName); addColumn(structField, parentFieldName, valIndex, false, true, isInvertedIdxColumn); } } else if (DataTypes.isStructType(field.getDataType()) && ((StructType) field.getDataType()).getFields().size() > 0) { // This field has children. 
- for (StructField structField : field.getChildren()) { - addColumn(structField, parentFieldName, valIndex, false, true, isInvertedIdxColumn); + if (field.getChildren() != null) { + for (StructField structField : field.getChildren()) { + addColumn(structField, parentFieldName, valIndex, false, true, isInvertedIdxColumn); + } } } else if (DataTypes.isMapType(field.getDataType())) { for (StructField structField : field.getChildren()) { diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java index c692f6e..d9c5486 100644 --- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java +++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; @@ -30,7 +31,12 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.Field; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.reader.CarbonFooterReaderV3; import 
org.apache.carbondata.core.reader.CarbonHeaderReader; @@ -44,7 +50,6 @@ import org.apache.carbondata.sdk.file.arrow.ArrowConverter; import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema; import static org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT; import static org.apache.carbondata.core.util.path.CarbonTablePath.INDEX_FILE_EXT; -import static org.apache.carbondata.core.util.path.CarbonTablePath.MERGE_INDEX_FILE_EXT; import org.apache.hadoop.conf.Configuration; @@ -194,15 +199,7 @@ public class CarbonSchemaReader { throw new CarbonDataLoadingException("No carbonindex file in this path."); } } else { - String indexFilePath; - indexFilePath = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith(INDEX_FILE_EXT) || file.getName() - .endsWith(MERGE_INDEX_FILE_EXT); - } - })[0].getAbsolutePath(); - return readSchemaFromIndexFile(indexFilePath, conf); + return readSchemaFromFolder(path, conf); } } @@ -283,6 +280,89 @@ public class CarbonSchemaReader { return readSchema(indexFilePath, false); } + public static List<StructField> getChildrenCommon(CarbonTable table, String columnName) { + List<CarbonDimension> list = table.getChildren(columnName); + List<StructField> structFields = new ArrayList<StructField>(); + for (int i = 0; i < list.size(); i++) { + CarbonDimension carbonDimension = list.get(i); + if (DataTypes.isStructType(carbonDimension.getDataType())) { + structFields.add(getStructChildren(table, carbonDimension.getColName())); + return structFields; + } else if (DataTypes.isArrayType(carbonDimension.getDataType())) { + structFields.add(getArrayChildren(table, carbonDimension.getColName())); + return structFields; + } else if (DataTypes.isMapType(carbonDimension.getDataType())) { + //TODO + } else { + ColumnSchema columnSchema = carbonDimension.getColumnSchema(); + structFields.add(new 
StructField(columnSchema.getColumnName(), columnSchema.getDataType())); + } + } + return structFields; + } + + public static StructField getStructChildren(CarbonTable table, String columnName) { + List<StructField> structFields = getChildrenCommon(table, columnName); + return new StructField(columnName, DataTypes.createStructType(structFields)); + } + + public static StructField getArrayChildren(CarbonTable table, String columnName) { + List<StructField> structFields = getChildrenCommon(table, columnName); + return structFields.get(0); + } + + /** + * Read schema from carbon file folder path + * + * @param folderPath carbon file folder path + * @param conf hadoop configuration support, can set s3a AK,SK, + * end point and other conf with this + * @return carbon data Schema + * @throws IOException + */ + private static Schema readSchemaFromFolder(String folderPath, Configuration conf) + throws IOException { + String tableName = "UnknownTable" + UUID.randomUUID(); + CarbonTable table = CarbonTable.buildTable(folderPath, tableName, conf); + List<ColumnSchema> columnSchemaList = table.getTableInfo().getFactTable().getListOfColumns(); + int numOfChildren = 0; + for (ColumnSchema columnSchema : columnSchemaList) { + if (!(columnSchema.getColumnName().contains(CarbonCommonConstants.POINT))) { + numOfChildren++; + } + } + Field[] fields = new Field[numOfChildren]; + + int indexOfFields = 0; + for (ColumnSchema columnSchema : columnSchemaList) { + if (!columnSchema.getColumnName().contains(CarbonCommonConstants.POINT)) { + if (DataTypes.isStructType(columnSchema.getDataType())) { + StructField structField = getStructChildren(table, columnSchema.getColumnName()); + List<StructField> list = new ArrayList<>(); + list.add(structField); + fields[indexOfFields] = new Field(columnSchema.getColumnName(), + DataTypes.createStructType(list)); + fields[indexOfFields].setSchemaOrdinal(columnSchema.getSchemaOrdinal()); + indexOfFields++; + } else if 
(DataTypes.isArrayType(columnSchema.getDataType())) { + StructField structField = getArrayChildren(table, columnSchema.getColumnName()); + List<StructField> list = new ArrayList<>(); + list.add(structField); + fields[indexOfFields] = new Field(columnSchema.getColumnName(), "array", list); + fields[indexOfFields].setSchemaOrdinal(columnSchema.getSchemaOrdinal()); + indexOfFields++; + } else if (DataTypes.isMapType(columnSchema.getDataType())) { + //TODO + } else { + fields[indexOfFields] = new Field(columnSchema); + fields[indexOfFields].setSchemaOrdinal(columnSchema.getSchemaOrdinal()); + indexOfFields++; + } + } + } + return new Schema(fields); + } + /** * Read schema from carbonindex file * diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index c3abc2e..31fcdd1 100644 --- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -854,8 +854,9 @@ public class CarbonWriterBuilder { if (field.getChildren() != null && field.getChildren().size() > 0) { if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { // Loop through the inner columns and for a StructData - DataType complexType = - DataTypes.createArrayType(field.getChildren().get(0).getDataType()); + DataType complexType = DataTypes + .createArrayType(field.getChildren().get(0).getDataType(), + field.getChildren().get(0).getFieldName()); tableSchemaBuilder .addColumn(new StructField(field.getFieldName(), complexType, field.getChildren()), valIndex, false, isInvertedIdxColumn > -1); diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java index 00a42f0..7f9fe88 100644 --- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java +++ 
b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java @@ -75,4 +75,8 @@ public class SDKUtil { return getSplitList(path, suf, numOfSplit, new Configuration()); } + public static Object[] readObjects(Object[] input, int i) { + return (Object[]) input[i]; + } + } diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 98b958c..b90d5fe 100644 --- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -32,10 +32,12 @@ import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.ArrayType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalType; import org.apache.carbondata.core.metadata.datatype.Field; import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; @@ -50,6 +52,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.apache.carbondata.sdk.file.utils.SDKUtil.readObjects; + /** * Test suite for {@link CSVCarbonWriter} */ @@ -676,4 +680,194 @@ public class CSVCarbonWriterTest { } } + @Test + public void testWritingAndReadingArrayString() throws IOException { + String path = "./testWriteFilesArrayString"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[4]; + 
fields[0] = new Field("id", DataTypes.STRING); + fields[1] = new Field("source", DataTypes.STRING); + fields[2] = new Field("usage", DataTypes.STRING); + + StructField[] stringFields = new StructField[1]; + stringFields[0] = new StructField("stringField", DataTypes.STRING); + + Field arrayType = new Field("annotations", "array", Arrays.asList(stringFields)); + fields[3] = arrayType; + try { + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build(); + for (int i = 0; i < 15; i++) { + String[] row = new String[]{ + "robot" + (i % 10), + String.valueOf(i), + i + "." + i, + "sunflowers" + (i % 10) + "\002" + "modelarts/image_classification" + "\002" + "2019-03-30 17:22:31" + "\002" + "{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}" + + "\001" + + "roses" + (i % 10) + "\002" + "modelarts/image_classification" + "\002" + "2019-03-30 17:22:32" + "\002" + "{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}"}; + writer.write(row); + } + writer.close(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + + Schema schema = CarbonSchemaReader + .readSchema(path) + .asOriginOrder(); + + assert (4 == schema.getFieldsLength()); + Field[] fields1 = schema.getFields(); + boolean flag = false; + for (int i = 0; i < fields1.length; i++) { + if (DataTypes.isArrayType(fields1[i].getDataType())) { + ArrayType arrayType1 = (ArrayType) fields1[i].getDataType(); + assert ("annotations.stringField" .equalsIgnoreCase(arrayType1.getElementName())); + assert (DataTypes.STRING.equals(fields1[i].getChildren().get(0).getDataType())); + flag = true; + } + } + assert (flag); + + // Read again + CarbonReader reader = null; + try { + reader = CarbonReader + .builder(path) + .projection(new String[]{"id", "source", "usage", "annotations"}) + .build(); + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) 
reader.readNextRow(); + assert (4 == row.length); + assert (((String) row[0]).contains("robot")); + int value = Integer.valueOf((String) row[1]); + Float value2 = Float.valueOf((String) row[2]); + assert (value > -1 || value < 15); + assert (value2 > -1 || value2 < 15); + Object[] annotations = (Object[]) row[3]; + for (int j = 0; j < annotations.length; j++) { + assert (((String) annotations[j]).contains("\u0002modelarts/image_classification\u00022019-03-30 17:22:31\u0002{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}") + || ((String) annotations[j]).contains("\u0002modelarts/image_classification\u00022019-03-30 17:22:32\u0002{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}")); + } + i++; + } + assert (15 == i); + reader.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testWritingAndReadingArrayStruct() throws IOException { + String path = "./testWriteFilesArrayStruct"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[4]; + fields[0] = new Field("id", DataTypes.STRING); + fields[1] = new Field("source", DataTypes.STRING); + fields[2] = new Field("usage", DataTypes.STRING); + + List<StructField> structFieldsList = new ArrayList<>(); + structFieldsList.add(new StructField("name", DataTypes.STRING)); + structFieldsList.add(new StructField("type", DataTypes.STRING)); + structFieldsList.add(new StructField("creation-time", DataTypes.STRING)); + structFieldsList.add(new StructField("property", DataTypes.STRING)); + StructField structTypeByList = + new StructField("annotation", DataTypes.createStructType(structFieldsList), structFieldsList); + + List<StructField> list = new ArrayList<>(); + list.add(structTypeByList); + + Field arrayType = new Field("annotations", "array", list); + fields[3] = arrayType; + try { + CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path); + CarbonWriter writer = 
builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build(); + for (int i = 0; i < 15; i++) { + String[] row = new String[]{ + "robot" + (i % 10), + String.valueOf(i), + i + "." + i, + "sunflowers" + (i % 10) + "\002" + "modelarts/image_classification" + "\002" + "2019-03-30 17:22:31" + "\002" + "{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}" + + "\001" + + "roses" + (i % 10) + "\002" + "modelarts/image_classification" + "\002" + "2019-03-30 17:22:32" + "\002" + "{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}"}; + writer.write(row); + } + writer.close(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + + Schema schema = CarbonSchemaReader + .readSchema(path) + .asOriginOrder(); + + assert (4 == schema.getFieldsLength()); + Field[] fields1 = schema.getFields(); + boolean flag = false; + for (int i = 0; i < fields1.length; i++) { + if (DataTypes.isArrayType(fields1[i].getDataType())) { + ArrayType arrayType1 = (ArrayType) fields1[i].getDataType(); + assert ("annotations.annotation" .equalsIgnoreCase(arrayType1.getElementName())); + assert (DataTypes.isStructType(fields1[i].getChildren().get(0).getDataType())); + assert (4 == (((StructType) fields1[i].getChildren().get(0).getDataType()).getFields()).size()); + flag = true; + } + } + assert (flag); + + // Read again + CarbonReader reader = null; + try { + reader = CarbonReader + .builder(path) + .projection(new String[]{"id", "source", "usage", "annotations"}) + .build(); + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + assert (4 == row.length); + assert (((String) row[0]).contains("robot")); + int value = Integer.valueOf((String) row[1]); + Float value2 = Float.valueOf((String) row[2]); + assert (value > -1 || value < 15); + assert (value2 > -1 || value2 < 15); + Object[] annotations = (Object[]) row[3]; + for (int j = 0; j < annotations.length; j++) { + Object[] annotation = (Object[]) annotations[j]; + 
assert (((String) annotation[0]).contains("sunflowers") + || ((String) annotation[0]).contains("roses")); + + assert (((String) annotation[1]).contains("modelarts/image_classification")); + assert (((String) annotation[2]).contains("2019-03-30 17:22:3")); + assert (((String) annotation[3]).contains("{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}")); + + Object[] annotation1 = readObjects(annotations, j); + assert (((String) annotation1[0]).contains("sunflowers") + || ((String) annotation1[0]).contains("roses")); + + assert (((String) annotation1[1]).contains("modelarts/image_classification")); + assert (((String) annotation1[2]).contains("2019-03-30 17:22:3")); + assert (((String) annotation1[3]).contains("{\"@modelarts:start_index\":0,\"@modelarts:end_index\":5}")); + } + i++; + } + assert (15 == i); + reader.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + }