Repository: carbondata Updated Branches: refs/heads/master b04269b2b -> edfcdca0a
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala index 8b0eca8..b9185aa 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala @@ -51,10 +51,7 @@ object TestUtil { .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "40") def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]):Unit = { - checkAnswer(df, expectedAnswer.asScala) match { - case Some(errorMessage) => assert(false, errorMessage) - case None => - } + checkAnswer(df, expectedAnswer.asScala) } def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) { @@ -69,10 +66,7 @@ object TestUtil { } def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { - checkAnswer(df, expectedAnswer.collect()) match { - case Some(errorMessage) => assert(false, errorMessage) - case None => - } + checkAnswer(df, expectedAnswer.collect()) } /** @@ -83,7 +77,7 @@ object TestUtil { * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. */ - def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { + def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = { val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty def prepareAnswer(answer: Seq[Row]): Seq[Row] = { // Converts data to types that we can do equality comparison using Scala collections. @@ -136,10 +130,8 @@ object TestUtil { prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n") } """.stripMargin - return Some(errorMessage) + assert(false, errorMessage) } - - return None } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index cd189d2..f335509 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -68,7 +68,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { case IntegerType => CarbonType.INT.getName case ShortType => CarbonType.SHORT.getName case LongType => CarbonType.LONG.getName - case FloatType => CarbonType.DOUBLE.getName + case FloatType => CarbonType.FLOAT.getName case DoubleType => CarbonType.DOUBLE.getName case TimestampType => CarbonType.TIMESTAMP.getName case DateType => CarbonType.DATE.getName http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java index 99b3779..1262fde 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -388,6 +388,10 @@ public class SortStepRowHandler implements Serializable { tmpContent = rowBuffer.getLong(); } else if (DataTypes.DOUBLE == tmpDataType) { tmpContent = rowBuffer.getDouble(); + } else if (DataTypes.FLOAT == tmpDataType) { + tmpContent = rowBuffer.getFloat(); + } else if (DataTypes.BYTE == tmpDataType) { + tmpContent = rowBuffer.get(); } else if (DataTypes.isDecimal(tmpDataType)) { short len = rowBuffer.getShort(); byte[] decimalBytes = new byte[len]; @@ -829,6 +833,10 @@ public class SortStepRowHandler implements Serializable { reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue); } else if (DataTypes.DOUBLE == tmpDataType) { reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue); + } else if (DataTypes.FLOAT == tmpDataType) { + reUsableByteArrayDataOutputStream.writeFloat((Float) tmpValue); + } else if (DataTypes.BYTE == tmpDataType) { + reUsableByteArrayDataOutputStream.write((byte) tmpValue); } else if (DataTypes.isDecimal(tmpDataType)) { byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue); reUsableByteArrayDataOutputStream.writeShort((short) decimalBytes.length); http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 02980a7..ab1e154 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -303,7 +303,7 @@ public class AvroCarbonWriter extends CarbonWriter { case FLOAT: // direct conversion will change precision. So parse from string. // also carbon internally needs float as double - out = Double.parseDouble(fieldValue.toString()); + out = Float.parseFloat(fieldValue.toString()); break; case NULL: out = null; @@ -490,7 +490,7 @@ public class AvroCarbonWriter extends CarbonWriter { case STRING: return new Field(fieldName, DataTypes.STRING); case FLOAT: - return new Field(fieldName, DataTypes.DOUBLE); + return new Field(fieldName, DataTypes.FLOAT); case MAP: // recursively get the sub fields ArrayList<StructField> mapSubFields = new ArrayList<>(); @@ -592,7 +592,7 @@ public class AvroCarbonWriter extends CarbonWriter { case STRING: return new StructField(fieldName, DataTypes.STRING); case FLOAT: - return new StructField(fieldName, DataTypes.DOUBLE); + return new StructField(fieldName, DataTypes.FLOAT); case MAP: // recursively get the sub fields ArrayList<StructField> keyValueFields = new ArrayList<>(); @@ -697,7 +697,7 @@ public class AvroCarbonWriter extends CarbonWriter { case STRING: return DataTypes.STRING; case FLOAT: - return DataTypes.DOUBLE; + return DataTypes.FLOAT; case MAP: // recursively get the sub fields StructField mapField = prepareSubFields("val", childSchema); http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index 8f53125..abaa7f1 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -21,13 +21,19 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -488,4 +494,40 @@ public class AvroCarbonWriterTest { FileUtils.deleteDirectory(new File(path)); } + @Test + public void testWriteBasicForFloat() throws IOException { + FileUtils.deleteDirectory(new File(path)); + + // Avro schema + String avroSchema = + "{" + " \"type\" : \"record\"," + " \"name\" : \"Acme\"," + " \"fields\" : [" + + "{ \"name\" : \"name\", \"type\" : \"string\" }," + + "{ \"name\" : \"age\", \"type\" : \"int\" }," + "{ \"name\" : \"salary\", \"type\" " + + ": \"float\" }]" + "}"; + + String json = "{\"name\":\"bob\", \"age\":10, \"salary\":10.100}"; + + // conversion to GenericData.Record + GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema); + try { + CarbonWriter writer = CarbonWriter.builder().outputPath(path).isTransactionalTable(true) + .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration); + + for (int i = 0; i < 100; i++) { + writer.write(record); + } + writer.close(); + TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", + ""), true, TestUtil.configuration); + List<String> dataTypes = new ArrayList<>(); + for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { + dataTypes.add(columnSchema.getDataType().toString()); + } + assert(dataTypes.contains("FLOAT")); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 3d59724..1c0bfea 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -21,6 +21,8 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -29,8 +31,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.SchemaReader; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.reader.CarbonFooterReaderV3; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -356,4 +363,178 @@ public class CSVCarbonWriterTest { FileUtils.deleteDirectory(new File(path)); } + @Test + public void testFloatDataType() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[3]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("floatField", DataTypes.FLOAT); + fields[2] = new Field("doubleField", DataTypes.DOUBLE); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + for (int i = 0; i < 15; i++) { + String[] row = new String[] { "robot" + (i % 10), String.valueOf(i + "." + i), + String.valueOf(i + "." + i) }; + writer.write(row); + } + writer.close(); + TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", + ""), true, TestUtil.configuration); + List<String> dataTypes = new ArrayList<>(); + for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { + dataTypes.add(columnSchema.getDataType().toString()); + } + assert(dataTypes.contains("STRING")); + assert(dataTypes.contains("DOUBLE")); + assert(dataTypes.contains("FLOAT")); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testByteDataType() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("byteField", DataTypes.BYTE); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + for (int i = 0; i < 15; i++) { + String[] row = new String[] { "robot" + (i % 10), "" + i }; + writer.write(row); + } + writer.close(); + TableInfo tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "", + ""), true, TestUtil.configuration); + List<String> dataTypes = new ArrayList<>(); + for(ColumnSchema columnSchema: tableInfo.getFactTable().getListOfColumns()) { + dataTypes.add(columnSchema.getDataType().toString()); + } + assert(dataTypes.contains("STRING")); + assert(dataTypes.contains("BYTE")); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testReadingOfByteAndFloatWithCarbonReader() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[3]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("byteField", DataTypes.BYTE); + fields[2] = new Field("floatField", DataTypes.FLOAT); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration); + for (int i = 0; i < 15; i++) { + String[] row = new String[] { "robot" + (i % 10), "" + i, i + "." + i }; + writer.write(row); + } + writer.close(); + CarbonReader carbonReader = + new CarbonReaderBuilder(path, "table1").build(TestUtil.configuration); + for (int i = 0; i < 15; i++) { + Object[] actualRow = (Object[]) carbonReader.readNextRow(); + String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + "." + i }; + for (int j = 0; j < 3; j++) { + actualRow[j].toString().equalsIgnoreCase(expectedRow[j]); + } + } + carbonReader.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testWritingAndReadingStructOfFloat() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + StructField[] fields = new StructField[3]; + fields[0] = new StructField("stringField", DataTypes.STRING); + fields[1] = new StructField("byteField", DataTypes.BYTE); + fields[2] = new StructField("floatField", DataTypes.FLOAT); + + Field structType = new Field("structField", "struct", Arrays.asList(fields)); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new Field[] {structType}), + TestUtil.configuration); + for (int i = 0; i < 15; i++) { + String[] row = new String[] { "robot" + (i % 10)+"$" + i+ "$" + i + "." + i }; + writer.write(row); + } + writer.close(); + //TODO: CarbonReader has a bug which does not allow reading complex. Once it is fixed below validation can be enabled +// CarbonReader carbonReader = +// new CarbonReaderBuilder(path, "table121").projection(new String[]{"structfield"}).build(TestUtil.configuration); +// for (int i = 0; i < 15; i++) { +// Object[] actualRow = (Object[])(carbonReader.readNextRow()); +// String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + "." + i }; +// for (int j = 0; j < 3; j++) { +// ((Object[])actualRow[0])[j].toString().equalsIgnoreCase(expectedRow[j]); +// } +// } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testWritingAndReadingArrayOfFloatAndByte() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + StructField[] fields = new StructField[1]; + fields[0] = new StructField("floatField", DataTypes.FLOAT); + + Field structType1 = new Field("floatarray", "array", Arrays.asList(fields)); + StructField[] fields2 = new StructField[1]; + fields2[0] = new StructField("byteField", DataTypes.BYTE); + Field structType2 = new Field("bytearray", "array", Arrays.asList(fields2)); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path); + CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new Field[] {structType1, structType2}), + TestUtil.configuration); + for (int i = 0; i < 15; i++) { + String[] row = new String[] { "1.0$2.0$3.0", "1$2$3" }; + writer.write(row); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + FileUtils.deleteDirectory(new File(path)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java index 4cb816b..5c61119 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java @@ -78,7 +78,7 @@ public class ConcurrentAvroSdkWriterTest { CarbonReader reader; try { reader = - CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(new Configuration(false)); + CarbonReader.builder(path, "_temp2122").projection(new String[] { "name", "age" }).build(new Configuration(false)); int i = 0; while (reader.hasNext()) { Object[] row = (Object[]) reader.readNextRow(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java index 9bb3f29..cdb8b45 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java @@ -71,7 +71,7 @@ public class ConcurrentSdkWriterTest { CarbonReader reader; try { reader = CarbonReader - .builder(path, "_temp") + .builder(path, "_temp1121") .projection(new String[]{"name", "age"}) .build(new Configuration(false)); int i = 0;
