Copilot commented on code in PR #9596:
URL: https://github.com/apache/seatunnel/pull/9596#discussion_r2218409105
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java:
##########
@@ -185,14 +189,31 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
resolveObject(value, valueType)));
return dataMap;
case BOOLEAN:
+ return Boolean.parseBoolean(field.toString());
Review Comment:
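Boolean.parseBoolean() returns true only for the string "true" (case-insensitive)
and false for everything else. Parquet boolean fields normally arrive as Boolean
objects, so the round trip through toString() is unnecessary, and any value that
is not a Boolean (for example an Integer 1 or the string "yes") would be silently
converted to false instead of being rejected. A null field would also fail with a
NullPointerException from toString().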
```suggestion
if (field instanceof Boolean) {
    return field;
} else {
    String errorMsg =
            String.format(
                    "Expected a Boolean type but got [%s]",
                    field == null ? "null" : field.getClass().getName());
    throw new FileConnectorException(
            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
}
```
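A minimal, self-contained sketch of the difference between the two approaches. `StrictBoolean` and `toBooleanStrict` are hypothetical names used only for illustration, and `IllegalArgumentException` stands in for `FileConnectorException` so the example compiles without SeaTunnel on the classpath.

```java
// Hypothetical sketch: the silent-false behavior of parseBoolean(toString())
// versus a strict check that rejects non-Boolean input.
// IllegalArgumentException replaces FileConnectorException (assumption).
class StrictBoolean {

    static Object toBooleanStrict(Object field) {
        if (field instanceof Boolean) {
            return field; // already the right type, no string round trip
        }
        String actual = field == null ? "null" : field.getClass().getName();
        throw new IllegalArgumentException(
                String.format("Expected a Boolean type but got [%s]", actual));
    }

    public static void main(String[] args) {
        // A Boolean survives the round trip because Boolean.TRUE.toString() is "true".
        System.out.println(Boolean.parseBoolean(Boolean.TRUE.toString())); // true
        // A non-Boolean value is silently mapped to false.
        System.out.println(Boolean.parseBoolean(Integer.valueOf(1).toString())); // false
        System.out.println(toBooleanStrict(Boolean.TRUE)); // true
        try {
            toBooleanStrict(1); // autoboxed Integer is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The strict version turns a schema mismatch into an immediate, descriptive error instead of a wrong value further down the pipeline.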
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java:
##########
@@ -185,14 +189,31 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
resolveObject(value, valueType)));
return dataMap;
case BOOLEAN:
+ return Boolean.parseBoolean(field.toString());
case INT:
Review Comment:
Integer.parseInt(field.toString()) works when the field is already an Integer,
because Integer.toString() returns a valid integer string, but the round trip is
unnecessary. It fails with a NullPointerException if the field is null, and with
a NumberFormatException for values whose string form is not a plain int, such as
a Long outside the int range.
```suggestion
case INT:
    if (field instanceof Integer) {
        return field;
    }
    if (field == null) {
        throw new FileConnectorException(
                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                "Field is null and cannot be converted to Integer.");
    }
```
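The sketch below, under the same assumptions (hypothetical `StrictInt.toIntStrict`, `IllegalArgumentException` in place of `FileConnectorException`), shows both failure modes and a stricter conversion. Falling back to `Integer.parseInt(field.toString())` for non-Integer inputs is an assumption about how the rest of the `INT` branch behaves, since that code is not shown in the hunk.

```java
// Hypothetical sketch of a stricter INT conversion.
// IllegalArgumentException replaces FileConnectorException (assumption).
class StrictInt {

    static Object toIntStrict(Object field) {
        if (field instanceof Integer) {
            return field; // no string round trip needed
        }
        if (field == null) {
            throw new IllegalArgumentException(
                    "Field is null and cannot be converted to Integer.");
        }
        try {
            // Assumption: the existing string-parsing fallback is kept for other types.
            return Integer.parseInt(field.toString());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    String.format(
                            "Cannot convert [%s] of type [%s] to Integer",
                            field, field.getClass().getName()),
                    e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toIntStrict(42)); // 42
        System.out.println(toIntStrict("7")); // 7
        try {
            // 3_000_000_000L exceeds Integer.MAX_VALUE, so parseInt("3000000000") fails.
            toIntStrict(3_000_000_000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Wrapping the NumberFormatException keeps the original cause attached while reporting the offending value and its runtime type.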
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java:
##########
@@ -309,26 +399,91 @@ public static void generateTestData() throws IOException {
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
+ // create first record
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", 1);
record1.put("name", "Alice");
record1.put("salary", 50000.0);
+ record1.put("age", 30);
+ record1.put("active", true);
+ record1.put("score", 98.5f);
+ record1.put("created_at", System.currentTimeMillis());
+
+ // Date type
+ record1.put("join_date", 20289);
+
+ // Decimal type
+ BigDecimal budget = new BigDecimal("1198.02");
+ Schema.Field budgetField = schema.getField("budget");
+ Schema budgetSchema = budgetField.schema();
+ Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
+ GenericFixed budgetFixed =
+ decimalConversion.toFixed(budget, budgetSchema, budgetSchema.getLogicalType());
+ record1.put("budget", budgetFixed);
+
+ // Array type
GenericArray<Utf8> skills1 =
new GenericData.Array<>(2, schema.getField("skills").schema());
skills1.add(new Utf8("Java"));
skills1.add(new Utf8("Python"));
record1.put("skills", skills1);
+
+ // Map type
+ Map<Utf8, Utf8> properties1 = new HashMap<>();
+ properties1.put(new Utf8("department"), new Utf8("Engineering"));
+ properties1.put(new Utf8("location"), new Utf8("Beijing"));
+ record1.put("properties", properties1);
+
+ // Binary type
+ record1.put(
+ "binary_data",
+ ByteBuffer.wrap("binary data example".getBytes(StandardCharsets.UTF_8)));
+ record1.put(
+ "binary_as_string",
+ ByteBuffer.wrap("binary_as_string".getBytes(StandardCharsets.UTF_8)));
+
writer.write(record1);
+ // create second record
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", 2);
record2.put("name", "Bob");
record2.put("salary", 60000.0);
+ record2.put("age", 35);
+ record2.put("active", false);
+ record2.put("score", 89.2f);
Review Comment:
Same issue as line 408: a float value is put into the 'score' field, which the
schema defines as double.
```suggestion
record2.put("score", 89.2);
```
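A small illustration of why the float literal matters: widening a float to double is exact, so it preserves the float's own rounding error rather than recovering the decimal literal. `89.2f` therefore does not equal `89.2` after widening, while `98.5f` happens to match because 98.5 is exactly representable in binary. `FloatWidening` is a hypothetical class name.

```java
// Hypothetical illustration: float -> double widening keeps the float's rounding error.
class FloatWidening {
    public static void main(String[] args) {
        double widened = 89.2f; // implicit, exact float -> double widening
        System.out.println(widened); // not 89.2: the float's nearest value is printed
        System.out.println(widened == 89.2); // false
        // 98.5 is exactly representable in binary, so this one happens to match.
        System.out.println((double) 98.5f == 98.5); // true
    }
}
```

So a test that writes `89.2f` and later asserts the read-back double equals `89.2` would fail even if the writer accepted the Float.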
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java:
##########
@@ -420,4 +479,24 @@ boolean checkFileType(String path) {
throw new FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
}
+
+ private SeaTunnelDataType<?> getFinalType(
+ SeaTunnelDataType<?> fileType, SeaTunnelDataType<?> configType) {
+ if (configType == null) {
+ return fileType;
+ }
+ return canConvert(fileType, configType) ? configType : fileType;
+ }
+
+ private SeaTunnelDataType<?> getConfigFieldType(
+ SeaTunnelRowType configRowType, String fieldName) {
+
+ if (configRowType == null) {
+ return null;
+ }
+
+ int fieldIndex = Arrays.asList(configRowType.getFieldNames()).indexOf(fieldName);
+
+ return fieldIndex == -1 ? null : configRowType.getFieldType(fieldIndex);
Review Comment:
Arrays.asList(configRowType.getFieldNames()).indexOf(fieldName) does not copy
the array (Arrays.asList returns a fixed-size view backed by it), but indexOf
performs a linear search on every call, so resolving all n fields costs O(n²)
comparisons. Consider caching a field-name-to-index HashMap so that each lookup
is O(1) when processing many fields.
```suggestion
initializeFieldNameToIndexCache(configRowType);
Integer fieldIndex = fieldNameToIndexCache.get(fieldName);
return fieldIndex == null ? null : configRowType.getFieldType(fieldIndex);
```
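The reviewer's `initializeFieldNameToIndexCache` and `fieldNameToIndexCache` are not shown in the PR, so the following is only one possible shape for such a cache: a hypothetical `FieldIndexCache`, built once from the field-name array, that keeps the `-1` "not found" contract of `List.indexOf`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the name -> index map once, then answers lookups in O(1).
// In ParquetReadStrategy the names would come from configRowType.getFieldNames().
final class FieldIndexCache {
    private final Map<String, Integer> indexByName;

    FieldIndexCache(String[] fieldNames) {
        Map<String, Integer> map = new HashMap<>(fieldNames.length * 2);
        for (int i = 0; i < fieldNames.length; i++) {
            // putIfAbsent keeps the first occurrence, matching List.indexOf semantics.
            map.putIfAbsent(fieldNames[i], i);
        }
        this.indexByName = Collections.unmodifiableMap(map);
    }

    /** Returns the field index, or -1 when the name is absent (same contract as indexOf). */
    int indexOf(String fieldName) {
        Integer index = indexByName.get(fieldName);
        return index == null ? -1 : index;
    }

    public static void main(String[] args) {
        FieldIndexCache cache = new FieldIndexCache(new String[] {"id", "name", "salary"});
        System.out.println(cache.indexOf("salary")); // 2
        System.out.println(cache.indexOf("missing")); // -1
    }
}
```

The gain comes from building the map once per configRowType (for example, when the reader's schema is set) rather than inside getConfigFieldType(); rebuilding it on every call would keep the O(n) cost per lookup.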
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java:
##########
@@ -309,26 +399,91 @@ public static void generateTestData() throws IOException {
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
+ // create first record
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", 1);
record1.put("name", "Alice");
record1.put("salary", 50000.0);
+ record1.put("age", 30);
+ record1.put("active", true);
+ record1.put("score", 98.5f);
Review Comment:
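The test data puts a float value (98.5f) into the 'score' field, but the schema
defines it as 'double'. GenericData.Record.put() does not check value types, so
the mismatch only surfaces later, when the Parquet writer serializes the record.
Passing a double literal keeps the test data consistent with the schema.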
```suggestion
record1.put("score", 98.5);
```
--
This is an automated message from the Apache Git Service.