JingsongLi commented on a change in pull request #11962:
URL: https://github.com/apache/flink/pull/11962#discussion_r417999723



##########
File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Deserialization schema from CSV to Flink Table & SQL internal data 
structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
+ * converts it to {@link RowData}.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped {@link 
IOException}s.
+ */
+@Internal
+public final class CsvRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Type information describing the result type. */
+       private final TypeInformation<RowData> resultTypeInfo;
+
+       /** Runtime instance that performs the actual work. */
+       private final DeserializationRuntimeConverter runtimeConverter;
+
+       /** Schema describing the input CSV data. */
+       private final CsvSchema csvSchema;
+
+       /** Object reader used to read rows. It is configured by {@link 
CsvSchema}. */
+       private final ObjectReader objectReader;
+
+       /** Flag indicating whether to ignore invalid fields/rows (default: 
throw an exception). */
+       private final boolean ignoreParseErrors;
+
+       private CsvRowDataDeserializationSchema(
+                       RowType rowType,
+                       TypeInformation<RowData> resultTypeInfo,
+                       CsvSchema csvSchema,
+                       boolean ignoreParseErrors) {
+               this.resultTypeInfo = resultTypeInfo;
+               this.runtimeConverter = createRowConverter(rowType, true);
+               this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               this.objectReader = new 
CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+               this.ignoreParseErrors = ignoreParseErrors;
+       }
+
+       /**
+        * A builder for creating a {@link CsvRowDataDeserializationSchema}.
+        */
+       @Internal
+       public static class Builder {
+
+               private final RowType rowType;
+               private final TypeInformation<RowData> resultTypeInfo;
+               private CsvSchema csvSchema;
+               private boolean ignoreParseErrors;
+
+               /**
+                * Creates a CSV deserialization schema for the given {@link 
TypeInformation} with
+                * optional parameters.
+                */
+               public Builder(RowType rowType, TypeInformation<RowData> 
resultTypeInfo) {
+                       Preconditions.checkNotNull(rowType, "RowType must not 
be null.");
+                       Preconditions.checkNotNull(resultTypeInfo, "Result type 
information must not be null.");
+                       this.rowType = rowType;
+                       this.resultTypeInfo = resultTypeInfo;
+                       this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               }
+
+               public Builder setFieldDelimiter(char delimiter) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setColumnSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setAllowComments(boolean allowComments) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setAllowComments(allowComments).build();
+                       return this;
+               }
+
+               public Builder setArrayElementDelimiter(String delimiter) {
+                       Preconditions.checkNotNull(delimiter, "Array element 
delimiter must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setQuoteCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setQuoteChar(c).build();
+                       return this;
+               }
+
+               public Builder setEscapeCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setEscapeChar(c).build();
+                       return this;
+               }
+
+               public Builder setNullLiteral(String nullLiteral) {
+                       Preconditions.checkNotNull(nullLiteral, "Null literal 
must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setNullValue(nullLiteral).build();
+                       return this;
+               }
+
+               public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+                       this.ignoreParseErrors = ignoreParseErrors;
+                       return this;
+               }
+
+               public CsvRowDataDeserializationSchema build() {
+                       return new CsvRowDataDeserializationSchema(
+                               rowType,
+                               resultTypeInfo,
+                               csvSchema,
+                               ignoreParseErrors);
+               }
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               try {
+                       final JsonNode root = objectReader.readValue(message);
+                       return (RowData) runtimeConverter.convert(root);
+               } catch (Throwable t) {
+                       if (ignoreParseErrors) {
+                               return null;
+                       }
+                       throw new IOException("Failed to deserialize CSV row '" 
+ new String(message) + "'.", t);
+               }
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return resultTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               final CsvRowDataDeserializationSchema that = 
(CsvRowDataDeserializationSchema) o;
+               final CsvSchema otherSchema = that.csvSchema;
+
+               return resultTypeInfo.equals(that.resultTypeInfo) &&
+                       ignoreParseErrors == that.ignoreParseErrors &&
+                       csvSchema.getColumnSeparator() == 
otherSchema.getColumnSeparator() &&
+                       csvSchema.allowsComments() == 
otherSchema.allowsComments() &&
+                       
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator())
 &&
+                       csvSchema.getQuoteChar() == otherSchema.getQuoteChar() 
&&
+                       csvSchema.getEscapeChar() == 
otherSchema.getEscapeChar() &&
+                       Arrays.equals(csvSchema.getNullValue(), 
otherSchema.getNullValue());
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       resultTypeInfo,
+                       ignoreParseErrors,
+                       csvSchema.getColumnSeparator(),
+                       csvSchema.allowsComments(),
+                       csvSchema.getArrayElementSeparator(),
+                       csvSchema.getQuoteChar(),
+                       csvSchema.getEscapeChar(),
+                       csvSchema.getNullValue());
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts {@link JsonNode}s into objects of 
Flink Table & SQL
+        * internal data structures.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(JsonNode jsonNode);
+       }
+
+       private DeserializationRuntimeConverter createRowConverter(RowType 
rowType, boolean isTopLevel) {
+               final DeserializationRuntimeConverter[] fieldConverters = 
rowType.getFields().stream()
+                       .map(RowType.RowField::getType)
+                       .map(this::createNullableConverter)
+                       .toArray(DeserializationRuntimeConverter[]::new);
+               final String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+               final int arity = fieldNames.length;
+
+               return jsonNode -> {
+                       int nodeSize = jsonNode.size();
+
+                       validateArity(arity, nodeSize, ignoreParseErrors);
+
+                       GenericRowData row = new GenericRowData(arity);
+                       for (int i = 0; i < arity; i++) {
+                               JsonNode field;
+                               // Jackson only supports mapping by name in the 
first level
+                               if (isTopLevel) {
+                                       field = jsonNode.get(fieldNames[i]);
+                               } else {
+                                       field = jsonNode.get(i);
+                               }
+                               if (field == null) {
+                                       row.setField(i, null);
+                               } else {
+                                       row.setField(i, 
fieldConverters[i].convert(field));
+                               }
+                       }
+                       return row;
+               };
+       }
+
+       /**
+        * Creates a runtime converter which is null safe.
+        */
+       private DeserializationRuntimeConverter 
createNullableConverter(LogicalType type) {
+               final DeserializationRuntimeConverter converter = 
createConverter(type);
+               return jsonNode -> {
+                       if (jsonNode == null || jsonNode.isNull()) {
+                               return null;
+                       }
+                       try {
+                               return converter.convert(jsonNode);
+                       } catch (Throwable t) {
+                               if (!ignoreParseErrors) {
+                                       throw t;
+                               }
+                               return null;
+                       }
+               };
+       }
+
+       /**
+        * Creates a runtime converter which assuming input object is not null.
+        */
+       private DeserializationRuntimeConverter createConverter(LogicalType 
type) {
+               switch (type.getTypeRoot()) {
+                       case NULL:
+                               return jsonNode -> null;
+                       case BOOLEAN:
+                               return this::convertToBoolean;
+                       case TINYINT:
+                               return jsonNode -> 
Byte.parseByte(jsonNode.asText().trim());
+                       case SMALLINT:
+                               return jsonNode -> 
Short.parseShort(jsonNode.asText().trim());
+                       case INTEGER:
+                       case INTERVAL_YEAR_MONTH:
+                               return this::convertToInt;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               return this::convertToLong;
+                       case DATE:
+                               return this::convertToDate;
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return this::convertToTime;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return this::convertToTimestamp;
+                       case FLOAT:
+                               return this::convertToFloat;
+                       case DOUBLE:
+                               return this::convertToDouble;
+                       case CHAR:
+                       case VARCHAR:
+                               return this::convertToString;
+                       case BINARY:
+                       case VARBINARY:
+                               return this::convertToBytes;
+                       case DECIMAL:
+                               return createDecimalConverter((DecimalType) 
type);
+                       case ARRAY:
+                               return createArrayConverter((ArrayType) type);
+                       case ROW:
+                               return createRowConverter((RowType) type, 
false);
+                       case MAP:
+                       case MULTISET:
+                       case RAW:
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported type: " + type);
+               }
+       }
+
+       private boolean convertToBoolean(JsonNode jsonNode) {
+               if (jsonNode.isBoolean()) {

Review comment:
       Just use `jsonNode.isBoolean() ? jsonNode.asBoolean() : 
Boolean.parseBoolean(jsonNode.asText().trim())`?
   Same below.

##########
File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Deserialization schema from CSV to Flink Table & SQL internal data 
structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
+ * converts it to {@link RowData}.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped {@link 
IOException}s.
+ */
+@Internal
+public final class CsvRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Type information describing the result type. */
+       private final TypeInformation<RowData> resultTypeInfo;
+
+       /** Runtime instance that performs the actual work. */
+       private final DeserializationRuntimeConverter runtimeConverter;
+
+       /** Schema describing the input CSV data. */
+       private final CsvSchema csvSchema;
+
+       /** Object reader used to read rows. It is configured by {@link 
CsvSchema}. */
+       private final ObjectReader objectReader;
+
+       /** Flag indicating whether to ignore invalid fields/rows (default: 
throw an exception). */
+       private final boolean ignoreParseErrors;
+
+       private CsvRowDataDeserializationSchema(
+                       RowType rowType,
+                       TypeInformation<RowData> resultTypeInfo,
+                       CsvSchema csvSchema,
+                       boolean ignoreParseErrors) {
+               this.resultTypeInfo = resultTypeInfo;
+               this.runtimeConverter = createRowConverter(rowType, true);
+               this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               this.objectReader = new 
CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+               this.ignoreParseErrors = ignoreParseErrors;
+       }
+
+       /**
+        * A builder for creating a {@link CsvRowDataDeserializationSchema}.
+        */
+       @Internal
+       public static class Builder {
+
+               private final RowType rowType;
+               private final TypeInformation<RowData> resultTypeInfo;
+               private CsvSchema csvSchema;
+               private boolean ignoreParseErrors;
+
+               /**
+                * Creates a CSV deserialization schema for the given {@link 
TypeInformation} with
+                * optional parameters.
+                */
+               public Builder(RowType rowType, TypeInformation<RowData> 
resultTypeInfo) {
+                       Preconditions.checkNotNull(rowType, "RowType must not 
be null.");
+                       Preconditions.checkNotNull(resultTypeInfo, "Result type 
information must not be null.");
+                       this.rowType = rowType;
+                       this.resultTypeInfo = resultTypeInfo;
+                       this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               }
+
+               public Builder setFieldDelimiter(char delimiter) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setColumnSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setAllowComments(boolean allowComments) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setAllowComments(allowComments).build();
+                       return this;
+               }
+
+               public Builder setArrayElementDelimiter(String delimiter) {
+                       Preconditions.checkNotNull(delimiter, "Array element 
delimiter must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setQuoteCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setQuoteChar(c).build();
+                       return this;
+               }
+
+               public Builder setEscapeCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setEscapeChar(c).build();
+                       return this;
+               }
+
+               public Builder setNullLiteral(String nullLiteral) {
+                       Preconditions.checkNotNull(nullLiteral, "Null literal 
must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setNullValue(nullLiteral).build();
+                       return this;
+               }
+
+               public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+                       this.ignoreParseErrors = ignoreParseErrors;
+                       return this;
+               }
+
+               public CsvRowDataDeserializationSchema build() {
+                       return new CsvRowDataDeserializationSchema(
+                               rowType,
+                               resultTypeInfo,
+                               csvSchema,
+                               ignoreParseErrors);
+               }
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               try {
+                       final JsonNode root = objectReader.readValue(message);
+                       return (RowData) runtimeConverter.convert(root);
+               } catch (Throwable t) {
+                       if (ignoreParseErrors) {
+                               return null;
+                       }
+                       throw new IOException("Failed to deserialize CSV row '" 
+ new String(message) + "'.", t);
+               }
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return resultTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               final CsvRowDataDeserializationSchema that = 
(CsvRowDataDeserializationSchema) o;
+               final CsvSchema otherSchema = that.csvSchema;
+
+               return resultTypeInfo.equals(that.resultTypeInfo) &&
+                       ignoreParseErrors == that.ignoreParseErrors &&
+                       csvSchema.getColumnSeparator() == 
otherSchema.getColumnSeparator() &&
+                       csvSchema.allowsComments() == 
otherSchema.allowsComments() &&
+                       
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator())
 &&
+                       csvSchema.getQuoteChar() == otherSchema.getQuoteChar() 
&&
+                       csvSchema.getEscapeChar() == 
otherSchema.getEscapeChar() &&
+                       Arrays.equals(csvSchema.getNullValue(), 
otherSchema.getNullValue());
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       resultTypeInfo,
+                       ignoreParseErrors,
+                       csvSchema.getColumnSeparator(),
+                       csvSchema.allowsComments(),
+                       csvSchema.getArrayElementSeparator(),
+                       csvSchema.getQuoteChar(),
+                       csvSchema.getEscapeChar(),
+                       csvSchema.getNullValue());
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts {@link JsonNode}s into objects of 
Flink Table & SQL
+        * internal data structures.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(JsonNode jsonNode);
+       }
+
+       private DeserializationRuntimeConverter createRowConverter(RowType 
rowType, boolean isTopLevel) {
+               final DeserializationRuntimeConverter[] fieldConverters = 
rowType.getFields().stream()
+                       .map(RowType.RowField::getType)
+                       .map(this::createNullableConverter)
+                       .toArray(DeserializationRuntimeConverter[]::new);
+               final String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+               final int arity = fieldNames.length;
+
+               return jsonNode -> {
+                       int nodeSize = jsonNode.size();
+
+                       validateArity(arity, nodeSize, ignoreParseErrors);
+
+                       GenericRowData row = new GenericRowData(arity);
+                       for (int i = 0; i < arity; i++) {
+                               JsonNode field;
+                               // Jackson only supports mapping by name in the 
first level
+                               if (isTopLevel) {
+                                       field = jsonNode.get(fieldNames[i]);
+                               } else {
+                                       field = jsonNode.get(i);
+                               }
+                               if (field == null) {
+                                       row.setField(i, null);
+                               } else {
+                                       row.setField(i, 
fieldConverters[i].convert(field));
+                               }
+                       }
+                       return row;
+               };
+       }
+
+       /**
+        * Creates a runtime converter which is null safe.
+        */
+       private DeserializationRuntimeConverter 
createNullableConverter(LogicalType type) {
+               final DeserializationRuntimeConverter converter = 
createConverter(type);
+               return jsonNode -> {
+                       if (jsonNode == null || jsonNode.isNull()) {
+                               return null;
+                       }
+                       try {
+                               return converter.convert(jsonNode);
+                       } catch (Throwable t) {
+                               if (!ignoreParseErrors) {
+                                       throw t;
+                               }
+                               return null;
+                       }
+               };
+       }
+
+       /**
+        * Creates a runtime converter which assuming input object is not null.
+        */
+       private DeserializationRuntimeConverter createConverter(LogicalType 
type) {
+               switch (type.getTypeRoot()) {
+                       case NULL:
+                               return jsonNode -> null;
+                       case BOOLEAN:
+                               return this::convertToBoolean;
+                       case TINYINT:
+                               return jsonNode -> 
Byte.parseByte(jsonNode.asText().trim());
+                       case SMALLINT:
+                               return jsonNode -> 
Short.parseShort(jsonNode.asText().trim());
+                       case INTEGER:
+                       case INTERVAL_YEAR_MONTH:
+                               return this::convertToInt;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               return this::convertToLong;
+                       case DATE:
+                               return this::convertToDate;
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return this::convertToTime;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return this::convertToTimestamp;
+                       case FLOAT:
+                               return this::convertToFloat;
+                       case DOUBLE:
+                               return this::convertToDouble;
+                       case CHAR:
+                       case VARCHAR:
+                               return this::convertToString;
+                       case BINARY:
+                       case VARBINARY:
+                               return this::convertToBytes;
+                       case DECIMAL:
+                               return createDecimalConverter((DecimalType) 
type);
+                       case ARRAY:
+                               return createArrayConverter((ArrayType) type);
+                       case ROW:
+                               return createRowConverter((RowType) type, 
false);
+                       case MAP:
+                       case MULTISET:
+                       case RAW:
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported type: " + type);
+               }
+       }
+
+       private boolean convertToBoolean(JsonNode jsonNode) {
+               if (jsonNode.isBoolean()) {
+                       // avoid redundant toString and parseBoolean, for 
better performance

Review comment:
       Why we can't use `node.asBoolean()` directly?

##########
File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+
+/**
+ * Serialization schema that serializes an object of Flink Table & SQL 
internal data structure
+ * into a CSV bytes.
+ *
+ * <p>Serializes the input row into a {@link JsonNode} and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link 
CsvRowDataDeserializationSchema}.
+ */
+@PublicEvolving
+public final class CsvRowDataSerializationSchema implements 
SerializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Logical row type describing the input CSV data. */
+       private final RowType rowType;
+
+       /** Runtime instance that performs the actual work. */
+       private final SerializationRuntimeConverter runtimeConverter;
+
+       /** CsvMapper used to write {@link JsonNode} into bytes. */
+       private final CsvMapper csvMapper;
+
+       /** Schema describing the input CSV data. */
+       private final CsvSchema csvSchema;
+
+       /** Object writer used to write rows. It is configured by {@link 
CsvSchema}. */
+       private final ObjectWriter objectWriter;
+
+       /** Reusable object node. */
+       private transient ObjectNode root;
+
+       private CsvRowDataSerializationSchema(
+                       RowType rowType,
+                       CsvSchema csvSchema) {
+               this.rowType = rowType;
+               this.runtimeConverter = createRowConverter(rowType, true);
+               this.csvMapper = new CsvMapper();
+               this.csvSchema = csvSchema;
+               this.objectWriter = csvMapper.writer(csvSchema);
+       }
+
+       /**
+        * A builder for creating a {@link CsvRowDataSerializationSchema}.
+        */
+       @PublicEvolving
+       public static class Builder {
+
+               private final RowType rowType;
+               private CsvSchema csvSchema;
+
+               /**
+                * Creates a {@link CsvRowDataSerializationSchema} expecting 
the given {@link RowType}.
+                *
+                * @param rowType logical row type used to create schema.
+                */
+               public Builder(RowType rowType) {
+                       Preconditions.checkNotNull(rowType, "Row type must not 
be null.");
+
+                       this.rowType = rowType;
+                       this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               }
+
+               public Builder setFieldDelimiter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setColumnSeparator(c).build();
+                       return this;
+               }
+
+               public Builder setLineDelimiter(String delimiter) {
+                       Preconditions.checkNotNull(delimiter, "Delimiter must 
not be null.");
+                       if (!delimiter.equals("\n") && !delimiter.equals("\r") 
&& !delimiter.equals("\r\n") && !delimiter.equals("")) {
+                               throw new IllegalArgumentException(
+                                       "Unsupported new line delimiter. Only 
\\n, \\r, \\r\\n, or empty string are supported.");
+                       }
+                       this.csvSchema = 
this.csvSchema.rebuild().setLineSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setArrayElementDelimiter(String delimiter) {
+                       Preconditions.checkNotNull(delimiter, "Delimiter must 
not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder disableQuoteCharacter() {
+                       this.csvSchema = 
this.csvSchema.rebuild().disableQuoteChar().build();
+                       return this;
+               }
+
+               public Builder setQuoteCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setQuoteChar(c).build();
+                       return this;
+               }
+
+               public Builder setEscapeCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setEscapeChar(c).build();
+                       return this;
+               }
+
+               public Builder setNullLiteral(String s) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setNullValue(s).build();
+                       return this;
+               }
+
+               public CsvRowDataSerializationSchema build() {
+                       return new CsvRowDataSerializationSchema(
+                               rowType,
+                               csvSchema);
+               }
+       }
+
+       @Override
+       public byte[] serialize(RowData row) {
+               if (root == null) {
+                       root = csvMapper.createObjectNode();
+               }
+               try {
+                       runtimeConverter.convert(csvMapper, root, row);
+                       return objectWriter.writeValueAsBytes(root);
+               } catch (Throwable t) {
+                       throw new RuntimeException("Could not serialize row '" 
+ row + "'.", t);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               if (this == o) {
+                       return true;
+               }
+               final CsvRowDataSerializationSchema that = 
(CsvRowDataSerializationSchema) o;
+               final CsvSchema otherSchema = that.csvSchema;
+
+               return rowType.equals(that.rowType) &&
+                       csvSchema.getColumnSeparator() == 
otherSchema.getColumnSeparator() &&
+                       Arrays.equals(csvSchema.getLineSeparator(), 
otherSchema.getLineSeparator()) &&
+                       
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator())
 &&
+                       csvSchema.getQuoteChar() == otherSchema.getQuoteChar() 
&&
+                       csvSchema.getEscapeChar() == 
otherSchema.getEscapeChar() &&
+                       Arrays.equals(csvSchema.getNullValue(), 
otherSchema.getNullValue());
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       rowType,
+                       csvSchema.getColumnSeparator(),
+                       csvSchema.getLineSeparator(),
+                       csvSchema.getArrayElementSeparator(),
+                       csvSchema.getQuoteChar(),
+                       csvSchema.getEscapeChar(),
+                       csvSchema.getNullValue());
+       }
+
+       // 
--------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
--------------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts objects of Flink Table & SQL 
internal data structures
+        * to corresponding {@link JsonNode}s.
+        */
+       private interface SerializationRuntimeConverter extends Serializable {

Review comment:
       I think you should use something like `ParquetRowDataWriter.FieldWriter`.

##########
File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Deserialization schema from CSV to Flink Table & SQL internal data 
structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a {@link JsonNode} and
+ * converts it to {@link RowData}.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped {@link 
IOException}s.
+ */
+@Internal
+public final class CsvRowDataDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Type information describing the result type. */
+       private final TypeInformation<RowData> resultTypeInfo;
+
+       /** Runtime instance that performs the actual work. */
+       private final DeserializationRuntimeConverter runtimeConverter;
+
+       /** Schema describing the input CSV data. */
+       private final CsvSchema csvSchema;
+
+       /** Object reader used to read rows. It is configured by {@link 
CsvSchema}. */
+       private final ObjectReader objectReader;
+
+       /** Flag indicating whether to ignore invalid fields/rows (default: 
throw an exception). */
+       private final boolean ignoreParseErrors;
+
+       private CsvRowDataDeserializationSchema(
+                       RowType rowType,
+                       TypeInformation<RowData> resultTypeInfo,
+                       CsvSchema csvSchema,
+                       boolean ignoreParseErrors) {
+               this.resultTypeInfo = resultTypeInfo;
+               this.runtimeConverter = createRowConverter(rowType, true);
+               this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               this.objectReader = new 
CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+               this.ignoreParseErrors = ignoreParseErrors;
+       }
+
+       /**
+        * A builder for creating a {@link CsvRowDataDeserializationSchema}.
+        */
+       @Internal
+       public static class Builder {
+
+               private final RowType rowType;
+               private final TypeInformation<RowData> resultTypeInfo;
+               private CsvSchema csvSchema;
+               private boolean ignoreParseErrors;
+
+               /**
+                * Creates a CSV deserialization schema for the given {@link 
TypeInformation} with
+                * optional parameters.
+                */
+               public Builder(RowType rowType, TypeInformation<RowData> 
resultTypeInfo) {
+                       Preconditions.checkNotNull(rowType, "RowType must not 
be null.");
+                       Preconditions.checkNotNull(resultTypeInfo, "Result type 
information must not be null.");
+                       this.rowType = rowType;
+                       this.resultTypeInfo = resultTypeInfo;
+                       this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+               }
+
+               public Builder setFieldDelimiter(char delimiter) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setColumnSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setAllowComments(boolean allowComments) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setAllowComments(allowComments).build();
+                       return this;
+               }
+
+               public Builder setArrayElementDelimiter(String delimiter) {
+                       Preconditions.checkNotNull(delimiter, "Array element 
delimiter must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build();
+                       return this;
+               }
+
+               public Builder setQuoteCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setQuoteChar(c).build();
+                       return this;
+               }
+
+               public Builder setEscapeCharacter(char c) {
+                       this.csvSchema = 
this.csvSchema.rebuild().setEscapeChar(c).build();
+                       return this;
+               }
+
+               public Builder setNullLiteral(String nullLiteral) {
+                       Preconditions.checkNotNull(nullLiteral, "Null literal 
must not be null.");
+                       this.csvSchema = 
this.csvSchema.rebuild().setNullValue(nullLiteral).build();
+                       return this;
+               }
+
+               public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+                       this.ignoreParseErrors = ignoreParseErrors;
+                       return this;
+               }
+
+               public CsvRowDataDeserializationSchema build() {
+                       return new CsvRowDataDeserializationSchema(
+                               rowType,
+                               resultTypeInfo,
+                               csvSchema,
+                               ignoreParseErrors);
+               }
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               try {
+                       final JsonNode root = objectReader.readValue(message);
+                       return (RowData) runtimeConverter.convert(root);
+               } catch (Throwable t) {
+                       if (ignoreParseErrors) {
+                               return null;
+                       }
+                       throw new IOException("Failed to deserialize CSV row '" 
+ new String(message) + "'.", t);
+               }
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return resultTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || o.getClass() != this.getClass()) {
+                       return false;
+               }
+               final CsvRowDataDeserializationSchema that = 
(CsvRowDataDeserializationSchema) o;
+               final CsvSchema otherSchema = that.csvSchema;
+
+               return resultTypeInfo.equals(that.resultTypeInfo) &&
+                       ignoreParseErrors == that.ignoreParseErrors &&
+                       csvSchema.getColumnSeparator() == 
otherSchema.getColumnSeparator() &&
+                       csvSchema.allowsComments() == 
otherSchema.allowsComments() &&
+                       
csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator())
 &&
+                       csvSchema.getQuoteChar() == otherSchema.getQuoteChar() 
&&
+                       csvSchema.getEscapeChar() == 
otherSchema.getEscapeChar() &&
+                       Arrays.equals(csvSchema.getNullValue(), 
otherSchema.getNullValue());
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       resultTypeInfo,
+                       ignoreParseErrors,
+                       csvSchema.getColumnSeparator(),
+                       csvSchema.allowsComments(),
+                       csvSchema.getArrayElementSeparator(),
+                       csvSchema.getQuoteChar(),
+                       csvSchema.getEscapeChar(),
+                       csvSchema.getNullValue());
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       // 
-------------------------------------------------------------------------------------
+       // Runtime Converters
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts {@link JsonNode}s into objects of 
Flink Table & SQL
+        * internal data structures.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(JsonNode jsonNode);
+       }
+
+       private DeserializationRuntimeConverter createRowConverter(RowType 
rowType, boolean isTopLevel) {
+               final DeserializationRuntimeConverter[] fieldConverters = 
rowType.getFields().stream()
+                       .map(RowType.RowField::getType)
+                       .map(this::createNullableConverter)
+                       .toArray(DeserializationRuntimeConverter[]::new);
+               final String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
+               final int arity = fieldNames.length;
+
+               return jsonNode -> {
+                       int nodeSize = jsonNode.size();
+
+                       validateArity(arity, nodeSize, ignoreParseErrors);
+
+                       GenericRowData row = new GenericRowData(arity);
+                       for (int i = 0; i < arity; i++) {
+                               JsonNode field;
+                               // Jackson only supports mapping by name in the 
first level
+                               if (isTopLevel) {
+                                       field = jsonNode.get(fieldNames[i]);
+                               } else {
+                                       field = jsonNode.get(i);
+                               }
+                               if (field == null) {
+                                       row.setField(i, null);
+                               } else {
+                                       row.setField(i, 
fieldConverters[i].convert(field));
+                               }
+                       }
+                       return row;
+               };
+       }
+
+       /**
+        * Creates a runtime converter which is null safe.
+        */
+       private DeserializationRuntimeConverter 
createNullableConverter(LogicalType type) {
+               final DeserializationRuntimeConverter converter = 
createConverter(type);
+               return jsonNode -> {
+                       if (jsonNode == null || jsonNode.isNull()) {
+                               return null;
+                       }
+                       try {
+                               return converter.convert(jsonNode);
+                       } catch (Throwable t) {
+                               if (!ignoreParseErrors) {
+                                       throw t;
+                               }
+                               return null;
+                       }
+               };
+       }
+
+       /**
+        * Creates a runtime converter which assuming input object is not null.
+        */
+       private DeserializationRuntimeConverter createConverter(LogicalType 
type) {
+               switch (type.getTypeRoot()) {
+                       case NULL:
+                               return jsonNode -> null;
+                       case BOOLEAN:
+                               return this::convertToBoolean;
+                       case TINYINT:
+                               return jsonNode -> 
Byte.parseByte(jsonNode.asText().trim());
+                       case SMALLINT:
+                               return jsonNode -> 
Short.parseShort(jsonNode.asText().trim());
+                       case INTEGER:
+                       case INTERVAL_YEAR_MONTH:
+                               return this::convertToInt;
+                       case BIGINT:
+                       case INTERVAL_DAY_TIME:
+                               return this::convertToLong;
+                       case DATE:
+                               return this::convertToDate;
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return this::convertToTime;
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return this::convertToTimestamp;
+                       case FLOAT:
+                               return this::convertToFloat;
+                       case DOUBLE:
+                               return this::convertToDouble;
+                       case CHAR:
+                       case VARCHAR:
+                               return this::convertToString;
+                       case BINARY:
+                       case VARBINARY:
+                               return this::convertToBytes;
+                       case DECIMAL:
+                               return createDecimalConverter((DecimalType) 
type);
+                       case ARRAY:
+                               return createArrayConverter((ArrayType) type);
+                       case ROW:
+                               return createRowConverter((RowType) type, 
false);
+                       case MAP:
+                       case MULTISET:
+                       case RAW:
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported type: " + type);
+               }
+       }
+
+       private boolean convertToBoolean(JsonNode jsonNode) {
+               if (jsonNode.isBoolean()) {
+                       // avoid redundant toString and parseBoolean, for 
better performance
+                       return jsonNode.asBoolean();
+               } else {
+                       return Boolean.parseBoolean(jsonNode.asText().trim());
+               }
+       }
+
+       private int convertToInt(JsonNode jsonNode) {
+               if (jsonNode.canConvertToInt()) {
+                       // avoid redundant toString and parseInt, for better 
performance
+                       return jsonNode.asInt();
+               } else {
+                       return Integer.parseInt(jsonNode.asText().trim());
+               }
+       }
+
+       private long convertToLong(JsonNode jsonNode) {
+               if (jsonNode.canConvertToLong()) {
+                       // avoid redundant toString and parseLong, for better 
performance
+                       return jsonNode.asLong();
+               } else {
+                       return Long.parseLong(jsonNode.asText().trim());
+               }
+       }
+
+       private double convertToDouble(JsonNode jsonNode) {
+               if (jsonNode.isDouble()) {
+                       // avoid redundant toString and parseDouble, for better 
performance
+                       return jsonNode.asDouble();
+               } else {
+                       return Double.parseDouble(jsonNode.asText().trim());
+               }
+       }
+
+       private float convertToFloat(JsonNode jsonNode) {
+               if (jsonNode.isDouble()) {
+                       // avoid redundant toString and parseDouble, for better 
performance
+                       return (float) jsonNode.asDouble();
+               } else {
+                       return Float.parseFloat(jsonNode.asText().trim());
+               }
+       }
+
+       private int convertToDate(JsonNode jsonNode) {
+               // csv currently is using Date.valueOf() to parse date string
+               return (int) 
Date.valueOf(jsonNode.asText()).toLocalDate().toEpochDay();
+       }
+
+       private int convertToTime(JsonNode jsonNode) {
+               // csv currently is using Time.valueOf() to parse time string
+               LocalTime localTime = 
Time.valueOf(jsonNode.asText()).toLocalTime();
+               // get number of milliseconds of the day
+               return localTime.toSecondOfDay() * 1000;

Review comment:
       Loose mills precision here, use `toNanoOfDay`?
   BTW, add tests for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to