Hey,
If my understanding is correct, the planInputPartitions function in
SparkBatchScan creates the tasks that read data.
Each task contains files whose block locations are fetched in the ReadTask
constructor, provided *localityPreferred* is true.

https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L274

if (localityPreferred) {
    Table table = tableBroadcast.value();
    this.preferredLocations = Util.blockLocations(table.io(), task);
} else {
    this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
}


If 900 tasks are created and each task has 100 files, then we will
make 90000 fs.getFileBlockLocations RPC calls.
If each RPC call takes 1 ms, then a total of 90000 * 0.001 = 90 sec
is added to planning time.
Also, planInputPartitions gets called repeatedly.
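
To make the estimate easy to redo with other numbers, here is a minimal sketch of the same arithmetic. The class and method names are made up for illustration, and the 1 ms per call is the assumed figure from above, not a measurement.

```java
// Hypothetical helper, not part of Iceberg: estimates the time spent on
// sequential block-location lookups during planning.
final class PlanningCostEstimate {

    // One RPC per file, issued one after another.
    static double sequentialSeconds(int tasks, int filesPerTask, double rpcMillis) {
        long calls = (long) tasks * filesPerTask;
        return calls * rpcMillis / 1000.0;
    }

    public static void main(String[] args) {
        // 900 tasks * 100 files * 1 ms (assumed latency)
        System.out.println(sequentialSeconds(900, 100, 1.0)); // prints 90.0
    }
}
```

Since planInputPartitions can run more than once for a query, the real cost would be a multiple of this figure.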

Though we can avoid the fs.getFileBlockLocations RPC calls by passing the
option locality=false when reading through the DataFrameReader APIs, there
is no way to disable locality when we run a SQL query (like MERGE INTO)
that scans the table.

I suggest we either parallelize the following code block, which
creates the ReadTasks:

https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L128

for (int i = 0; i < scanTasks.size(); i++) {
    readTasks[i] = new ReadTask(
            scanTasks.get(i), tableBroadcast, expectedSchemaString,
            caseSensitive, localityPreferred);
}
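
A rough sketch of what the parallel version could look like. It is not the actual Iceberg code: ReadTask and lookupBlockLocations here are simplified stand-ins that I made up (a String plays the role of the scan task, and a short sleep simulates the RPC), just to show the shape of the change.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

final class ParallelReadTaskSketch {

    // Stand-in for ReadTask: the constructor performs the expensive lookup,
    // as in the snippet above.
    static final class ReadTask {
        final String[] preferredLocations;

        ReadTask(String scanTask, boolean localityPreferred) {
            this.preferredLocations = localityPreferred
                    ? lookupBlockLocations(scanTask)
                    : new String[0];
        }
    }

    // Stand-in for Util.blockLocations; the sleep simulates one RPC.
    static String[] lookupBlockLocations(String scanTask) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] {"host-for-" + scanTask};
    }

    // Builds the ReadTasks in parallel; toArray keeps them in the same
    // order as scanTasks.
    static ReadTask[] planInParallel(List<String> scanTasks, boolean localityPreferred) {
        return IntStream.range(0, scanTasks.size())
                .parallel()
                .mapToObj(i -> new ReadTask(scanTasks.get(i), localityPreferred))
                .toArray(ReadTask[]::new);
    }

    public static void main(String[] args) {
        ReadTask[] tasks = planInParallel(Arrays.asList("a", "b", "c"), true);
        System.out.println(tasks.length + " read tasks planned");
    }
}
```

A parallel stream runs on the common ForkJoinPool, so for blocking RPCs a dedicated thread pool with a configurable size might be the better choice; I used the stream only to keep the sketch short.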


or
add a table-level property like table.sql.locality.default = true to
enable/disable the locality preference of the table:

public static boolean isLocalityEnabled(
        FileIO io, org.apache.iceberg.Table table, CaseInsensitiveStringMap readOptions) {
    InputFile in = io.newInputFile(table.location());
    if (in instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) in).getFileSystem().getScheme();
        // The read option wins over the file-system default when it is set.
        boolean localityPreference =
                readOptions.getBoolean("locality", LOCALITY_WHITELIST_FS.contains(scheme));
        // The proposed table property can switch locality off for every reader.
        return localityPreference && Boolean.parseBoolean(
                table.properties().getOrDefault("table.sql.locality.default", "true"));
    }
    return false;
}
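
To show how the read option and the proposed table property would combine, here is a stripped-down sketch of the same decision with plain maps instead of the Iceberg and Spark types. The class name is made up, table.sql.locality.default is only the proposed property name, and I am assuming the whitelist contains just "hdfs".

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

final class LocalityDecisionSketch {

    // Assumption: stand-in for LOCALITY_WHITELIST_FS, taken to hold only "hdfs".
    static final Set<String> LOCALITY_WHITELIST_FS = Collections.singleton("hdfs");

    // Proposed property name, not an existing Iceberg table property.
    static final String TABLE_PROP = "table.sql.locality.default";

    static boolean isLocalityEnabled(
            String scheme, Map<String, String> tableProps, Map<String, String> readOptions) {
        // Read option "locality" falls back to the file-system default.
        String option = readOptions.get("locality");
        boolean readPreference = option == null
                ? LOCALITY_WHITELIST_FS.contains(scheme)
                : Boolean.parseBoolean(option);
        // Table property defaults to true, so existing tables keep today's behavior.
        boolean tablePreference =
                Boolean.parseBoolean(tableProps.getOrDefault(TABLE_PROP, "true"));
        return readPreference && tablePreference;
    }

    public static void main(String[] args) {
        Map<String, String> disabled = Collections.singletonMap(TABLE_PROP, "false");
        Map<String, String> none = Collections.emptyMap();
        System.out.println(isLocalityEnabled("hdfs", none, none));     // prints true
        System.out.println(isLocalityEnabled("hdfs", disabled, none)); // prints false
    }
}
```

One consequence of the && is that a table with the property set to false stays non-local even if a reader passes locality=true explicitly; if the read option should win in that case, the combination would need to change.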

Thanks and Regards,
Vivek
