[ https://issues.apache.org/jira/browse/FLINK-18530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
humengyu closed FLINK-18530.
----------------------------
Resolution: Fixed
> ParquetAvroWriters can not write data to hdfs
> ---------------------------------------------
>
> Key: FLINK-18530
> URL: https://issues.apache.org/jira/browse/FLINK-18530
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.11.0
> Reporter: humengyu
> Priority: Major
>
> I read data from Kafka and write it to HDFS with StreamingFileSink:
> # in version 1.11.0, ParquetAvroWriters does not work, but it works well in version 1.10.1;
> # AvroWriters works well in 1.11.0.
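> For context, the "xxxx" source in both tests below is just a Kafka consumer whose messages are mapped to Row. A hypothetical sketch of what it could look like (topic, servers, and the CSV parsing are made up for illustration; env is the test's execution environment):
> {code:java}
> // Hypothetical stand-in for the elided Kafka source (needs flink-connector-kafka;
> // imports: FlinkKafkaConsumer, SimpleStringSchema, Types, Row, java.util.Properties).
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "localhost:9092");
> props.setProperty("group.id", "test");
> DataStream<Row> rowDataStream = env
>     .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props))
>     // each message is an "id,name,sex" CSV line
>     .map(line -> Row.of((Object[]) line.split(",", 3)))
>     .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING));
> {code}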
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.generic.GenericData.Record;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroWriters;
> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> public class TestParquetAvroSink {
>
>   // Fails in 1.11.0 (no data ends up on HDFS); works in 1.10.1.
>   @Test
>   public void testParquet() throws Exception {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build();
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>     env.enableCheckpointing(20000L);
>
>     TableSchema tableSchema = TableSchema.builder()
>         .fields(
>             new String[]{"id", "name", "sex"},
>             new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
>         .build();
>
>     // build a kafka source
>     DataStream<Row> rowDataStream = xxxx;
>
>     Schema schema = SchemaBuilder
>         .record("xxx")
>         .namespace("xxxx")
>         .fields()
>         .optionalString("id")
>         .optionalString("name")
>         .optionalString("sex")
>         .endRecord();
>
>     OutputFileConfig config = OutputFileConfig
>         .builder()
>         .withPartPrefix("prefix")
>         .withPartSuffix(".ext")
>         .build();
>
>     StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(
>             new Path("hdfs://host:port/xxx/xxx/xxx"),
>             ParquetAvroWriters.forGenericRecord(schema))
>         .withOutputFileConfig(config)
>         .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
>         .build();
>
>     SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
>         .map(new RecordMapFunction());
>     recordDateStream.print();
>     recordDateStream.addSink(sink);
>
>     env.execute("test");
>   }
>
>   // Identical pipeline, only the writer differs; works in 1.11.0.
>   @Test
>   public void testAvro() throws Exception {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build();
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>     env.enableCheckpointing(20000L);
>
>     TableSchema tableSchema = TableSchema.builder()
>         .fields(
>             new String[]{"id", "name", "sex"},
>             new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
>         .build();
>
>     // build a kafka source
>     DataStream<Row> rowDataStream = xxxx;
>
>     Schema schema = SchemaBuilder
>         .record("xxx")
>         .namespace("xxxx")
>         .fields()
>         .optionalString("id")
>         .optionalString("name")
>         .optionalString("sex")
>         .endRecord();
>
>     OutputFileConfig config = OutputFileConfig
>         .builder()
>         .withPartPrefix("prefix")
>         .withPartSuffix(".ext")
>         .build();
>
>     StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(
>             new Path("hdfs://host:port/xxx/xxx/xxx"),
>             AvroWriters.forGenericRecord(schema))
>         .withOutputFileConfig(config)
>         .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
>         .build();
>
>     SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
>         .map(new RecordMapFunction());
>     recordDateStream.print();
>     recordDateStream.addSink(sink);
>
>     env.execute("test");
>   }
>
>   public static class RecordMapFunction implements MapFunction<Row, GenericRecord> {
>
>     // Avro's Schema is not serializable, so it is rebuilt lazily on the task side.
>     private transient Schema schema;
>
>     @Override
>     public GenericRecord map(Row row) throws Exception {
>       if (schema == null) {
>         schema = SchemaBuilder
>             .record("xxx")
>             .namespace("xxx")
>             .fields()
>             .optionalString("id")
>             .optionalString("name")
>             .optionalString("sex")
>             .endRecord();
>       }
>       Record record = new Record(schema);
>       record.put("id", row.getField(0));
>       record.put("name", row.getField(1));
>       record.put("sex", row.getField(2));
>       return record;
>     }
>   }
> }
> {code}
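> One way to separate the Parquet writer from HDFS is to point the same sink at the local filesystem. A minimal sketch, assuming schema and config from the test above are in scope and using a placeholder path; note that with forBulkFormat the sink rolls part files only on checkpoint completion, so the job must survive at least one checkpoint before finished files appear:
> {code:java}
> // Same sink as above, but writing to the local filesystem; the path is a placeholder.
> // If this variant produces finished part files after a checkpoint, the Parquet
> // writer itself is fine and the problem is on the HDFS side.
> StreamingFileSink<GenericRecord> localSink = StreamingFileSink
>     .forBulkFormat(
>         new Path("file:///tmp/parquet-test"),
>         ParquetAvroWriters.forGenericRecord(schema))
>     .withOutputFileConfig(config)
>     .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
>     .build();
> {code}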
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)