[ https://issues.apache.org/jira/browse/SPARK-50326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arnaud Nauwynck updated SPARK-50326:
------------------------------------
Description:
When saving a big Avro file with Spark, the write may be extremely slow because
flush() is called many times: once for every Avro block of ~64 KB.
This is especially slow on a Hadoop virtual FileSystem whose flush() does a lot
of work internally, in addition to reconnecting an HTTPS connection (example:
Azure Storage).
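A rough way to see why this hurts on large files: with one flush() per block, the number of flush() calls grows with the output size divided by the block size. The helper below is only a back-of-the-envelope estimate, assuming blocks of exactly the block size measured on uncompressed bytes and ignoring record boundaries; `FlushCountEstimate` and `approxFlushes` are hypothetical names, not Avro APIs.

```java
/** Back-of-the-envelope estimate of flush() calls; not Avro code. */
final class FlushCountEstimate {

    /** Approximate number of blocks, hence flush() calls when flushing on every block. */
    static long approxFlushes(long outputBytes, int blockBytes) {
        if (blockBytes <= 0) {
            throw new IllegalArgumentException("blockBytes must be > 0");
        }
        // Ceiling division: a trailing partial block is still written (and flushed).
        return (outputBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        System.out.println(approxFlushes(oneGiB, 64000));   // 16778 with ~64 KB blocks
        System.out.println(approxFlushes(oneGiB, 8388608)); // 128 with 8 MiB blocks
    }
}
```

For 1 GiB of uncompressed data this gives roughly 16,778 flush() calls with ~64 KB blocks versus 128 with 8 MiB blocks, which matters when each flush() is expensive on the underlying FileSystem.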
Currently, the code that implements Avro format support comes from the hive-exec
jar.
At the lowest level, it does the following:
[https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71]
{code:java}
public class AvroContainerOutputFormat {
  // ... (other members omitted)

  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
      Path path, Class<? extends Writable> valueClass, boolean isCompressed,
      Properties properties, Progressable progressable) throws IOException {
    Schema schema;
    try {
      schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, properties);
    } catch (AvroSerdeException e) {
      throw new IOException(e);
    }
    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);
    if (isCompressed) {
      int level = jobConf.getInt("avro.mapred.deflate.level", -1);
      String codecName = jobConf.get("avro.output.codec", "deflate");
      CodecFactory factory = codecName.equals("deflate")
          ? CodecFactory.deflateCodec(level)
          : CodecFactory.fromString(codecName);
      dfw.setCodec(factory);
    }
    // Only the schema and the codec are configured here: syncInterval and
    // flushOnEveryBlock keep Avro's defaults.
    dfw.create(schema, path.getFileSystem(jobConf).create(path));
    return new AvroGenericRecordWriter(dfw);
  }
}
{code}
As you can see, only 2 options are passed along Spark -> Hadoop JobConf ->
hive-exec to the Avro library: the schema and the compression codec.
The Avro class "DataFileWriter" supports 2 other important attributes, but their
default values are suited only for small files (or Kafka streaming):
{code:java}
public class DataFileWriter<D> implements Closeable, Flushable {
  private int syncInterval = 64000;          // target block size in bytes (~64 KB)
  private boolean flushOnEveryBlock = true;  // flush() the output after every block
  // ...
}
{code}
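The effect of these two attributes can be modeled with the standard library alone. The sketch below is a simplified model, not Avro's `DataFileWriter`: it buffers fixed-size records until the buffer reaches `syncInterval` bytes, writes the buffer as one block, and calls flush() after each block only when `flushOnEveryBlock` is true, with one final flush() standing in for close(). `CountingOutputStream` plays the role of a FileSystem output stream with a costly flush(); `FlushPerBlockModel` and `countFlushes` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Simplified model of block buffering; NOT Avro's DataFileWriter. */
final class FlushPerBlockModel {

    /** Counts flush() calls; stands in for a remote FileSystem stream with a costly flush(). */
    static final class CountingOutputStream extends FilterOutputStream {
        int flushCount = 0;

        CountingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void flush() throws IOException {
            flushCount++;
            super.flush();
        }
    }

    /** Writes `records` records of `recordSize` bytes; returns the flush() calls seen by the sink. */
    static int countFlushes(int records, int recordSize, int syncInterval, boolean flushOnEveryBlock) {
        CountingOutputStream sink = new CountingOutputStream(new ByteArrayOutputStream());
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] record = new byte[recordSize];
        try {
            for (int i = 0; i < records; i++) {
                buffer.write(record, 0, record.length);
                // A block is emitted once the buffered bytes reach syncInterval.
                if (buffer.size() >= syncInterval) {
                    buffer.writeTo(sink);
                    buffer.reset();
                    if (flushOnEveryBlock) {
                        sink.flush();
                    }
                }
            }
            // Closing the writer: the last partial block is written and flushed once.
            buffer.writeTo(sink);
            sink.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.flushCount;
    }

    public static void main(String[] args) {
        // 10,000 records of 100 bytes = 1,000,000 bytes in total.
        System.out.println(countFlushes(10_000, 100, 64000, true));   // 16
        System.out.println(countFlushes(10_000, 100, 64000, false));  // 1
        System.out.println(countFlushes(10_000, 100, 8388608, true)); // 1
    }
}
```

In this model, disabling flushOnEveryBlock or raising syncInterval each reduce the flush() count to the single final flush for 1 MB of data; in practice, raising syncInterval also reduces the number of block headers and sync markers written.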
The proposed change is to override (upgrade?) this class in hive-exec so that
these 2 attributes can be configured with better values.
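{code:java}
DataFileWriter<GenericRecord> dfw = new DataFileWriter<>(gdw);

// flushOnEveryBlock: Avro's default is true; could be switched to false from an optional conf.
// NOTE: the conf key below is hypothetical, for illustration only.
boolean flushOnEveryBlock = jobConf.getBoolean("avro.output.flush.on.every.block", true);
dfw.setFlushOnEveryBlock(flushOnEveryBlock);

// syncInterval: Avro's default is 64000 bytes; proposed value 8388608 (8 MiB).
// Avro accepts 32 up to 1073741824 (1 GiB). NOTE: the conf key below is hypothetical.
int syncInterval = jobConf.getInt("avro.output.sync.interval", 8388608);
dfw.setSyncInterval(syncInterval);

// Both setters are called before dfw.create(...), like setCodec(...).
{code}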
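Because Avro documents a valid sync-interval range of 32 to 2^30 bytes, it may help to validate an optional setting before it reaches the writer. The sketch below uses `java.util.Properties` as a stand-in for the Hadoop `JobConf`; the class name and both configuration keys are hypothetical, not existing Spark, Hive or Avro settings.

```java
import java.util.Properties;

/** Parses the two proposed options. Keys are HYPOTHETICAL, not real Spark/Hive/Avro settings. */
final class AvroWriterOptions {

    static final String FLUSH_ON_EVERY_BLOCK_KEY = "avro.output.flush.on.every.block"; // hypothetical
    static final String SYNC_INTERVAL_KEY = "avro.output.sync.interval";               // hypothetical

    // Avro DataFileWriter default and documented valid range for the sync interval.
    static final int DEFAULT_SYNC_INTERVAL = 64000;
    static final int MIN_SYNC_INTERVAL = 32;
    static final int MAX_SYNC_INTERVAL = 1 << 30; // 1073741824 bytes = 1 GiB

    final boolean flushOnEveryBlock;
    final int syncInterval;

    private AvroWriterOptions(boolean flushOnEveryBlock, int syncInterval) {
        this.flushOnEveryBlock = flushOnEveryBlock;
        this.syncInterval = syncInterval;
    }

    /** Falls back to Avro's defaults when a key is absent; fails fast on out-of-range intervals. */
    static AvroWriterOptions fromProperties(Properties props) {
        boolean flush = Boolean.parseBoolean(props.getProperty(FLUSH_ON_EVERY_BLOCK_KEY, "true"));
        int sync = Integer.parseInt(
                props.getProperty(SYNC_INTERVAL_KEY, Integer.toString(DEFAULT_SYNC_INTERVAL)).trim());
        if (sync < MIN_SYNC_INTERVAL || sync > MAX_SYNC_INTERVAL) {
            throw new IllegalArgumentException("Invalid sync interval: " + sync);
        }
        return new AvroWriterOptions(flush, sync);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(FLUSH_ON_EVERY_BLOCK_KEY, "false");
        props.setProperty(SYNC_INTERVAL_KEY, "8388608"); // 8 MiB
        AvroWriterOptions opts = fromProperties(props);
        System.out.println(opts.flushOnEveryBlock + " " + opts.syncInterval); // false 8388608
    }
}
```

The parsed values would then be handed to `dfw.setFlushOnEveryBlock(...)` and `dfw.setSyncInterval(...)` before `dfw.create(...)`. This sketch keeps Avro's own defaults when nothing is configured, a more conservative choice than defaulting to 8 MiB.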
> override parameter to Avro DataFileWriter to avoid too many flushes, 1 per
> block of 64k
> ------------------------------------------------------------------------------------
>
> Key: SPARK-50326
> URL: https://issues.apache.org/jira/browse/SPARK-50326
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 3.1.3, 3.2.4, 3.3.4, 3.4.3, 3.5.3
> Reporter: Arnaud Nauwynck
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)