Repository: kudu
Updated Branches:
  refs/heads/master 19c47e659 -> 50931e291


KUDU-721: [Flume] Add DECIMAL type support

Adds DECIMAL column support to the Flume KuduSink,
including the Regexp and Avro operations producers.

Note: Sets enableDecimalLogicalType to true in the
Maven Avro plugin so that generated classes expose
decimal fields as BigDecimal; the plugin's default
is false.

Change-Id: Ibc02d683dd1bce2cc0de255e4d072436b6c0163a
Reviewed-on: http://gerrit.cloudera.org:8080/9365
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0798037c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0798037c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0798037c

Branch: refs/heads/master
Commit: 0798037c32a3b40134e1f45cd6d458df6b3255da
Parents: 19c47e6
Author: Grant Henke <[email protected]>
Authored: Tue Feb 20 10:49:24 2018 -0600
Committer: Mike Percy <[email protected]>
Committed: Wed Feb 21 21:14:48 2018 +0000

----------------------------------------------------------------------
 java/kudu-flume-sink/pom.xml                    |  3 ++
 .../flume/sink/AvroKuduOperationsProducer.java  | 31 +++++++++++++++++---
 .../sink/RegexpKuduOperationsProducer.java      | 26 ++++++++--------
 .../avro/testAvroKuduOperationsProducer.avsc    |  5 +++-
 .../sink/AvroKuduOperationsProducerTest.java    | 11 +++++--
 .../sink/RegexpKuduOperationsProducerTest.java  | 11 ++++---
 6 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/pom.xml b/java/kudu-flume-sink/pom.xml
index 447aa18..376250c 100644
--- a/java/kudu-flume-sink/pom.xml
+++ b/java/kudu-flume-sink/pom.xml
@@ -104,6 +104,9 @@
               <goals>
                 <goal>schema</goal>
               </goals>
+              <configuration>
+                <enableDecimalLogicalType>true</enableDecimalLogicalType>
+              </configuration>
           </execution>
         </executions>
       </plugin>

http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
 
b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
index b6241bb..f9a7ddc 100644
--- 
a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
+++ 
b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
@@ -21,8 +21,11 @@ package org.apache.kudu.flume.sink;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.net.URI;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -34,6 +37,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -46,6 +51,7 @@ import org.apache.flume.FlumeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kudu.Type;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -175,7 +181,8 @@ public class AvroKuduOperationsProducer implements 
KuduOperationsProducer {
 
   @Override
   public List<Operation> getOperations(Event event) throws FlumeException {
-    DatumReader<GenericRecord> reader = readers.getUnchecked(getSchema(event));
+    Schema schema = getSchema(event);
+    DatumReader<GenericRecord> reader = readers.getUnchecked(schema);
     decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
     try {
       reuse = reader.read(reuse, decoder);
@@ -193,11 +200,11 @@ public class AvroKuduOperationsProducer implements 
KuduOperationsProducer {
       default:
         throw new FlumeException(String.format("Unexpected operation %s", 
operation));
     }
-    setupOp(op, reuse);
+    setupOp(op, schema, reuse);
     return Collections.singletonList(op);
   }
 
-  private void setupOp(Operation op, GenericRecord record) {
+  private void setupOp(Operation op, Schema schema, GenericRecord record) {
     PartialRow row = op.getRow();
     for (ColumnSchema col : table.getSchema().getColumns()) {
       String name = col.getName();
@@ -235,6 +242,9 @@ public class AvroKuduOperationsProducer implements 
KuduOperationsProducer {
             case DOUBLE:
               row.addDouble(name, (double) value);
               break;
+            case DECIMAL:
+              row.addDecimal(name, getAvroBigDecimal(schema, name, value));
+              break;
             case STRING:
               row.addString(name, value.toString());
               break;
@@ -249,12 +259,25 @@ public class AvroKuduOperationsProducer implements 
KuduOperationsProducer {
           throw new FlumeException(
               String.format("Failed to coerce value for column '%s' to type 
%s",
               col.getName(),
-              col.getType()));
+              col.getType()), e);
         }
       }
     }
   }
 
+  /**
+   * Converts an Avro {@code bytes} value declared with the {@code decimal}
+   * logical type into a {@link BigDecimal}.
+   *
+   * @param schema the Avro record schema; the field called {@code name} supplies the scale
+   * @param name the name of the field/column being converted
+   * @param value the decoded Avro value; cast to {@link ByteBuffer}
+   * @return the decimal built from the buffer's unscaled bytes and the schema's scale
+   * @throws FlumeException if the field's schema does not carry the decimal logical type
+   */
+  private BigDecimal getAvroBigDecimal(Schema schema, String name, Object value) {
+    // NOTE(review): the logical type is read from the field's own schema, so a
+    // nullable decimal declared as a union presumably fails this check -- confirm.
+    LogicalType logicalType = schema.getField(name).schema().getLogicalType();
+    if (!(logicalType instanceof LogicalTypes.Decimal)) {
+      throw new FlumeException(String.format(
+          "Failed to coerce value for column '%s' to type %s",
+          name,
+          Type.DECIMAL));
+    }
+    int scale = ((LogicalTypes.Decimal) logicalType).getScale();
+    // Copy only the readable bytes. ByteBuffer.array() returns the whole backing
+    // array, ignoring position, limit and arrayOffset, and throws for read-only
+    // or direct buffers. duplicate() leaves the caller's buffer position untouched.
+    ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+    byte[] unscaledBytes = new byte[buffer.remaining()];
+    buffer.get(unscaledBytes);
+    BigInteger unscaledValue = new BigInteger(unscaledBytes);
+    return new BigDecimal(unscaledValue, scale);
+  }
+
   private Schema getSchema(Event event) throws FlumeException {
     Map<String, String> headers = event.getHeaders();
     String schemaUrl = headers.get(SCHEMA_URL_HEADER);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
 
b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
index fde8eff..ff9fd3c 100644
--- 
a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
+++ 
b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
@@ -19,6 +19,7 @@
 
 package org.apache.kudu.flume.sink;
 
+import java.math.BigDecimal;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -253,6 +254,9 @@ public class RegexpKuduOperationsProducer implements 
KuduOperationsProducer {
   private void coerceAndSet(String rawVal, String colName, Type type, 
PartialRow row)
       throws NumberFormatException {
     switch (type) {
+      case BOOL:
+        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
+        break;
       case INT8:
         row.addByte(colName, Byte.parseByte(rawVal));
         break;
@@ -262,26 +266,24 @@ public class RegexpKuduOperationsProducer implements 
KuduOperationsProducer {
       case INT32:
         row.addInt(colName, Integer.parseInt(rawVal));
         break;
-      case INT64:
+      case INT64: // Fall through
+      case UNIXTIME_MICROS:
         row.addLong(colName, Long.parseLong(rawVal));
         break;
-      case BINARY:
-        row.addBinary(colName, rawVal.getBytes(charset));
-        break;
-      case STRING:
-        row.addString(colName, rawVal);
-        break;
-      case BOOL:
-        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
-        break;
       case FLOAT:
         row.addFloat(colName, Float.parseFloat(rawVal));
         break;
       case DOUBLE:
         row.addDouble(colName, Double.parseDouble(rawVal));
         break;
-      case UNIXTIME_MICROS:
-        row.addLong(colName, Long.parseLong(rawVal));
+      case DECIMAL:
+        row.addDecimal(colName, new BigDecimal(rawVal));
+        break;
+      case BINARY:
+        row.addBinary(colName, rawVal.getBytes(charset));
+        break;
+      case STRING:
+        row.addString(colName, rawVal);
         break;
       default:
         logger.warn("got unknown type {} for column '{}'-- ignoring this 
column",

http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
----------------------------------------------------------------------
diff --git 
a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc 
b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
index b562c3a..6bcf6d2 100644
--- a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
+++ b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
@@ -6,6 +6,9 @@
     {"name": "longField",  "type": "long"},
     {"name": "doubleField",  "type": "double"},
     {"name": "nullableField",  "type": ["string", "null"]},
-    {"name": "stringField", "type": "string"}
+    {"name": "stringField", "type": "string"},
+    {"name": "decimalField", "type": {
+        "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 1}
+    }
   ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
 
b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
index c9310b8..21812f8 100644
--- 
a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ 
b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.util.DecimalUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -148,6 +150,8 @@ public class AvroKuduOperationsProducerTest extends 
BaseKuduTest {
     columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", 
Type.DOUBLE).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", 
Type.STRING).nullable(true).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", 
Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", 
Type.DECIMAL)
+        .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
     CreateTableOptions createOptions =
         new 
CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
             .setNumReplicas(1);
@@ -176,6 +180,7 @@ public class AvroKuduOperationsProducerTest extends 
BaseKuduTest {
       record.setDoubleField(2.71828 * i);
       record.setNullableField(i % 2 == 0 ? null : "taco");
       record.setStringField(String.format("hello %d", i));
+      record.setDecimalField(BigDecimal.valueOf(i, 1));
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
@@ -198,12 +203,14 @@ public class AvroKuduOperationsProducerTest extends 
BaseKuduTest {
     for (int i = 0; i < eventCount; i++) {
       answers.add(String.format(
           "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
-              "STRING nullableField=%s, STRING stringField=hello %s",
+              "STRING nullableField=%s, STRING stringField=hello %s, " +
+              "DECIMAL decimalField(9, 1)=%s",
           10 * i,
           2 * i,
           2.71828 * i,
           i % 2 == 0 ? "NULL" : "taco",
-          i));
+          i,
+          BigDecimal.valueOf(i, 1)));
     }
     Collections.sort(answers);
     return answers;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0798037c/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
 
b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
index d672a31..ece55f7 100644
--- 
a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ 
b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
@@ -41,6 +41,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.util.DecimalUtil;
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
@@ -54,7 +55,7 @@ public class RegexpKuduOperationsProducerTest extends 
BaseKuduTest {
   private static final String TEST_REGEXP =
       "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
       
"(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
-      "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*)";
+      
"(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)";
 
   private KuduTable createNewTable(String tableName) throws Exception {
     ArrayList<ColumnSchema> columns = new ArrayList<>(10);
@@ -68,6 +69,8 @@ public class RegexpKuduOperationsProducerTest extends 
BaseKuduTest {
     columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", 
Type.BOOL).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", 
Type.FLOAT).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", 
Type.DOUBLE).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalFld", 
Type.DECIMAL)
+        .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
     CreateTableOptions createOptions =
         new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
3).setNumReplicas(1);
     KuduTable table = createTable(tableName, new Schema(columns), 
createOptions);
@@ -127,7 +130,7 @@ public class RegexpKuduOperationsProducerTest extends 
BaseKuduTest {
       StringBuilder payload = new StringBuilder();
       for (int j = 0; j < perEventRowCount; j++) {
         String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
-            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|";
+            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|";
         String row = String.format(baseRow, i, j);
         payload.append(row);
       }
@@ -142,7 +145,7 @@ public class RegexpKuduOperationsProducerTest extends 
BaseKuduTest {
         StringBuilder upserts = new StringBuilder();
         for (int j = 0; j < perEventRowCount; j++) {
           String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," 
+
-              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j);
+              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j);
           upserts.append(row);
         }
         Event e = EventBuilder.withBody(upserts.toString().getBytes());
@@ -181,7 +184,7 @@ public class RegexpKuduOperationsProducerTest extends 
BaseKuduTest {
         String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 
shortFld=%1$d, " +
             "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY 
binaryFld=\"binary\", " +
             "STRING stringFld=string, BOOL boolFld=false, FLOAT 
floatFld=%1$d.%1$d, " +
-            "DOUBLE doubleFld=%1$d.%1$d";
+            "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
         String rightAnswer = String.format(baseAnswer, value, i, j);
         rightAnswers.add(rightAnswer);
       }

Reply via email to