Peter Vary commented on HIVE-17291:

Hi [~xuefuz],

I might not see the whole picture here, but my intention was to modify only the 
case where dynamic allocation is not enabled.

The patch modifies only the {{SparkSessionImpl.getMemoryAndCores()}} method, 
which is used only by the {{SetSparkReducerParallelism.getSparkMemoryAndCores()}} 
method, which looks like this (abridged):
  private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
    if (sparkMemoryAndCores != null) {
    if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
      // If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't
      // try to get it.
      sparkMemoryAndCores = null;

    try {
      sparkMemoryAndCores = sparkSession.getMemoryAndCores();
    } catch (HiveException e) {

If the above statements are true, then in the case of dynamic allocation we do 
not use this data, and the number of reducers is based only on the size of the 
data (abridged):
  public Object process(Node nd, Stack<Node> stack,
      NodeProcessorCtx procContext, Object... nodeOutputs)
      throws SemanticException {
          // Divide it by 2 so that we can have more reducers
          long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER) / 2;
          int numReducers = Utilities.estimateReducers(numberOfBytes, 
              maxReducers, false);

          getSparkMemoryAndCores(context);  // <-- In case of dynamic allocation this sets sparkMemoryAndCores to null
          if (sparkMemoryAndCores != null &&
              sparkMemoryAndCores.getFirst() > 0 && sparkMemoryAndCores.getSecond() > 0) {
            // warn the user if bytes per reducer is much larger than memory per task
            if ((double) sparkMemoryAndCores.getFirst() / bytesPerReducer < 0.5) {
              LOG.warn("Average load of a reducer is much larger than its available memory. " +
                  "Consider decreasing hive.exec.reducers.bytes.per.reducer");

            // If there are more cores, use the number of cores
            numReducers = Math.max(numReducers, 
          numReducers = Math.min(numReducers, maxReducers);
          LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers +
              " (calculated)");

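To make the flow I am describing easier to check, here is a minimal, self-contained sketch of it. It is not the Hive code: the class name, the method names and the size-based estimate are hypothetical stand-ins (the real estimate comes from {{Utilities.estimateReducers}}), and taking the maximum with the number of cores is my reading of the "If there are more cores, use the number of cores" comment above. The caller is assumed to pass the already halved bytes-per-reducer value.

```java
final class ReducerParallelismSketch {

    // Hypothetical stand-in for the size-based estimate:
    // ceil(totalBytes / bytesPerReducer), clamped to [1, maxReducers].
    static int estimateFromDataSize(long totalBytes, long bytesPerReducer, int maxReducers) {
        if (bytesPerReducer <= 0 || maxReducers <= 0) {
            throw new IllegalArgumentException("bytesPerReducer and maxReducers must be positive");
        }
        long byRatio = (totalBytes + bytesPerReducer - 1) / bytesPerReducer; // ceiling division
        return (int) Math.max(1L, Math.min(byRatio, (long) maxReducers));
    }

    // Models the decision discussed above. With dynamic allocation enabled the
    // memory/cores pair is treated as absent (null in the real code), so only
    // the data size decides the number of reducers.
    static int chooseReducers(long totalBytes, long bytesPerReducer, int maxReducers,
                              boolean dynamicAllocation, long memoryPerTask, int cores) {
        int numReducers = estimateFromDataSize(totalBytes, bytesPerReducer, maxReducers);
        boolean haveClusterInfo = !dynamicAllocation && memoryPerTask > 0 && cores > 0;
        if (haveClusterInfo) {
            // Assumption: if there are more cores than estimated reducers, use the cores.
            numReducers = Math.max(numReducers, cores);
        }
        // The configured maximum always wins.
        return Math.min(numReducers, maxReducers);
    }

    public static void main(String[] args) {
        // Dynamic allocation on: cores are ignored, result comes from data size only.
        System.out.println(chooseReducers(1000L, 100L, 50, true, 4096L, 32));  // prints 10
        // Dynamic allocation off: 32 cores beat the size-based estimate of 10.
        System.out.println(chooseReducers(1000L, 100L, 50, false, 4096L, 32)); // prints 32
    }
}
```

In this sketch the patch would only affect the second call, because that is the only path where the memory and cores values are read at all.
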
I might have missed something, since I am quite new to this part of the code.

Thanks for taking the time and looking at this!

> Set the number of executors based on config if client does not provide 
> information
> ----------------------------------------------------------------------------------
>                 Key: HIVE-17291
>                 URL: https://issues.apache.org/jira/browse/HIVE-17291
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Spark
>    Affects Versions: 3.0.0
>            Reporter: Peter Vary
>            Assignee: Peter Vary
>         Attachments: HIVE-17291.1.patch
> When calculating the memory and cores, if the client does not provide the 
> information, we should try to use the one provided by default. This can happen 
> on startup, when {{spark.dynamicAllocation.enabled}} is not enabled.
