gauravmiglanid11 edited a comment on pull request #108:
URL: https://github.com/apache/flink-shaded/pull/108#issuecomment-1011939361


   Hi @slinkydeveloper, yes, I tried building the jar from your branch, but it 
is not working. The mapper object in `JsonRowSerializationSchema` is a private 
instance field. I also tried registering `JavaTimeModule` on it via reflection, 
but I still get the well-known `java.time.format.DateTimeFormatter` 
serialization issue: https://github.com/FasterXML/jackson-databind/issues/2452. 
So instead I wrote a generic serialization class for Flink `Row`. The code 
below is working as of now:
   
   ```
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   import lombok.SneakyThrows;
   import org.apache.flink.api.common.serialization.SerializationSchema;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
   import org.apache.flink.types.Row;
   
   public class GenericSerializationSchema implements SerializationSchema<Row> {
   
     private final String[] columnNames;
   
     // Static, so the mapper is not part of the serialized schema instance.
     private static final ObjectMapper objectMapper = new ObjectMapper();
   
     static {
       // Configure the shared mapper once, not on every constructor call.
       objectMapper.registerModule(new JavaTimeModule());
       objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     }
   
     public GenericSerializationSchema(String[] columnNames) {
       this.columnNames = columnNames;
     }
   
     @SneakyThrows
     @Override
     public byte[] serialize(Row row) {
       if (row == null) {
         return null;
       }
       Map<String, Object> map = new HashMap<>();
       for (int i = 0; i < columnNames.length; i++) {
         Object field = row.getField(i);
         if (field instanceof Row) {
           // Nested rows are serialized recursively and stored as a JSON string.
           Row fieldRow = (Row) field;
           String[] rowColumnNames = fieldRow.getFieldNames(true).toArray(new String[0]);
           byte[] nested = new GenericSerializationSchema(rowColumnNames).serialize(fieldRow);
           map.put(columnNames[i], new String(nested, StandardCharsets.UTF_8));
         } else {
           map.put(columnNames[i], field);
         }
       }
       return objectMapper.writeValueAsBytes(map);
     }
   }
   ```
   
   If I make `objectMapper` an instance field instead of a static one, I get 
the same not-serializable error for the java.time formatter, which is the same 
failure I see with the class `JsonRowSerializationSchema`.
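
To illustrate what I think is going on, here is a minimal JDK-only sketch, not 
Flink or Jackson code. The class and method names (`SerializationFieldDemo`, 
`InstanceFieldSchema`, `isJavaSerializable`, and so on) are made up for this 
example, and a bare `DateTimeFormatter` stands in for the configured mapper. 
It assumes the schema object is shipped using plain Java serialization, where 
instance fields are written out but static and transient fields are skipped.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.format.DateTimeFormatter;

public class SerializationFieldDemo {

  // Instance field: part of the serialized object graph.
  // DateTimeFormatter does not implement Serializable, so writing this fails.
  static class InstanceFieldSchema implements Serializable {
    private final DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE;
  }

  // Static field: belongs to the class, so Java serialization skips it.
  static class StaticFieldSchema implements Serializable {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE;
  }

  // Transient field: also skipped; it is rebuilt lazily on first use.
  static class TransientFieldSchema implements Serializable {
    private transient DateTimeFormatter formatter;

    DateTimeFormatter formatter() {
      if (formatter == null) {
        formatter = DateTimeFormatter.ISO_LOCAL_DATE;
      }
      return formatter;
    }
  }

  // Returns true if the object can be written with plain Java serialization.
  static boolean isJavaSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    TransientFieldSchema lazy = new TransientFieldSchema();
    lazy.formatter(); // initialize the transient field before serializing

    System.out.println("instance field:  " + isJavaSerializable(new InstanceFieldSchema()));
    System.out.println("static field:    " + isJavaSerializable(new StaticFieldSchema()));
    System.out.println("transient field: " + isJavaSerializable(lazy));
  }
}
```

If that assumption holds, a `transient` mapper that is created lazily on first 
use might be another option besides the static field, since it would keep the 
mapper per instance while still staying out of the serialized state.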
   
   According to 
https://stackoverflow.com/questions/3907929/should-i-declare-jacksons-objectmapper-as-a-static-field 
a Jackson `ObjectMapper` can be declared as a static field. Am I missing 
something?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
