rdblue commented on code in PR #12470:
URL: https://github.com/apache/iceberg/pull/12470#discussion_r1984040406


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -134,43 +145,15 @@ private ParquetValueWriter<?> newOption(Type fieldType, 
ParquetValueWriter<?> wr
     public ParquetValueWriter<?> primitive(LogicalType fType, PrimitiveType 
primitive) {
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
 
-      if (primitive.getOriginalType() != null) {
-        switch (primitive.getOriginalType()) {
-          case ENUM:
-          case JSON:
-          case UTF8:
-            return strings(desc);
-          case DATE:
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            return ints(fType, desc);
-          case INT_64:
-            return ParquetValueWriters.longs(desc);
-          case TIME_MICROS:
-            return timeMicros(desc);
-          case TIMESTAMP_MICROS:
-            return timestamps(desc);
-          case DECIMAL:
-            DecimalLogicalTypeAnnotation decimal =
-                (DecimalLogicalTypeAnnotation) 
primitive.getLogicalTypeAnnotation();
-            switch (primitive.getPrimitiveTypeName()) {
-              case INT32:
-                return decimalAsInteger(desc, decimal.getPrecision(), 
decimal.getScale());
-              case INT64:
-                return decimalAsLong(desc, decimal.getPrecision(), 
decimal.getScale());
-              case BINARY:
-              case FIXED_LEN_BYTE_ARRAY:
-                return decimalAsFixed(desc, decimal.getPrecision(), 
decimal.getScale());
-              default:
-                throw new UnsupportedOperationException(
-                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
-            }
-          case BSON:
-            return byteArrays(desc);
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported logical type: " + primitive.getOriginalType());
+      LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
+      if (annotation != null) {
+        Optional<ParquetValueWriter<?>> writer =
+            annotation.accept(new LogicalTypeWriterBuilder(fType, desc));

Review Comment:
   Updated this to use the logical annotation visitor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to