Vivek,

I think you have a few good ideas here. I think it makes sense to
parallelize the calls to get block locations. And it is also a gap that
there isn’t a way to turn off locality when reading from a table in HDFS
through SparkSQL.
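
As a rough illustration of the first idea, the per-task block-location lookups could be fanned out to a thread pool instead of running in a serial loop. This is only a sketch with the Iceberg/Spark types replaced by placeholders (fetchBlockLocations stands in for the per-task fs.getFileBlockLocations calls):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLocations {
    // Placeholder for the per-task fs.getFileBlockLocations lookups.
    static String[] fetchBlockLocations(int taskId) {
        return new String[] {"host-" + (taskId % 3)};
    }

    // Fan the lookups out over a fixed-size pool instead of a serial loop.
    public static List<String[]> planWithLocality(int taskCount, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String[]>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int taskId = i;
                futures.add(pool.submit(() -> fetchBlockLocations(taskId)));
            }
            // Collect in submission order so task -> locations mapping is preserved.
            List<String[]> preferredLocations = new ArrayList<>();
            for (Future<String[]> f : futures) {
                preferredLocations.add(f.get());
            }
            return preferredLocations;
        } finally {
            pool.shutdown();
        }
    }
}
```

With 900 tasks and, say, a 16-thread pool, the wall-clock cost of the lookups drops by roughly the pool size, while the results still come back in task order.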

I’m not sure that a table property makes sense, since this is so specific
to Spark and the underlying storage, but I wouldn’t mind adding
read.spark.locality.enabled for this if other people think it would help.
We just want to be clear that this only configures Spark since the
trade-off would be different for other engines.
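
To be concrete about the precedence I'd expect: an explicit read option should win over the table property, which in turn overrides the filesystem default. A minimal sketch with plain maps standing in for the Spark read options and Iceberg table properties (the property name read.spark.locality.enabled is just the one proposed above, not an existing config):

```java
import java.util.Map;

public class LocalityResolver {
    // Hypothetical property name from this thread; not an existing Iceberg config.
    static final String TABLE_PROP = "read.spark.locality.enabled";

    // Precedence: read option > table property > filesystem default
    // (e.g. true for HDFS-like schemes, false otherwise).
    public static boolean localityEnabled(Map<String, String> readOptions,
                                          Map<String, String> tableProps,
                                          boolean fsDefault) {
        String opt = readOptions.get("locality");
        if (opt != null) {
            return Boolean.parseBoolean(opt);
        }
        String prop = tableProps.get(TABLE_PROP);
        if (prop != null) {
            return Boolean.parseBoolean(prop);
        }
        return fsDefault;
    }
}
```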

I also think that this is an area where our SQL extensions could help. I’ve
been considering adding a way to process hints that adds read options. That
way you could actually set the locality option, like this:

SELECT /*+ OPTION("locality", false) */ * FROM t;

I think that would be a good extension since there’s definitely a gap in
Spark between what you can do with the DataFrameReader/options and SQL.

Ryan

On Mon, Jun 21, 2021 at 9:46 AM vivek B <[email protected]> wrote:

> Hey,
> If my understanding is correct, the planInputPartitions function in
> SparkBatchScan creates the tasks that read data.
> Each task contains files whose block locations are fetched in the ReadTask
> constructor, provided *localityPreferred* is true.
>
>
> https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L274
>
> if (localityPreferred) {
>     Table table = tableBroadcast.value();
>     this.preferredLocations = Util.blockLocations(table.io(), task);
> } else {
>     this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
> }
>
>
> If 900 tasks are created and each task has 100 files, then we will
> make 90000 fs.getFileBlockLocations RPC calls.
> If each RPC call takes 1 ms, then a total of 90000 * 0.001 = 90 sec
> becomes part of the planning time.
> And planInputPartitions gets called repeatedly.
>
> Though we can avoid the fs.getFileBlockLocations RPC calls by passing the
> option locality=false when reading through the DataFrameReader APIs, there
> is no way to disable locality when we fire a SQL query (like MERGE INTO)
> that scans the table.
>
> I am suggesting that we either parallelize the following code block, which
> creates the ReadTasks,
>
>
> https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L128
>
> for (int i = 0; i < scanTasks.size(); i++) {
>     readTasks[i] = new ReadTask(
>             scanTasks.get(i), tableBroadcast, expectedSchemaString,
>             caseSensitive, localityPreferred);
> }
>
>
> or
> add a table-level property like table.sql.locality.default = true to
> enable/disable the locality preference for the table:
>
> public static boolean isLocalityEnabled(FileIO io, org.apache.iceberg.Table table,
>                                         CaseInsensitiveStringMap readOptions) {
>     InputFile in = io.newInputFile(table.location());
>     if (in instanceof HadoopInputFile) {
>         String scheme = ((HadoopInputFile) in).getFileSystem().getScheme();
>         boolean localityPreference =
>             readOptions.getBoolean("locality", LOCALITY_WHITELIST_FS.contains(scheme));
>         return localityPreference &&
>             Boolean.parseBoolean(
>                 table.properties().getOrDefault("table.sql.locality.default", "true"));
>     }
>     return false;
> }
>
> Thanks and Regards,
> Vivek
>
>

-- 
Ryan Blue
