This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 642159c8866ac13246ee112964ad79f4b0c7cf9e Author: Baodi Shi <[email protected]> AuthorDate: Thu Apr 21 17:26:35 2022 +0800 Pulsar SQL support for Decimal data type (#15153) (cherry picked from commit 6b004ed6a2554ab826a00aa2a177963de3c5f44b) --- .../presto/decoder/avro/PulsarAvroColumnDecoder.java | 19 ++++++++++++++++++- .../decoder/avro/PulsarAvroRowDecoderFactory.java | 10 +++++++++- .../decoder/json/PulsarJsonRowDecoderFactory.java | 6 ++++++ .../pulsar/sql/presto/TestPulsarConnector.java | 8 +++++++- .../pulsar/sql/presto/TestPulsarRecordCursor.java | 15 +++++++++++++++ .../sql/presto/decoder/AbstractDecoderTester.java | 5 +++++ .../sql/presto/decoder/DecoderTestMessage.java | 6 +++++- .../pulsar/sql/presto/decoder/DecoderTestUtil.java | 20 ++++++++++++++++++++ .../sql/presto/decoder/avro/TestAvroDecoder.java | 11 +++++++++++ 9 files changed, 96 insertions(+), 4 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java index 690daf62d2e..0c57336d213 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java @@ -40,6 +40,8 @@ import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.BigintType; import io.prestosql.spi.type.BooleanType; import io.prestosql.spi.type.DateType; +import io.prestosql.spi.type.DecimalType; +import io.prestosql.spi.type.Decimals; import io.prestosql.spi.type.DoubleType; import io.prestosql.spi.type.IntegerType; import io.prestosql.spi.type.MapType; @@ -53,6 +55,7 @@ import io.prestosql.spi.type.TinyintType; import io.prestosql.spi.type.Type; import io.prestosql.spi.type.VarbinaryType; import io.prestosql.spi.type.VarcharType; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -139,7 +142,7 @@ public class PulsarAvroColumnDecoder { } private boolean isSupportedPrimitive(Type type) { - return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type); + return type instanceof VarcharType || type instanceof DecimalType || SUPPORTED_PRIMITIVE_TYPES.contains(type); } public FieldValueProvider decodeField(GenericRecord avroRecord) { @@ -205,6 +208,13 @@ public class PulsarAvroColumnDecoder { return floatToIntBits((Float) value); } + if (columnType instanceof DecimalType) { + ByteBuffer buffer = (ByteBuffer) value; + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return new BigInteger(bytes).longValue(); + } + throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), columnType, columnName)); @@ -234,6 +244,13 @@ public class PulsarAvroColumnDecoder { } } + // The returned Slice size must be equals to 18 Byte + if (type instanceof DecimalType) { + ByteBuffer buffer = (ByteBuffer) value; + BigInteger bigInteger = new BigInteger(buffer.array()); + return Decimals.encodeUnscaledValue(bigInteger); + } + throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName)); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java index 12352059c2d..74b0a88fcef 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java @@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.BigintType; import io.prestosql.spi.type.BooleanType; +import io.prestosql.spi.type.DecimalType; import io.prestosql.spi.type.DoubleType; import io.prestosql.spi.type.IntegerType; import io.prestosql.spi.type.RealType; @@ -128,7 +129,14 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory { + "please check the schema or report the bug.", fieldname)); case FIXED: case BYTES: - //TODO: support decimal logicalType + // When the precision <= 0, throw Exception. + // When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long + // When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice + // When the precision > 36, throw Exception. + if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale()); + } return VarbinaryType.VARBINARY; case INT: if (logicalType == LogicalTypes.timeMillis()) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java index 330631e72a8..bb064d8909f 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java @@ -128,6 +128,12 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory { + "please check the schema or report the bug.", fieldname)); case FIXED: case BYTES: + // In the current implementation, since JsonSchema is generated by Avro, + // there may exist LogicalTypes.Decimal. + // Mapping decimalType with varcharType in JsonSchema. + if (logicalType instanceof LogicalTypes.Decimal) { + return createUnboundedVarcharType(); + } return VarbinaryType.VARBINARY; case INT: if (logicalType == LogicalTypes.timeMillis()) { diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index b673fc368e6..7db32f59148 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -25,6 +25,7 @@ import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.connector.ConnectorContext; import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.testing.TestingConnectorContext; +import java.math.BigDecimal; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -166,6 +167,8 @@ public abstract class TestPulsarConnector { public int time; @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }") public int date; + @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }") + public BigDecimal decimal; public TestPulsarConnector.Bar bar; public TestEnum field7; } @@ -253,6 +256,7 @@ public abstract class TestPulsarConnector { fooFieldNames.add("date"); fooFieldNames.add("bar"); fooFieldNames.add("field7"); + fooFieldNames.add("decimal"); ConnectorContext prestoConnectorContext = new TestingConnectorContext(); @@ -313,6 +317,7 @@ public abstract class TestPulsarConnector { LocalDate epoch = LocalDate.ofEpochDay(0); return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate)); }); + fooFunctions.put("decimal", integer -> BigDecimal.valueOf(1234, 2)); fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1); fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2)); fooFunctions.put("bar.field3", integer -> integer + 3.0f); @@ -331,7 +336,6 @@ public abstract class TestPulsarConnector { * @param schemaInfo * @param handleKeyValueType * @param includeInternalColumn - * @param dispatchingRowDecoderFactory * @return */ protected static List<PulsarColumnHandle> getColumnColumnHandles(TopicName topicName, SchemaInfo schemaInfo, @@ -393,6 +397,7 @@ public abstract class TestPulsarConnector { LocalDate localDate = LocalDate.now(); LocalDate epoch = LocalDate.ofEpochDay(0); foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate)); + foo.decimal= BigDecimal.valueOf(count, 2); MessageMetadata messageMetadata = new MessageMetadata() .setProducerName("test-producer").setSequenceId(i) @@ -609,6 +614,7 @@ public abstract class TestPulsarConnector { foo.timestamp = (long) fooFunctions.get("timestamp").apply(count); foo.time = (int) fooFunctions.get("time").apply(count); foo.date = (int) fooFunctions.get("date").apply(count); + foo.decimal = (BigDecimal) fooFunctions.get("decimal").apply(count); foo.bar = bar; foo.field7 = (Foo.TestEnum) fooFunctions.get("field7").apply(count); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java index dbde648ee95..23dc69245f0 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java @@ -22,7 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.airlift.log.Logger; import io.netty.buffer.ByteBuf; import io.prestosql.spi.predicate.TupleDomain; +import io.prestosql.spi.type.DecimalType; import io.prestosql.spi.type.RowType; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.VarcharType; +import java.math.BigDecimal; import lombok.Data; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -142,6 +146,17 @@ public class TestPulsarRecordCursor extends TestPulsarConnector { }else if (fooColumnHandles.get(i).getName().equals("field7")) { assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes()); columnsSeen.add(fooColumnHandles.get(i).getName()); + }else if (fooColumnHandles.get(i).getName().equals("decimal")) { + Type type = fooColumnHandles.get(i).getType(); + // In JsonDecoder, decimal trans to varcharType + if (type instanceof VarcharType) { + assertEquals(new String(pulsarRecordCursor.getSlice(i).getBytes()), + fooFunctions.get("decimal").apply(count).toString()); + } else { + DecimalType decimalType = (DecimalType) fooColumnHandles.get(i).getType(); + assertEquals(BigDecimal.valueOf(pulsarRecordCursor.getLong(i), decimalType.getScale()), fooFunctions.get("decimal").apply(count)); + } + columnsSeen.add(fooColumnHandles.get(i).getName()); } else { if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) { columnsSeen.add(fooColumnHandles.get(i).getName()); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java index 5cd46832516..e5ceb321aae 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java @@ -26,6 +26,7 @@ import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.connector.ConnectorContext; import io.prestosql.spi.type.Type; import io.prestosql.testing.TestingConnectorContext; +import java.math.BigDecimal; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaInfo; @@ -102,6 +103,10 @@ public abstract class AbstractDecoderTester { decoderTestUtil.checkValue(decodedRow, handle, value); } + protected void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) { + decoderTestUtil.checkValue(decodedRow, handle, value); + } + protected Block getBlock(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) { FieldValueProvider provider = decodedRow.get(handle); assertNotNull(provider); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java index 115f3691c00..da6d92e5158 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.sql.presto.decoder; +import java.math.BigDecimal; import lombok.Data; import java.util.List; @@ -45,6 +46,10 @@ public class DecoderTestMessage { public int dateField; public TestRow rowField; public TestEnum enumField; + @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }") + public BigDecimal decimalField; + @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 30, \"scale\": 2 }") + public BigDecimal longDecimalField; public List<String> arrayField; public Map<String, Long> mapField; @@ -62,7 +67,6 @@ public class DecoderTestMessage { public long longField; } - public static class CompositeRow { public String stringField; public List<NestedRow> arrayField; diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java index 4c3c4a63447..496a6f061bf 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java @@ -23,11 +23,16 @@ import io.prestosql.decoder.DecoderColumnHandle; import io.prestosql.decoder.FieldValueProvider; import io.prestosql.spi.block.Block; import io.prestosql.spi.type.ArrayType; +import io.prestosql.spi.type.DecimalType; +import io.prestosql.spi.type.Decimals; import io.prestosql.spi.type.MapType; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.Type; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Map; +import static io.prestosql.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH; import static io.prestosql.testing.TestingConnectorSession.SESSION; import static org.testng.Assert.*; @@ -113,6 +118,21 @@ public abstract class DecoderTestUtil { assertEquals(provider.getBoolean(), value); } + public void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) { + FieldValueProvider provider = decodedRow.get(handle); + DecimalType decimalType = (DecimalType) handle.getType(); + BigDecimal actualDecimal; + if (decimalType.getFixedSize() == UNSCALED_DECIMAL_128_SLICE_LENGTH) { + Slice slice = provider.getSlice(); + BigInteger bigInteger = Decimals.decodeUnscaledValue(slice); + actualDecimal = new BigDecimal(bigInteger, decimalType.getScale()); + } else { + actualDecimal = BigDecimal.valueOf(provider.getLong(), decimalType.getScale()); + } + assertNotNull(provider); + assertEquals(actualDecimal, value); + } + public void checkIsNull(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) { FieldValueProvider provider = decodedRow.get(handle); assertNotNull(provider); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java index 1cfbbb4fce5..7b270c7995b 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java @@ -25,11 +25,13 @@ import io.prestosql.decoder.FieldValueProvider; import io.prestosql.spi.PrestoException; import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.BigintType; +import io.prestosql.spi.type.DecimalType; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.StandardTypes; import io.prestosql.spi.type.Type; import io.prestosql.spi.type.TypeSignatureParameter; import io.prestosql.spi.type.VarcharType; +import java.math.BigDecimal; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -87,6 +89,8 @@ public class TestAvroDecoder extends AbstractDecoderTester { message.longField = 222L; message.timestampField = System.currentTimeMillis(); message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1; + message.decimalField = BigDecimal.valueOf(2233, 2); + message.longDecimalField = new BigDecimal("1234567891234567891234567891.23"); LocalTime now = LocalTime.now(ZoneId.systemDefault()); message.timeField = now.toSecondOfDay() * 1000; @@ -127,6 +131,13 @@ public class TestAvroDecoder extends AbstractDecoderTester { "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString()); + PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), + "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); + checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField); + + PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), + "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); + checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField); } @Test
