buptljy closed pull request #6568: [FLINK-10119][Kafka Connector] -
JsonRowDeserializeSchema deserialize message with error
URL: https://github.com/apache/flink/pull/6568
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index dc8a116ac62..e4e9f48ef25 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -62,6 +62,12 @@
/** Flag indicating whether to fail on a missing field. */
private boolean failOnMissingField;
+ /** Flag indicating whether to ignore the line when exeption is thrown.
*/
+ private boolean nullErrorLine;
+
+ /** Flag indicating whether to add an additional field when exception
is thrown. */
+ private boolean additionalErrorField;
+
/**
* Creates a JSON deserialization schema for the given type information.
*
@@ -94,7 +100,19 @@ public Row deserialize(byte[] message) throws IOException {
final JsonNode root = objectMapper.readTree(message);
return convertRow(root, (RowTypeInfo) typeInfo);
} catch (Throwable t) {
- throw new IOException("Failed to deserialize JSON
object.", t);
+ if (nullErrorLine || additionalErrorField) {
+ final int arity = typeInfo.getArity();
+ final Object[] nullsArray = new Object[arity];
+ if (additionalErrorField) {
+ final Object[] addionalNullsArray = new
Object[arity + 1];
+ System.arraycopy(nullsArray, 0,
addionalNullsArray, 0, arity);
+ addionalNullsArray[arity] =
t.getMessage();
+ return Row.of(addionalNullsArray);
+ }
+ return Row.of(nullsArray);
+ } else {
+ throw new IOException("Failed to deserialize
JSON object.", t);
+ }
}
}
@@ -119,6 +137,20 @@ public void setFailOnMissingField(boolean
failOnMissingField) {
this.failOnMissingField = failOnMissingField;
}
+ /**
+ * Configures whether to ignore the line when exeption is thrown. false
by default.
+ */
+ public void setNullErrorLine(boolean nullErrorLine) {
+ this.nullErrorLine = nullErrorLine;
+ }
+
+ /**
+ * Configures whether to add an additional field when exeption is
thrown. false by default.
+ */
+ public void setAdditionalErrorField(boolean additionalErrorField) {
+ this.additionalErrorField = additionalErrorField;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
index 458b94a5df7..a65234e4578 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -60,6 +60,8 @@ public boolean supportsSchemaDerivation() {
properties.add(JsonValidator.FORMAT_JSON_SCHEMA);
properties.add(JsonValidator.FORMAT_SCHEMA);
properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD);
+ properties.add(JsonValidator.FORMAT_NULL_ERROR_LINE);
+ properties.add(JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD);
properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
properties.addAll(SchemaValidator.getSchemaDerivationKeys());
return properties;
@@ -74,6 +76,10 @@ public boolean supportsSchemaDerivation() {
descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
.ifPresent(schema::setFailOnMissingField);
+
descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_NULL_ERROR_LINE)
+ .ifPresent(schema::setNullErrorLine);
+
descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD)
+ .ifPresent(schema::setAdditionalErrorField);
return schema;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
index 026f49e611c..75a1ef0b0e5 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
@@ -24,8 +24,10 @@
import org.apache.flink.util.Preconditions;
import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static
org.apache.flink.table.descriptors.JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD;
import static
org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
import static
org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
+import static
org.apache.flink.table.descriptors.JsonValidator.FORMAT_NULL_ERROR_LINE;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
import static
org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
@@ -35,6 +37,8 @@
public class Json extends FormatDescriptor {
private Boolean failOnMissingField;
+ private Boolean nullErrorLine;
+ private Boolean additionalErrorField;
private Boolean deriveSchema;
private String jsonSchema;
private String schema;
@@ -57,6 +61,27 @@ public Json failOnMissingField(boolean failOnMissingField) {
return this;
}
+ /**
+ * Sets flag whether to ignore the line if exception is thrown.
+ *
+ * @param nullErrorLine If set to true, the line will be ignored if
exception is thrown.
+ */
+ public Json nullErrorLine(boolean nullErrorLine) {
+ this.nullErrorLine = nullErrorLine;
+ return this;
+ }
+
+ /**
+ * Sets flag whether to add an additional field to store error messages
if exception is thrown.
+ *
+ * @param additionalErrorField If set to true, there will be an
additional field to store
+ * if exception is thrown.
+ */
+ public Json additionalErrorField(boolean additionalErrorField) {
+ this.additionalErrorField = additionalErrorField;
+ return this;
+ }
+
/**
* Sets the JSON schema string with field names and the types according
to the JSON schema
* specification [[http://json-schema.org/specification.html]].
@@ -126,5 +151,13 @@ public void addFormatProperties(DescriptorProperties
properties) {
if (failOnMissingField != null) {
properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD,
failOnMissingField);
}
+
+ if (nullErrorLine != null) {
+ properties.putBoolean(FORMAT_NULL_ERROR_LINE,
nullErrorLine);
+ }
+
+ if (additionalErrorField != null) {
+ properties.putBoolean(FORMAT_ADDITIONAL_ERROR_FIELD,
additionalErrorField);
+ }
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
index 49e1abc8a19..1b8bd0328f2 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
@@ -29,6 +29,8 @@
public static final String FORMAT_SCHEMA = "format.schema";
public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
public static final String FORMAT_FAIL_ON_MISSING_FIELD =
"format.fail-on-missing-field";
+ public static final String FORMAT_NULL_ERROR_LINE =
"format.null-error-line";
+ public static final String FORMAT_ADDITIONAL_ERROR_FIELD =
"format.additional-error-field";
@Override
public void validate(DescriptorProperties properties) {
@@ -51,5 +53,7 @@ public void validate(DescriptorProperties properties) {
}
properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
+ properties.validateBoolean(FORMAT_NULL_ERROR_LINE, true);
+ properties.validateBoolean(FORMAT_ADDITIONAL_ERROR_FIELD, true);
}
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index 5e77b801a44..437efe0f559 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -36,6 +36,8 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
/**
* Tests for the {@link JsonRowDeserializationSchema}.
@@ -179,6 +181,41 @@ public void testMissingNode() throws Exception {
}
}
+ /**
+ * Test deserialization with setting nullErrorLine true.
+ */
+ @Test
+ public void testNullErrorLine() throws IOException {
+ final JsonRowDeserializationSchema deserializationSchema = new
JsonRowDeserializationSchema(
+ Types.ROW_NAMED(
+ new String[] { "name" },
+ Types.STRING)
+ );
+ deserializationSchema.setNullErrorLine(true);
+
+ final Row row =
deserializationSchema.deserialize("zxvdd".getBytes());
+ assertNull(row.getField(0));
+ }
+
+ /**
+ * Test deserialization with setting additionalErrorField true.
+ */
+ @Test
+ public void testAdditionalErrorField() throws IOException {
+ final JsonRowDeserializationSchema deserializationSchema = new
JsonRowDeserializationSchema(
+ Types.ROW_NAMED(
+ new String[] { "name", "age" },
+ Types.STRING, Types.INT)
+ );
+ deserializationSchema.setAdditionalErrorField(true);
+
+ final Row row =
deserializationSchema.deserialize("zxvdd".getBytes());
+ assertEquals(3, row.getArity());
+ assertNull(row.getField(0));
+ assertNull(row.getField(1));
+ assertNotNull(row.getField(2));
+ }
+
/**
* Tests that number of field names and types has to match.
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services