fsk119 commented on a change in pull request #16056:
URL: https://github.com/apache/flink/pull/16056#discussion_r644469383



##########
File path: flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
##########
@@ -316,7 +316,8 @@ private void testParseErrors(TestSpec spec) {
                             .expect(Row.of(true)),
                     TestSpec.json("{\"id\":\"abc\"}")
                             .typeInfo(Types.ROW_NAMED(new String[] {"id"}, Types.INT))
-                            .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+                            .expectErrorMessage(
+                                    "Failed to deserialize JSON '{\"id\":\"abc\"}'. Failed to deserialize at field: id."),

Review comment:
       What about `Failed to deserialize JSON '{"id":"long"}' at field: id.`?

##########
File path: flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
##########
@@ -179,6 +181,22 @@ public void testSerializeRowWithInvalidNumberOfFields() {
                         .failsWithException(instanceOf(RuntimeException.class)));
     }
 
+    @Test
+    public void testSerializeRowWithTypesMismatch() {
+        final TypeInformation<Row> rowSchema =
+                Types.ROW_NAMED(new String[] {"f1", "f2"}, Types.INT, Types.STRING);
+        final Row row = new Row(2);
+        row.setField(0, 1);
+        row.setField(1, 1);
+        final JsonRowSerializationSchema serializationSchema =
+                new JsonRowSerializationSchema.Builder(rowSchema).build();
+        String expectErrMessage = "Failed to serialize at field: f2";
+        assertThat(

Review comment:
       nit: `Assert.assertThat` is deprecated. Please use `MatcherAssert.assertThat` instead.

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
##########
@@ -389,13 +390,17 @@ private SerializationRuntimeConverter assembleRowConverter(
 
             for (int i = 0; i < fieldNames.length; i++) {
                 String fieldName = fieldNames[i];
-                node.set(
-                        fieldName,
-                        fieldConverters
-                                .get(i)
-                                .convert(mapper, node.get(fieldNames[i]), row.getField(i)));
+                try {
+                    node.set(
+                            fieldName,
+                            fieldConverters
+                                    .get(i)
+                                    .convert(mapper, node.get(fieldNames[i]), row.getField(i)));
+                } catch (Throwable t) {
+                    throw new IllegalStateException(

Review comment:
       What about throwing `JsonParseException` here instead?

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
##########
@@ -259,9 +264,19 @@ private static RuntimeConverter assembleRowRuntimeConverter(
             for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
                 // Jackson only supports mapping by name in the first level
                 if (isTopLevel) {
-                    row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
+                    try {
+                        row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
+                    } catch (Throwable t) {
+                        throw new IllegalStateException(
+                                String.format(errMessage, fieldNames[i]), t);
+                    }
                 } else {
-                    row.setField(i, fieldConverters[i].convert(node.get(i)));
+                    try {
+                        row.setField(i, fieldConverters[i].convert(node.get(i)));
+                    } catch (Throwable t) {
+                        throw new IllegalStateException(
+                                String.format(errMessage, fieldNames[i]), t);
+                    }

Review comment:
       The same try/catch is duplicated in both branches. Could you extract a helper method to reuse the logic?
   
   It would also be better to throw `RuntimeException` to align with the others.
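
A minimal sketch of the kind of helper meant here, assuming a simplified converter type. `FieldConversion`, `convertField`, and the message constant are hypothetical names modeled on the messages in this PR, not existing Flink code. Both branches could then delegate to the helper and differ only in how they look up the node:

```java
import java.util.function.Function;

/** Illustrative only: FieldConversion and convertField are hypothetical names, not Flink APIs. */
final class FieldConversion {

    // Assumed message format, modeled on the error messages used in this PR.
    private static final String ERR_MESSAGE = "Failed to deserialize at field: %s.";

    /**
     * Runs one field converter and, on failure, rethrows with the field name,
     * so the try/catch lives in exactly one place.
     */
    static <I, O> O convertField(Function<I, O> converter, I input, String fieldName) {
        try {
            return converter.apply(input);
        } catch (Throwable t) {
            throw new RuntimeException(String.format(ERR_MESSAGE, fieldName), t);
        }
    }

    public static void main(String[] args) {
        Function<String, Integer> parseInt = Integer::parseInt;

        // Succeeds: the input parses as an int.
        int ok = convertField(parseInt, "42", "id");
        System.out.println(ok);

        // Fails: the wrapper names the offending field and keeps the original cause.
        try {
            convertField(parseInt, "abc", "id");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With such a helper, the `isTopLevel` check only has to choose between `node.get(fieldNames[i])` and `node.get(i)` before making a single call.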

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
##########
@@ -290,6 +291,38 @@ public void testSerializeDeserializeNestedTypes() throws Exception {
         testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
     }
 
+    @Test
+    public void testSerializationWithTypesMismatch() {
+        final TypeInformation<Row> rowInfo =
+                Types.ROW_NAMED(new String[] {"f1", "f2"}, Types.INT, Types.STRING);
+        final CsvRowSerializationSchema.Builder serSchemaBuilder =
+                new CsvRowSerializationSchema.Builder(rowInfo).setLineDelimiter("");
+        try {
+            serialize(serSchemaBuilder, Row.of("test", "test"));
+        } catch (Throwable t) {
+            String expectedMessage = "Failed to serialize at field: f1";
+            assertTrue(t.getMessage().contains(expectedMessage));
+            return;
+        }
+        fail();

Review comment:
       This should look like:
   
   ```
   try {
       serialize(serSchemaBuilder, Row.of("test", "test"));
       fail("Should fail.");
   } catch (Throwable t) {
       String expectedMessage = "Failed to serialize at field: f1";
       assertTrue(t.getMessage().contains(expectedMessage));
   }
   ```
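
One caveat with this pattern, sketched below in plain Java without JUnit: `fail(...)` throws an `AssertionError`, which `catch (Throwable t)` also catches, so a missing exception would surface as a failed `contains` check rather than as "Should fail.". Catching `Exception` instead lets the `AssertionError` propagate. The `serialize` and `fail` methods here are local stand-ins (assumptions), not the Flink test helper or the JUnit method:

```java
/** Illustrative only: serialize and fail are local stand-ins, not the Flink or JUnit methods. */
final class ExpectFailureSketch {

    /** Stand-in for the call under test; always throws, as a type-mismatched row is expected to. */
    static void serialize(Object field) {
        throw new RuntimeException("Failed to serialize at field: f1");
    }

    /** Stand-in for JUnit's fail(String), which throws AssertionError. */
    static void fail(String message) {
        throw new AssertionError(message);
    }

    /** Returns true if the expected failure was observed. */
    static boolean failsWithFieldName() {
        try {
            serialize("test");
            fail("Should fail."); // reached only if serialize did not throw
        } catch (Exception e) {
            // AssertionError is an Error, not an Exception, so fail() is not swallowed here.
            return e.getMessage().contains("Failed to serialize at field: f1");
        }
        return false; // not reached in practice: fail() always throws
    }

    public static void main(String[] args) {
        System.out.println(failsWithFieldName());
    }
}
```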




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
