Repository: carbondata Updated Branches: refs/heads/master 7edef8f4a -> 3202cf517
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java index 72a3ce4..677047b 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java @@ -17,10 +17,13 @@ package org.apache.carbondata.sdk.file; +import java.util.List; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; /** * A field represent one column @@ -31,6 +34,14 @@ public class Field { private String name; private DataType type; + private List<StructField> children; + private String parent; + private String storeType = "columnnar"; + private int schemaOrdinal = -1; + private int precision = 0; + private int scale = 0; + private String rawSchema = ""; + private String columnComment = ""; /** * Field Constructor @@ -59,16 +70,55 @@ public class Field { this.type = DataTypes.FLOAT; } else if (type.equalsIgnoreCase("double")) { this.type = DataTypes.DOUBLE; - } else { + } else if (type.equalsIgnoreCase("array")) { + this.type = DataTypes.createDefaultArrayType(); + } else if (type.equalsIgnoreCase("struct")) { + this.type = DataTypes.createDefaultStructType(); + } + else { throw new IllegalArgumentException("unsupported data type: " + type); } } - /** - * Field constructor - * @param name name of the field - * @param type datatype of the field of class DataType - */ + public Field(String name, String type, List<StructField> fields) { + this.name = name; + this.children = fields; + if (type.equalsIgnoreCase("string")) { + this.type = DataTypes.STRING; + } else if (type.equalsIgnoreCase("date")) { + this.type = DataTypes.DATE; + } else if (type.equalsIgnoreCase("timestamp")) { + this.type = DataTypes.TIMESTAMP; + } else if (type.equalsIgnoreCase("boolean")) { + this.type = DataTypes.BOOLEAN; + } else if (type.equalsIgnoreCase("byte")) { + this.type = DataTypes.BYTE; + } else if (type.equalsIgnoreCase("short")) { + this.type = DataTypes.SHORT; + } else if (type.equalsIgnoreCase("int")) { + this.type = DataTypes.INT; + } else if (type.equalsIgnoreCase("long")) { + this.type = DataTypes.LONG; + } else if (type.equalsIgnoreCase("float")) { + this.type = DataTypes.FLOAT; + } else if (type.equalsIgnoreCase("double")) { + this.type = DataTypes.DOUBLE; + } else if (type.equalsIgnoreCase("array")) { + this.type = DataTypes.createStructType(fields); + } else if (type.equalsIgnoreCase("struct")) { + this.type = DataTypes.createStructType(fields); + } + else { + throw new IllegalArgumentException("unsupported data type: " + type); + } + } + + public Field(String name, DataType type, List<StructField> fields) { + this.name = name; + this.type = type; + this.children = fields; + } + public Field(String name, DataType type) { this.name = name; this.type = type; @@ -81,4 +131,64 @@ public class Field { public DataType getDataType() { return type; } + + public List<StructField> getChildren() { + return children; + } + + public void setChildren(List<StructField> children) { + this.children = children; + } + + public String getParent() { + return parent; + } + + public void setParent(String parent) { + this.parent = parent; + } + + public String getStoreType() { + return storeType; + } + + public int getSchemaOrdinal() { + return schemaOrdinal; + } + + public void setSchemaOrdinal(int schemaOrdinal) { + this.schemaOrdinal = schemaOrdinal; + } + + public int getPrecision() { + return precision; + } + + public void setPrecision(int precision) { + this.precision = precision; + } + + public int getScale() { + return scale; + } + + public void setScale(int scale) { + this.scale = scale; + } + + public String getRawSchema() { + return rawSchema; + } + + public void setRawSchema(String rawSchema) { + this.rawSchema = rawSchema; + } + + public String getColumnComment() { + return columnComment; + } + + public void setColumnComment(String columnComment) { + this.columnComment = columnComment; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index f85f7d5..ed3f2f1 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -20,20 +20,28 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.CharEncoding; +import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; import org.apache.avro.Schema; +import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType.file; + public class AvroCarbonWriterTest { private String path = "./AvroCarbonWriterSuiteWriteFiles"; @@ -177,7 +185,104 @@ public class AvroCarbonWriterTest { @Test public void testWriteNestedRecord() throws IOException { - // TODO + FileUtils.deleteDirectory(new File(path)); + + String newAvroSchema = + "{" + + " \"type\" : \"record\", " + + " \"name\" : \"userInfo\", " + + " \"namespace\" : \"my.example\", " + + " \"fields\" : [{\"name\" : \"username\", " + + " \"type\" : \"string\", " + + " \"default\" : \"NONE\"}, " + + + " {\"name\" : \"age\", " + + " \"type\" : \"int\", " + + " \"default\" : -1}, " + + + "{\"name\" : \"address\", " + + " \"type\" : { " + + " \"type\" : \"record\", " + + " \"name\" : \"mailing_address\", " + + " \"fields\" : [ {" + + " \"name\" : \"street\", " + + " \"type\" : \"string\", " + + " \"default\" : \"NONE\"}, { " + + + " \"name\" : \"city\", " + + " \"type\" : \"string\", " + + " \"default\" : \"NONE\"}, " + + " ]}, " + + " \"default\" : {} " + + " } " + +"}"; + + String mySchema = + "{" + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { \"name\": \"name\", \"type\": \"string\"}, " + + " { \"name\": \"age\", \"type\": \"int\"}, " + + " { " + + " \"name\": \"address\", " + + " \"type\": { " + + " \"type\" : \"record\", " + + " \"name\" : \"my_address\", " + + " \"fields\" : [ " + + " {\"name\": \"street\", \"type\": \"string\"}, " + + " {\"name\": \"city\", \"type\": \"string\"} " + + " ]} " + + " } " + + "] " + + "}"; + + String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}"; + + + // conversion to GenericData.Record + Schema nn = new Schema.Parser().parse(mySchema); + JsonAvroConverter converter = new JsonAvroConverter(); + GenericData.Record record = converter.convertToGenericDataRecord( + json.getBytes(CharEncoding.UTF_8), nn); + + Field[] fields = new Field[3]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("name1", DataTypes.STRING); + // fields[1] = new Field("age", DataTypes.INT); + List fld = new ArrayList<StructField>(); + fld.add(new StructField("street", DataTypes.STRING)); + fld.add(new StructField("city", DataTypes.STRING)); + fields[2] = new Field("address", "struct", fld); + + try { + CarbonWriter writer = CarbonWriter.builder() + .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .outputPath(path) + .isTransactionalTable(true) + .buildWriterForAvroInput(); + + for (int i = 0; i < 100; i++) { + writer.write(record); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(1, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java index 66d89c8..266fabd 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java @@ -95,6 +95,9 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { } else { boolean isDirectDictionary = CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY); + boolean isDictionary = + CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY); + String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties() .get(CarbonCommonConstants.DICTIONARY_PATH); DictionaryColumnUniqueIdentifier dictionarIdentifier = http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java index d5f77f4..cbf93b8 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java @@ -535,7 +535,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { } if (isProjectionRequired[colCount]) { outputValues[projectionMap[colCount]] = queryTypes[colCount] - .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b)); + .getDataBasedOnDataType(ByteBuffer.wrap(b)); } } else { input.skipBytes(v);
