[ 
https://issues.apache.org/jira/browse/FLINK-18530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

humengyu closed FLINK-18530.
----------------------------
    Resolution: Fixed

> ParquetAvroWriters can not write data to hdfs
> ---------------------------------------------
>
>                 Key: FLINK-18530
>                 URL: https://issues.apache.org/jira/browse/FLINK-18530
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.0
>            Reporter: humengyu
>            Priority: Major
>
> I read data from kafka and write to hdfs by StreamingFileSink:
>  # in version 1.11.0, ParquetAvroWriters does not work, but it works well in 
> version 1.10.1;
>  #  AvroWriters works well in 1.11.0.
> {code:java}
> public class TestParquetAvroSink {
>   @Test
>   public void testParquet() throws Exception {
>     EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner()
>         .inStreamingMode().build();
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
>     env.enableCheckpointing(20000L);
>     TableSchema tableSchema = TableSchema.builder().fields(
>         new String[]{"id", "name", "sex"},
>         new DataType[]{DataTypes.STRING(), DataTypes.STRING(), 
> DataTypes.STRING()})
>         .build();
>     // build a kafka source
>     DataStream<Row> rowDataStream = xxxx;
>     Schema schema = SchemaBuilder
>         .record("xxx")
>         .namespace("xxxx")
>         .fields()
>         .optionalString("id")
>         .optionalString("name")
>         .optionalString("sex")
>         .endRecord();
>     OutputFileConfig config = OutputFileConfig
>         .builder()
>         .withPartPrefix("prefix")
>         .withPartSuffix(".ext")
>         .build();
>     StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(
>             new Path("hdfs://host:port/xxx/xxx/xxx"),
>             ParquetAvroWriters.forGenericRecord(schema))
>         .withOutputFileConfig(config)
>         .withBucketAssigner(new 
> DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
>         .build();
>     SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
>         .map(new RecordMapFunction());
>     recordDateStream.print();
>     recordDateStream.addSink(sink);
>     env.execute("test");
>   }
>   @Test
>   public void testAvro() throws Exception {
>     EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner()
>         .inStreamingMode().build();
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
>     env.enableCheckpointing(20000L);
>     TableSchema tableSchema = TableSchema.builder().fields(
>         new String[]{"id", "name", "sex"},
>         new DataType[]{DataTypes.STRING(), DataTypes.STRING(), 
> DataTypes.STRING()})
>         .build();
>     // build a kafka source
>     DataStream<Row> rowDataStream = xxxx;
>     Schema schema = SchemaBuilder
>         .record("xxx")
>         .namespace("xxxx")
>         .fields()
>         .optionalString("id")
>         .optionalString("name")
>         .optionalString("sex")
>         .endRecord();
>     OutputFileConfig config = OutputFileConfig
>         .builder()
>         .withPartPrefix("prefix")
>         .withPartSuffix(".ext")
>         .build();
>     StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(
>             new Path("hdfs://host:port/xxx/xxx/xxx"),
>             AvroWriters.forGenericRecord(schema))
>         .withOutputFileConfig(config)
>         .withBucketAssigner(new 
> DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
>         .build();
>     SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
>         .map(new RecordMapFunction());
>     recordDateStream.print();
>     recordDateStream.addSink(sink);
>     env.execute("test");
>   }
>   public static class RecordMapFunction implements MapFunction<Row, 
> GenericRecord> {
>     private transient Schema schema;
>     @Override
>     public GenericRecord map(Row row) throws Exception {
>       if (schema == null) {
>         schema = SchemaBuilder
>             .record("xxx")
>             .namespace("xxx")
>             .fields()
>             .optionalString("id")
>             .optionalString("name")
>             .optionalString("sex")
>             .endRecord();
>       }
>       Record record = new Record(schema);
>       record.put("id", row.getField(0));
>       record.put("name", row.getField(1));
>       record.put("sex", row.getField(2));
>       return record;
>     }
>   }
> } 
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to