Repository: kudu Updated Branches: refs/heads/master 19c47e659 -> 50931e291
KUDU-721: [Flume] Add DECIMAL type support Adds decimal column support to the Flume KuduSink including the Regex and Avro operations producers. Note: Sets enableDecimalLogicalType to true in the Maven Avro plugin for code generation because the default is false. Change-Id: Ibc02d683dd1bce2cc0de255e4d072436b6c0163a Reviewed-on: http://gerrit.cloudera.org:8080/9365 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0798037c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0798037c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0798037c Branch: refs/heads/master Commit: 0798037c32a3b40134e1f45cd6d458df6b3255da Parents: 19c47e6 Author: Grant Henke <[email protected]> Authored: Tue Feb 20 10:49:24 2018 -0600 Committer: Mike Percy <[email protected]> Committed: Wed Feb 21 21:14:48 2018 +0000 ---------------------------------------------------------------------- java/kudu-flume-sink/pom.xml | 3 ++ .../flume/sink/AvroKuduOperationsProducer.java | 31 +++++++++++++++++--- .../sink/RegexpKuduOperationsProducer.java | 26 ++++++++-------- .../avro/testAvroKuduOperationsProducer.avsc | 5 +++- .../sink/AvroKuduOperationsProducerTest.java | 11 +++++-- .../sink/RegexpKuduOperationsProducerTest.java | 11 ++++--- 6 files changed, 64 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/pom.xml ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/pom.xml b/java/kudu-flume-sink/pom.xml index 447aa18..376250c 100644 --- a/java/kudu-flume-sink/pom.xml +++ b/java/kudu-flume-sink/pom.xml @@ -104,6 +104,9 @@ <goals> <goal>schema</goal> </goals> + <configuration> + <enableDecimalLogicalType>true</enableDecimalLogicalType> + </configuration> </execution> </executions> </plugin> http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java index b6241bb..f9a7ddc 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java @@ -21,8 +21,11 @@ package org.apache.kudu.flume.sink; import java.io.IOException; import java.io.InputStream; +import java.math.BigDecimal; +import java.math.BigInteger; import java.net.URI; import java.net.URL; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -34,6 +37,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -46,6 +51,7 @@ import org.apache.flume.FlumeException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.kudu.Type; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -175,7 +181,8 @@ public class AvroKuduOperationsProducer implements KuduOperationsProducer { @Override public List<Operation> getOperations(Event event) throws FlumeException { - DatumReader<GenericRecord> reader = readers.getUnchecked(getSchema(event)); + Schema schema = getSchema(event); + DatumReader<GenericRecord> reader = readers.getUnchecked(schema); decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder); try { reuse = reader.read(reuse, decoder); @@ -193,11 +200,11 @@ public class AvroKuduOperationsProducer implements KuduOperationsProducer { default: throw new FlumeException(String.format("Unexpected operation %s", operation)); } - setupOp(op, reuse); + setupOp(op, schema, reuse); return Collections.singletonList(op); } - private void setupOp(Operation op, GenericRecord record) { + private void setupOp(Operation op, Schema schema, GenericRecord record) { PartialRow row = op.getRow(); for (ColumnSchema col : table.getSchema().getColumns()) { String name = col.getName(); @@ -235,6 +242,9 @@ public class AvroKuduOperationsProducer implements KuduOperationsProducer { case DOUBLE: row.addDouble(name, (double) value); break; + case DECIMAL: + row.addDecimal(name, getAvroBigDecimal(schema, name, value)); + break; case STRING: row.addString(name, value.toString()); break; @@ -249,12 +259,25 @@ public class AvroKuduOperationsProducer implements KuduOperationsProducer { throw new FlumeException( String.format("Failed to coerce value for column '%s' to type %s", col.getName(), - col.getType())); + col.getType()), e); } } } } + private BigDecimal getAvroBigDecimal(Schema schema, String name, Object value) { + LogicalType logicalType = schema.getField(name).schema().getLogicalType(); + if (!(logicalType instanceof LogicalTypes.Decimal)) { + throw new FlumeException(String.format( + "Failed to coerce value for column '%s' to type %s", + name, + Type.DECIMAL)); + } + int scale = ((LogicalTypes.Decimal) logicalType).getScale(); + BigInteger unscaledValue = new BigInteger(((ByteBuffer) value).array()); + return new BigDecimal(unscaledValue, scale); + } + private Schema getSchema(Event event) throws FlumeException { Map<String, String> headers = event.getHeaders(); String schemaUrl = headers.get(SCHEMA_URL_HEADER); http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java index fde8eff..ff9fd3c 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java @@ -19,6 +19,7 @@ package org.apache.kudu.flume.sink; +import java.math.BigDecimal; import java.nio.charset.Charset; import java.util.List; import java.util.regex.Matcher; @@ -253,6 +254,9 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer { private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row) throws NumberFormatException { switch (type) { + case BOOL: + row.addBoolean(colName, Boolean.parseBoolean(rawVal)); + break; case INT8: row.addByte(colName, Byte.parseByte(rawVal)); break; @@ -262,26 +266,24 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer { case INT32: row.addInt(colName, Integer.parseInt(rawVal)); break; - case INT64: + case INT64: // Fall through + case UNIXTIME_MICROS: row.addLong(colName, Long.parseLong(rawVal)); break; - case BINARY: - row.addBinary(colName, rawVal.getBytes(charset)); - break; - case STRING: - row.addString(colName, rawVal); - break; - case BOOL: - row.addBoolean(colName, Boolean.parseBoolean(rawVal)); - break; case FLOAT: row.addFloat(colName, Float.parseFloat(rawVal)); break; case DOUBLE: row.addDouble(colName, Double.parseDouble(rawVal)); break; - case UNIXTIME_MICROS: - row.addLong(colName, Long.parseLong(rawVal)); + case DECIMAL: + row.addDecimal(colName, new BigDecimal(rawVal)); + break; + case BINARY: + row.addBinary(colName, rawVal.getBytes(charset)); + break; + case STRING: + row.addString(colName, rawVal); break; default: logger.warn("got unknown type {} for column '{}'-- ignoring this column", http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc index b562c3a..6bcf6d2 100644 --- a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc +++ b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc @@ -6,6 +6,9 @@ {"name": "longField", "type": "long"}, {"name": "doubleField", "type": "double"}, {"name": "nullableField", "type": ["string", "null"]}, - {"name": "stringField", "type": "string"} + {"name": "stringField", "type": "string"}, + {"name": "decimalField", "type": { + "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 1} + } ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java index c9310b8..21812f8 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,6 +56,7 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.util.DecimalUtil; import org.junit.BeforeClass; import org.junit.Test; @@ -148,6 +150,8 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL) + .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build()); CreateTableOptions createOptions = new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")) .setNumReplicas(1); @@ -176,6 +180,7 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { record.setDoubleField(2.71828 * i); record.setNullableField(i % 2 == 0 ? null : "taco"); record.setStringField(String.format("hello %d", i)); + record.setDecimalField(BigDecimal.valueOf(i, 1)); ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); DatumWriter<AvroKuduOperationsProducerTestRecord> writer = @@ -198,12 +203,14 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { for (int i = 0; i < eventCount; i++) { answers.add(String.format( "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " + - "STRING nullableField=%s, STRING stringField=hello %s", + "STRING nullableField=%s, STRING stringField=hello %s, " + + "DECIMAL decimalField(9, 1)=%s", 10 * i, 2 * i, 2.71828 * i, i % 2 == 0 ? "NULL" : "taco", - i)); + i, + BigDecimal.valueOf(i, 1))); } Collections.sort(answers); return answers; http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java index d672a31..ece55f7 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java @@ -41,6 +41,7 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.util.DecimalUtil; import org.junit.Test; import org.apache.kudu.ColumnSchema; @@ -54,7 +55,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," + "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," + - "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*)"; + "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)"; private KuduTable createNewTable(String tableName) throws Exception { ArrayList<ColumnSchema> columns = new ArrayList<>(10); @@ -68,6 +69,8 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalFld", Type.DECIMAL) + .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build()); CreateTableOptions createOptions = new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1); KuduTable table = createTable(tableName, new Schema(columns), createOptions); @@ -127,7 +130,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { StringBuilder payload = new StringBuilder(); for (int j = 0; j < perEventRowCount; j++) { String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," + - "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|"; + "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|"; String row = String.format(baseRow, i, j); payload.append(row); } @@ -142,7 +145,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { StringBuilder upserts = new StringBuilder(); for (int j = 0; j < perEventRowCount; j++) { String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," + - "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j); + "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j); upserts.append(row); } Event e = EventBuilder.withBody(upserts.toString().getBytes()); @@ -181,7 +184,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " + "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " + "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " + - "DOUBLE doubleFld=%1$d.%1$d"; + "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d"; String rightAnswer = String.format(baseAnswer, value, i, j); rightAnswers.add(rightAnswer); }
