[ 
https://issues.apache.org/jira/browse/SPARK-50326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Nauwynck updated SPARK-50326:
------------------------------------
    Description: 
when saving big Avro file with spark, it may be extremely slow because of the 
"flush()" that are called many times, for each avro block ~64k of bytes.

This is especially slow on a an Hadoop VirtualFileSystem for which the flush() 
is really doing many things internally, in addition to reconnecting a Https 
connection (example: Azure Storage).


Currently, the code that implements the Avro format support is from hive-exec 
jar.
At the lowest level, it is doing 

[https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71]

{code:java}
public class AvroContainerOutputFormat {

    public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, 
Path path, Class<? extends Writable> valueClass, boolean isCompressed, 
Properties properties, Progressable progressable) throws IOException {
        Schema schema;
        try {
            schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, 
properties);
        } catch (AvroSerdeException var13) {
            AvroSerdeException e = var13;
            throw new IOException(e);
        }

        GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter(schema);
        DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);
        if (isCompressed) {
            int level = jobConf.getInt("avro.mapred.deflate.level", -1);
            String codecName = jobConf.get("avro.output.codec", "deflate");
            CodecFactory factory = codecName.equals("deflate") ? 
CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName);
            dfw.setCodec(factory);
        }

        dfw.create(schema, path.getFileSystem(jobConf).create(path));
        return new AvroGenericRecordWriter(dfw);
    }
{code}

As you can see only 2 options are used from the spark->hadoop jobConf -> 
hive-exec to the Avro library :  the schema, and the codec compression.

The Avro class "DataFileWriter" supports 2 other importants attributes, but the 
default value are suited only for doing small files (or kafka streaming):

{code:java}
public class DataFileWriter<D> implements Closeable, Flushable {
    private int syncInterval = 64000;
    private boolean flushOnEveryBlock = true;
{code}

The proposed changed is to override (upgrade?) the class in hive-exec so that 
these 2 attributes could be configured with better values.


{code:java}
DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);

Boolean flushOnEveryBlock = …. Default true … may change to false from optional 
conf
               dfw.setFlushOnEveryBlock(false);

int syncInterval =  8388608; // 8mo  …  default 64000, and max allowed 
1073741824=1go  … may change from optional conf
               dfw.setFlushOnEveryBlock(syncInterval);

{code}




  was:
when saving big Avro file with spark, it may be extremely slow because of the 
"flush()" that are called many times, for each avro block ~64k of bytes.

This is especially slow on a an Hadoop VirtualFileSystem for which the flush() 
is really doing many things internally, in addition to reconnecting a Https 
connection (example: Azure Storage).


Currently, the code that implements the Avro format support is from hive-exec 
jar.
At the lowest level, it is doing 

{code:java}
    public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, 
Path path, Class<? extends Writable> valueClass, boolean isCompressed, 
Properties properties, Progressable progressable) throws IOException {
        Schema schema;
        try {
            schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, 
properties);
        } catch (AvroSerdeException var13) {
            AvroSerdeException e = var13;
            throw new IOException(e);
        }

        GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter(schema);
        DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);
        if (isCompressed) {
            int level = jobConf.getInt("avro.mapred.deflate.level", -1);
            String codecName = jobConf.get("avro.output.codec", "deflate");
            CodecFactory factory = codecName.equals("deflate") ? 
CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName);
            dfw.setCodec(factory);
        }

        dfw.create(schema, path.getFileSystem(jobConf).create(path));
        return new AvroGenericRecordWriter(dfw);
    }
{code}

As you can see only 2 options are used from the spark->hadoop jobConf -> 
hive-exec to the Avro library :  the schema, and the codec compression.

The Avro class "DataFileWriter" supports 2 other importants attributes, but the 
default value are suited only for doing small files (or kafka streaming):

{code:java}
public class DataFileWriter<D> implements Closeable, Flushable {
    private int syncInterval = 64000;
    private boolean flushOnEveryBlock = true;
{code}

The proposed changed is to override (upgrade?) the class in hive-exec so that 
these 2 attributes could be configured with better values.


{code:java}
DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);

Boolean flushOnEveryBlock = …. Default true … may change to false from optional 
conf
               dfw.setFlushOnEveryBlock(false);

int syncInterval =  8388608; // 8mo  …  default 64000, and max allowed 
1073741824=1go  … may change from optional conf
               dfw.setFlushOnEveryBlock(syncInterval);

{code}





> override parameter to Avro DataFileWriter to avoid to many flush, 1 per block 
> of 64k
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-50326
>                 URL: https://issues.apache.org/jira/browse/SPARK-50326
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 3.1.3, 3.2.4, 3.3.4, 3.4.3, 3.5.3
>            Reporter: Arnaud Nauwynck
>            Priority: Minor
>
> when saving big Avro file with spark, it may be extremely slow because of the 
> "flush()" that are called many times, for each avro block ~64k of bytes.
> This is especially slow on a an Hadoop VirtualFileSystem for which the 
> flush() is really doing many things internally, in addition to reconnecting a 
> Https connection (example: Azure Storage).
> Currently, the code that implements the Avro format support is from hive-exec 
> jar.
> At the lowest level, it is doing 
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java#L71]
> {code:java}
> public class AvroContainerOutputFormat {
>     public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, 
> Path path, Class<? extends Writable> valueClass, boolean isCompressed, 
> Properties properties, Progressable progressable) throws IOException {
>         Schema schema;
>         try {
>             schema = AvroSerdeUtils.determineSchemaOrThrowException(jobConf, 
> properties);
>         } catch (AvroSerdeException var13) {
>             AvroSerdeException e = var13;
>             throw new IOException(e);
>         }
>         GenericDatumWriter<GenericRecord> gdw = new 
> GenericDatumWriter(schema);
>         DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);
>         if (isCompressed) {
>             int level = jobConf.getInt("avro.mapred.deflate.level", -1);
>             String codecName = jobConf.get("avro.output.codec", "deflate");
>             CodecFactory factory = codecName.equals("deflate") ? 
> CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName);
>             dfw.setCodec(factory);
>         }
>         dfw.create(schema, path.getFileSystem(jobConf).create(path));
>         return new AvroGenericRecordWriter(dfw);
>     }
> {code}
> As you can see only 2 options are used from the spark->hadoop jobConf -> 
> hive-exec to the Avro library :  the schema, and the codec compression.
> The Avro class "DataFileWriter" supports 2 other importants attributes, but 
> the default value are suited only for doing small files (or kafka streaming):
> {code:java}
> public class DataFileWriter<D> implements Closeable, Flushable {
>     private int syncInterval = 64000;
>     private boolean flushOnEveryBlock = true;
> {code}
> The proposed changed is to override (upgrade?) the class in hive-exec so that 
> these 2 attributes could be configured with better values.
> {code:java}
> DataFileWriter<GenericRecord> dfw = new DataFileWriter(gdw);
> Boolean flushOnEveryBlock = …. Default true … may change to false from 
> optional conf
>                dfw.setFlushOnEveryBlock(false);
> int syncInterval =  8388608; // 8mo  …  default 64000, and max allowed 
> 1073741824=1go  … may change from optional conf
>                dfw.setFlushOnEveryBlock(syncInterval);
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to