kalencaya commented on issue #1324:
URL: 
https://github.com/apache/incubator-seatunnel/issues/1324#issuecomment-1050611529


   I had found out why FileSink can't output some date columns produced by 
JdbcSource.
   
   JdbcSource maps date jdbc type to java.sql.Date and so on automatically, 
then FileSource uses JsonRowOutputFormat output data to file.
   Unfortunately, JsonRowOutputFormat ignores java.sql.Date、java.sql.Time or 
java.sql.Timestamp for SqlTimeTypeInfo not behaves as a basic type but a atomic 
type.
   I think changing if condition from `type.isBasicType()` to `type instanceof 
AtomicType` is a good idea.
   
   ```
   public class JsonRowOutputFormat extends FileOutputFormat<Row> {
   
       @Override
       public void writeRecord(Row record) throws IOException {
           final JSONObject json = getJson(record, rowTypeInfo);
           byte[] bytes = json.toString().getBytes(charset);
           this.stream.write(bytes);
           this.stream.write(NEWLINE);
       }
   
       private JSONObject getJson(Row record, RowTypeInfo rowTypeInfo) {
           String[] fieldNames = rowTypeInfo.getFieldNames();
           int i = 0;
           JSONObject json = new JSONObject();
           for (String name : fieldNames) {
               Object field = record.getField(i);
               final TypeInformation type = rowTypeInfo.getTypeAt(i);
               // SqlTimeTypeInfo is not a basic type
               if (type.isBasicType())) {
                   json.put(name, field);
               } else if (type instanceof ObjectArrayTypeInfo) {
                   ObjectArrayTypeInfo arrayTypeInfo = (ObjectArrayTypeInfo) 
type;
                   TypeInformation componentInfo = 
arrayTypeInfo.getComponentInfo();
                   JSONArray jsonArray = new JSONArray();
                   if (componentInfo instanceof RowTypeInfo) {
                       final Row[] rows = (Row[]) field;
                       for (Row r : rows) {
                           jsonArray.add(getJson(r, (RowTypeInfo) 
componentInfo));
                       }
                   } else {
                       jsonArray.addAll(Arrays.asList((Object[]) field));
                   }
                   json.put(name, jsonArray);
               } else if (type instanceof RowTypeInfo) {
                   RowTypeInfo typeInfo = (RowTypeInfo) type;
                   json.put(name, getJson((Row) field, typeInfo));
               }
               i++;
           }
           return json;
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to