[
https://issues.apache.org/jira/browse/HIVE-17291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123826#comment-16123826
]
Peter Vary commented on HIVE-17291:
-----------------------------------
Hi [~xuefuz],
I might not see the whole picture here, but my intention was to modify only the
case when dynamic allocation is not enabled.
The patch modifies only the {{SparkSessionImpl.getMemoryAndCores()}} method,
which is used only by the {{SetSparkReducerParallelism.getSparkMemoryAndCores()}}
method, which looks like this:
{code:title=SetSparkReducerParallelism}
  private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
    if (sparkMemoryAndCores != null) {
      return;
    }
    if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
      // If dynamic allocation is enabled, numbers for memory and cores are meaningless.
      // So, we don't try to get it.
      sparkMemoryAndCores = null;
      return;
    }
    [..]
    try {
      [..]
      sparkMemoryAndCores = sparkSession.getMemoryAndCores();
    } catch (HiveException e) {
      [..]
    }
  }
{code}
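Just to make the non-dynamic-allocation case concrete, this is roughly the kind of config-based fallback I mean (an illustrative sketch against plain {{SparkConf}} only, not the actual patch; the class name, the defaults and the example values below are my assumptions):
{code:title=Illustrative sketch only, not the actual patch}
import org.apache.spark.SparkConf;

public class ConfigBasedCapacitySketch {

  /**
   * Derives {memory per core in bytes, total cores} purely from the submitted
   * configuration, for the case when dynamic allocation is disabled and the
   * client has not (yet) reported its executors.
   */
  public static long[] fromConf(SparkConf conf) {
    int executors = conf.getInt("spark.executor.instances", 2);     // 2 is the YARN default
    int coresPerExecutor = conf.getInt("spark.executor.cores", 1);  // 1 is the YARN default
    long memoryPerExecutor = conf.getSizeAsBytes("spark.executor.memory", "1g");

    long totalCores = (long) executors * coresPerExecutor;
    long memoryPerCore = memoryPerExecutor / coresPerExecutor;
    return new long[] { memoryPerCore, totalCores };
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.executor.instances", "4")
        .set("spark.executor.cores", "2")
        .set("spark.executor.memory", "4g");
    long[] result = fromConf(conf);
    System.out.println("memory/core = " + result[0] + " bytes, total cores = " + result[1]);
  }
}
{code}
The actual change of course lives inside {{SparkSessionImpl.getMemoryAndCores()}} and only falls back to the configured values when the client does not provide the information, but the idea is the same.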
If the above statements are true, then in the case of dynamic allocation we do not
use this data, and the number of reducers is based only on the size of the data:
{code:title=SetSparkReducerParallelism}
  @Override
  public Object process(Node nd, Stack<Node> stack,
      NodeProcessorCtx procContext, Object... nodeOutputs) throws SemanticException {
    [..]
    // Divide it by 2 so that we can have more reducers
    long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER) / 2;
    int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
        maxReducers, false);
    getSparkMemoryAndCores(context);  // <-- In case of dynamic allocation this sets sparkMemoryAndCores to null
    if (sparkMemoryAndCores != null &&
        sparkMemoryAndCores.getFirst() > 0 && sparkMemoryAndCores.getSecond() > 0) {
      // Warn the user if bytes per reducer is much larger than memory per task
      if ((double) sparkMemoryAndCores.getFirst() / bytesPerReducer < 0.5) {
        LOG.warn("Average load of a reducer is much larger than its available memory. " +
            "Consider decreasing hive.exec.reducers.bytes.per.reducer");
      }
      // If there are more cores, use the number of cores
      numReducers = Math.max(numReducers, sparkMemoryAndCores.getSecond());
    }
    numReducers = Math.min(numReducers, maxReducers);
    LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)");
    desc.setNumReducers(numReducers);
    [..]
  }
{code}
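To make the size-only path concrete, here is a tiny standalone sketch of what the calculation boils down to when {{sparkMemoryAndCores}} is null (illustrative only, not the real {{Utilities.estimateReducers}}; the 1 GB input and the 256 MB / 1009 defaults are just example numbers):
{code:title=Illustrative sketch only, not the real Utilities.estimateReducers}
public class ReducerEstimateSketch {

  // Rough equivalent of the size-based part of the estimate:
  // ceil(totalInputSize / bytesPerReducer), capped at maxReducers, at least 1.
  static int estimate(long totalInputSize, long bytesPerReducer, int maxReducers) {
    long reducers = (totalInputSize + bytesPerReducer - 1) / bytesPerReducer;
    return (int) Math.max(1, Math.min(maxReducers, reducers));
  }

  public static void main(String[] args) {
    long bytesPerReducer = 256_000_000L / 2;  // 256 MB default of hive.exec.reducers.bytes.per.reducer, halved as above
    long totalInputSize = 1_000_000_000L;     // a hypothetical 1 GB reduce-sink input
    int maxReducers = 1009;                   // hive.exec.reducers.max default

    // With dynamic allocation enabled, sparkMemoryAndCores stays null,
    // so this size-based number is already the final parallelism:
    System.out.println(estimate(totalInputSize, bytesPerReducer, maxReducers)); // prints 8
  }
}
{code}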
I might have missed something, since I am quite a newbie in this part of the code.
Thanks for taking the time to look at this!
Peter
> Set the number of executors based on config if client does not provide
> information
> ----------------------------------------------------------------------------------
>
> Key: HIVE-17291
> URL: https://issues.apache.org/jira/browse/HIVE-17291
> Project: Hive
> Issue Type: Sub-task
> Components: Spark
> Affects Versions: 3.0.0
> Reporter: Peter Vary
> Assignee: Peter Vary
> Attachments: HIVE-17291.1.patch
>
>
> When calculating the memory and cores, if the client does not provide the
> information, we should try to use the defaults from the configuration. This can
> happen on startup, when {{spark.dynamicAllocation.enabled}} is not enabled.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)