ahmedabu98 commented on code in PR #27329:
URL: https://github.com/apache/beam/pull/27329#discussion_r1281001400
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -1022,9 +1046,23 @@ private static org.apache.avro.Schema getFieldSchema(
break;
case DECIMAL:
- baseType =
- LogicalTypes.decimal(Integer.MAX_VALUE)
- .addToSchema(org.apache.avro.Schema.create(Type.BYTES));
+ // Beam avro decimal defaults: type=bytes, precision=MAX_INT, scale=0
+ String type = options.hasOption("type") ? options.getValue("type") :
"bytes";
+ Type decimalType = Type.valueOf(type.toUpperCase());
+ Integer precision =
+ options.hasOption("precision") ? options.getValue("precision")
: Integer.MAX_VALUE;
+ Integer scale = options.hasOption("scale") ? options.getValue("scale")
: 0;
+ LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
+ if (decimalType == Type.FIXED) {
+ Integer size = options.getValue("size");
Review Comment:
It might be worth adding a `hasOption("size")` check here too, as is done for the `type`, `precision`, and `scale` options above.
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -1156,7 +1194,13 @@ private static org.apache.avro.Schema getFieldSchema(
case DECIMAL:
BigDecimal decimal = (BigDecimal) value;
LogicalType logicalType = typeWithNullability.type.getLogicalType();
- return new Conversions.DecimalConversion().toBytes(decimal, null,
logicalType);
+ if (typeWithNullability.type.getType() == Type.FIXED) {
+ return new Conversions.DecimalConversion()
+ .toFixed(decimal, typeWithNullability.type, logicalType);
+ } else {
+ // typeWithNullability.type.getType() == Type.BYTES
+ return new Conversions.DecimalConversion().toBytes(decimal, null,
logicalType);
+ }
Review Comment:
I'd also suggest throwing a runtime exception here when the type is neither FIXED nor BYTES, instead of assuming BYTES in the `else` branch.
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -1283,11 +1327,20 @@ private static org.apache.avro.Schema getFieldSchema(
LogicalType logicalType = LogicalTypes.fromSchema(type.type);
if (logicalType != null) {
if (logicalType instanceof LogicalTypes.Decimal) {
- ByteBuffer byteBuffer = (ByteBuffer) value;
- BigDecimal bigDecimal =
- new Conversions.DecimalConversion()
- .fromBytes(byteBuffer.duplicate(), type.type, logicalType);
- return convertDecimal(bigDecimal, fieldType);
+ if (avroSchema.getType() == Type.FIXED) {
+ GenericFixed genericFixed = (GenericFixed) value;
+ BigDecimal bigDecimal =
+ new Conversions.DecimalConversion().fromFixed(genericFixed,
type.type, logicalType);
+ return convertDecimal(bigDecimal, fieldType);
+ } else {
+ // avroSchema.getType() == Type.BYTES
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ BigDecimal bigDecimal =
+ new Conversions.DecimalConversion()
+ .fromBytes(byteBuffer.duplicate(), type.type,
logicalType);
+ return convertDecimal(bigDecimal, fieldType);
+ }
Review Comment:
I'd suggest throwing a runtime exception for an unexpected type here as well.
##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java:
##########
@@ -246,10 +247,17 @@ private static org.apache.avro.Schema getAvroSchema() {
"bytes", org.apache.avro.Schema.create(Type.BYTES), "", (Object)
null));
fields.add(
new org.apache.avro.Schema.Field(
- "decimal",
- LogicalTypes.decimal(Integer.MAX_VALUE)
- .addToSchema(org.apache.avro.Schema.create(Type.BYTES)),
- "",
+ "decimalBytes",
Review Comment:
We should keep the original test case to make sure these changes don't break
backwards compatibility
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -1022,9 +1046,23 @@ private static org.apache.avro.Schema getFieldSchema(
break;
case DECIMAL:
- baseType =
- LogicalTypes.decimal(Integer.MAX_VALUE)
- .addToSchema(org.apache.avro.Schema.create(Type.BYTES));
+ // Beam avro decimal defaults: type=bytes, precision=MAX_INT, scale=0
+ String type = options.hasOption("type") ? options.getValue("type") :
"bytes";
+ Type decimalType = Type.valueOf(type.toUpperCase());
+ Integer precision =
+ options.hasOption("precision") ? options.getValue("precision")
: Integer.MAX_VALUE;
+ Integer scale = options.hasOption("scale") ? options.getValue("scale")
: 0;
+ LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
+ if (decimalType == Type.FIXED) {
+ Integer size = options.getValue("size");
+ baseType =
+ decimal.addToSchema(
+ org.apache.avro.Schema.createFixed(fieldName, null,
namespace, size));
+ } else {
+ // decimalType == Type.BYTES
+ baseType =
decimal.addToSchema(org.apache.avro.Schema.create(Type.BYTES));
+ }
Review Comment:
```suggestion
} else if (decimalType == Type.BYTES) {
// decimalType == Type.BYTES
baseType =
decimal.addToSchema(org.apache.avro.Schema.create(Type.BYTES));
} else {
// throw RuntimeException for unexpected type
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]