gauravmiglanid11 commented on pull request #108:
URL: https://github.com/apache/flink-shaded/pull/108#issuecomment-1011939361


   Hi @slinkydeveloper, I tried building the jar from your branch, but it is
   not working. The mapper object in `JsonRowSerializationSchema` is a private
   instance field. I also tried registering `JavaTimeModule` on it via
   reflection, but ran into the well-known
   `java.time.format.DateTimeFormatter` serialization issue:
   https://github.com/FasterXML/jackson-databind/issues/2452. So I wrote a
   generic serialization class for Flink `Row` instead. The code below is
   working as of now:
   
   ```
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   import lombok.SneakyThrows;
   import org.apache.flink.api.common.serialization.SerializationSchema;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
   import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
   import org.apache.flink.types.Row;
   
   public class GenericSerializationSchema implements SerializationSchema<Row> {
   
     // Static, so the mapper is not part of this schema's serialized state.
     // Configured once here instead of on every constructor call.
     private static final ObjectMapper objectMapper =
         new ObjectMapper()
             .registerModule(new JavaTimeModule())
             .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
   
     private final String[] columnNames;
   
     public GenericSerializationSchema(String[] columnNames) {
       this.columnNames = columnNames;
     }
   
     @SneakyThrows
     @Override
     public byte[] serialize(Row row) {
       if (row == null) {
         return null;
       }
       Map<String, Object> map = new HashMap<>();
       for (int i = 0; i < columnNames.length; i++) {
         Object field = row.getField(i);
         if (field instanceof Row) {
           // Nested rows are serialized recursively and stored as a JSON string.
           Row fieldRow = (Row) field;
           String[] rowColumnNames = fieldRow.getFieldNames(true).toArray(new String[0]);
           map.put(
               columnNames[i],
               new String(
                   new GenericSerializationSchema(rowColumnNames).serialize(fieldRow),
                   StandardCharsets.UTF_8));
         } else {
           map.put(columnNames[i], field);
         }
       }
       return objectMapper.writeValueAsBytes(map);
     }
   }
   ```
   
   If I make `objectMapper` an instance field instead of a static one, I get
   the same not-serializable error for the java.time formatter, which is the
   same failure as with the `JsonRowSerializationSchema` class.
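
To illustrate the difference between the static and the instance field, here is a minimal JDK-only sketch. It is not Flink or Jackson code: the class and method names (`TransientFieldSketch`, `InstanceFieldHolder`, `StaticFieldHolder`, `TransientFieldHolder`, `isJavaSerializable`, `roundTrip`) are made up for illustration, and a bare `DateTimeFormatter` stands in for the mapper that holds one. It assumes the failure comes from plain Java serialization of the schema object, which skips static and `transient` fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class TransientFieldSketch {

  // Plain instance field of a non-serializable type: writeObject fails.
  static class InstanceFieldHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE;

    String format(LocalDate date) {
      return formatter.format(date);
    }
  }

  // Static field: not part of the serialized object state, so nothing fails.
  static class StaticFieldHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE;

    String format(LocalDate date) {
      return FORMATTER.format(date);
    }
  }

  // Transient instance field: skipped on write, rebuilt lazily after read.
  static class TransientFieldHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient DateTimeFormatter formatter;

    String format(LocalDate date) {
      if (formatter == null) {
        formatter = DateTimeFormatter.ISO_LOCAL_DATE;
      }
      return formatter.format(date);
    }
  }

  // Returns true if Java serialization of the value succeeds.
  static boolean isJavaSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (IOException e) { // includes NotSerializableException
      return false;
    }
  }

  // Serializes and deserializes the value, as happens when an object is shipped elsewhere.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LocalDate date = LocalDate.of(2022, 1, 13);
    System.out.println(isJavaSerializable(new InstanceFieldHolder()));   // false
    System.out.println(isJavaSerializable(new StaticFieldHolder()));     // true
    System.out.println(isJavaSerializable(new TransientFieldHolder()));  // true
    System.out.println(roundTrip(new TransientFieldHolder()).format(date)); // 2022-01-13
  }
}
```

If the same reasoning applies to the schema, a `transient` mapper field that is created lazily (or in `SerializationSchema#open`, where the Flink version in use provides it) might be an alternative to the static field, but that is untested here.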
   
   A Jackson `ObjectMapper` can be declared static according to
   https://stackoverflow.com/questions/3907929/should-i-declare-jacksons-objectmapper-as-a-static-field,
   so am I missing something?

