Liu created FLINK-28208:
---------------------------
Summary: The method createBatchSink in class HiveTableSink should
setParallelism for map operator
Key: FLINK-28208
URL: https://issues.apache.org/jira/browse/FLINK-28208
Project: Flink
Issue Type: Improvement
Components: Connectors / Hive
Affects Versions: 1.16.0
Reporter: Liu
The problem was found when using the Adaptive Batch Scheduler. In this case, a
simple SQL query like "select * from * where *" generates three operators:
source, map and sink. The map's parallelism is set to -1 by default, which
differs from the parallelism of the source and sink. As a result, the three
operators cannot be chained together.
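To illustrate why the mismatch matters, here is a toy model of the chaining
rule, not Flink's actual implementation. It assumes only that adjacent
operators can be chained when their parallelism is equal (the real check has
further conditions). The names ChainingSketch, Op and chain are hypothetical,
and the parallelism value 4 is an arbitrary example.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingSketch {

    /** Hypothetical stand-in for an operator: a name plus its configured parallelism. */
    static final class Op {
        final String name;
        final int parallelism;

        Op(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /**
     * Groups adjacent operators into chains. Simplifying assumption: two
     * neighbours are chainable only if their parallelism is equal.
     */
    static List<List<String>> chain(List<Op> pipeline) {
        List<List<String>> chains = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            // Start a new chain at the first operator or on a parallelism mismatch.
            if (previous == null || previous.parallelism != op.parallelism) {
                chains.add(new ArrayList<>());
            }
            chains.get(chains.size() - 1).add(op.name);
            previous = op;
        }
        return chains;
    }

    public static void main(String[] args) {
        // Map left at the default -1: every operator ends up in its own chain.
        System.out.println(chain(List.of(
                new Op("source", 4), new Op("map", -1), new Op("sink", 4))));
        // prints [[source], [map], [sink]]

        // Map given the same parallelism as source and sink: a single chain.
        System.out.println(chain(List.of(
                new Op("source", 4), new Op("map", 4), new Op("sink", 4))));
        // prints [[source, map, sink]]
    }
}
```

In this model the only difference between the two runs is the parallelism of
the map operator, which mirrors the situation described above.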
The reason is that the method createBatchSink adds a map operator but does not
call setParallelism on it. The changed code is as follows:
{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // Newly added to ensure the right parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)